Anomaly detection
You can use OpenSearch Data Prepper to train models and generate anomalies in near real time on time-series aggregated events. Anomalies can be generated either on events produced within the pipeline or on events coming directly into the pipeline, such as OpenTelemetry metrics. You can feed these tumbling-window aggregated time-series events to the anomaly_detector processor, which trains a model and generates anomalies with a grade score. You can then configure your pipeline to write the anomalies to a separate index, allowing you to create document monitors and trigger fast alerting.
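As a minimal sketch of where the processor fits, the following fragment runs Random Cut Forest anomaly detection on an aggregated numeric field. The pipeline names, the latency key, and the index name here are placeholders for illustration, not part of the examples below:

```yaml
# Minimal sketch: detect anomalies on a numeric field of aggregated events.
# "my-anomaly-pipeline", "my-aggregation-pipeline", "latency", and
# "my-anomalies" are hypothetical names.
my-anomaly-pipeline:
  source:
    pipeline:
      name: "my-aggregation-pipeline"
  processor:
    - anomaly_detector:
        keys: [ "latency" ]        # numeric field(s) to model
        mode:
          random_cut_forest:       # the detection algorithm
  sink:
    - opensearch:
        index: "my-anomalies"      # detected anomalies are written here
```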
Metrics from logs
The following pipeline receives logs from an HTTP source such as FluentBit, extracts important values from the logs by matching the value in the log key against the Grok Apache Common Log Format, and then forwards the parsed logs to both the log-to-metrics-pipeline pipeline and an OpenSearch index named logs.
The log-to-metrics-pipeline pipeline receives the parsed logs from the apache-log-pipeline-with-metrics pipeline, aggregates them, and derives histogram metrics based on the values in the clientip and request keys. It then sends the histogram metrics to an OpenSearch index named histogram_metrics as well as to the log-to-metrics-anomaly-detector-pipeline pipeline.
The log-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the log-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor, which detects anomalies using the Random Cut Forest algorithm. If the algorithm detects anomalies, it sends them to an OpenSearch index named log-metric-anomalies.
apache-log-pipeline-with-metrics:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline.
      # In this case it would be "/apache-log-pipeline-with-metrics/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
  sink:
    - opensearch:
        ...
        index: "logs"
    - pipeline:
        name: "log-to-metrics-pipeline"

log-to-metrics-pipeline:
  source:
    pipeline:
      name: "apache-log-pipeline-with-metrics"
  processor:
    - aggregate:
        # Specify the required identification keys
        identification_keys: ["clientip", "request"]
        action:
          histogram:
            # Specify the appropriate values for each of the following fields
            key: "bytes"
            record_minmax: true
            units: "bytes"
            buckets: [0, 25000000, 50000000, 75000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "histogram_metrics"
    - pipeline:
        name: "log-to-metrics-anomaly-detector-pipeline"

log-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "log-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # Specify the key on which to run anomaly detection
        keys: [ "bytes" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "log-metric-anomalies"
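On the sending side, a Fluent Bit output section that targets the HTTP source above could be sketched as follows, using Fluent Bit's YAML configuration format. The host and port are assumptions for a local Data Prepper instance (the http source listens on port 2021 by default); only the URI path comes from the pipeline configuration above:

```yaml
# Hypothetical Fluent Bit output pointing at the pipeline's HTTP source.
# Host and port assume a local Data Prepper instance on its default port.
pipeline:
  outputs:
    - name: http
      match: '*'
      host: localhost
      port: 2021
      uri: /apache-log-pipeline-with-metrics/logs
      format: json
```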
Metrics from traces
You can derive metrics from traces and find anomalies in those metrics. In this example, the entry-pipeline pipeline receives trace data from the OpenTelemetry Collector and forwards it to the following pipelines:

- span-pipeline – Extracts the raw spans from the traces. The pipeline sends the raw spans to any OpenSearch index prefixed with otel-v1-apm-span.
- service-map-pipeline – Aggregates and analyzes the traces to create documents that represent connections between services. The pipeline sends these documents to an OpenSearch index named otel-v1-apm-service-map. You can then view a visualization of the service map through the Trace Analytics plugin in OpenSearch Dashboards.
- trace-to-metrics-pipeline – Aggregates and derives histogram metrics from the traces based on the value of serviceName. The pipeline then sends the derived metrics to an OpenSearch index named metrics_for_traces and to the trace-to-metrics-anomaly-detector-pipeline pipeline.
The trace-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the trace-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor, which detects anomalies using the Random Cut Forest algorithm. If the algorithm detects any anomalies, it sends them to an OpenSearch index named trace-metric-anomalies.
entry-pipeline:
  source:
    otel_trace_source:
      # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline.
      # In this case it would be "/entry-pipeline/v1/traces". This will be endpoint URI path in OpenTelemetry Exporter
      # configuration.
      # path: "/${pipelineName}/v1/traces"
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "span-pipeline"
    - pipeline:
        name: "service-map-pipeline"
    - pipeline:
        name: "trace-to-metrics-pipeline"

span-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - otel_trace_raw:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-raw"

service-map-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - service_map:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-service-map"

trace-to-metrics-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - aggregate:
        # Pick the required identification keys
        identification_keys: ["serviceName"]
        action:
          histogram:
            # Pick the appropriate values for each of the following fields
            key: "durationInNanos"
            record_minmax: true
            units: "seconds"
            buckets: [0, 10000000, 50000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "metrics_for_traces"
    - pipeline:
        name: "trace-to-metrics-anomaly-detector-pipeline"

trace-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "trace-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # The key below will find anomalies in the max value of the histogram generated for durationInNanos.
        keys: [ "max" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "trace-metric-anomalies"
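On the OpenTelemetry Collector side, an exporter that sends traces to entry-pipeline could be sketched as follows. The endpoint is an assumption for a local Data Prepper instance using otel_trace_source's default gRPC port (21890), and the exporter name is hypothetical:

```yaml
# Hypothetical OpenTelemetry Collector exporter fragment.
# The endpoint assumes Data Prepper's otel_trace_source on its default
# gRPC port on the local host; adjust TLS settings for your deployment.
exporters:
  otlp/data-prepper:
    endpoint: localhost:21890
    tls:
      insecure: true

service:
  pipelines:
    traces:
      exporters: [otlp/data-prepper]
```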
OpenTelemetry metrics
You can create a pipeline that receives OpenTelemetry metrics and detects anomalies within those metrics. In this example, entry-pipeline receives metrics from the OpenTelemetry Collector. If a metric's type is GAUGE and its name is totalApiBytesSent, the processor routes it to the ad-pipeline pipeline.

The ad-pipeline pipeline receives the metrics from the entry pipeline and performs anomaly detection on the metric values using the anomaly_detector processor.
entry-pipeline:
  source:
    otel_metrics_source:
  processor:
    - otel_metrics:
  route:
    - gauge_route: '/kind = "GAUGE" and /name = "totalApiBytesSent"'
  sink:
    - pipeline:
        name: "ad-pipeline"
      routes:
        - gauge_route
    - opensearch:
        ...
        index: "otel-metrics"

ad-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - anomaly_detector:
        # Use "value" as the key on which the anomaly detector needs to run
        keys: [ "value" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: otel-metrics-anomalies
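For illustration, an event that would match gauge_route and reach the anomaly detector might carry fields like the following. The value is hypothetical, and only the kind, name, and value fields are referenced by the configuration above; real events may carry additional fields:

```yaml
# Hypothetical event shape after the otel_metrics processor; only kind,
# name, and value are used by the routing and detection configured above.
kind: "GAUGE"
name: "totalApiBytesSent"
value: 131072.0
```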