
Anomaly detection

You can use OpenSearch Data Prepper to train models on aggregated time-series events and generate anomalies in near real time. You can generate anomalies either on events created within the pipeline or on events coming directly into the pipeline, such as OpenTelemetry metrics. You can feed these tumbling window aggregated time-series events to the anomaly_detector processor, which trains a model and generates anomalies with a grade score. You can then configure your pipeline to write the anomalies to a separate index, where you can create document monitors and trigger fast alerting.
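All of the examples below follow the same basic shape: aggregated events flow into a pipeline whose anomaly_detector processor writes detected anomalies to a dedicated index. The following minimal sketch shows that shape; the pipeline names, the latency key, and the index name are illustrative placeholders, not part of the examples below.

ad-skeleton-pipeline:
  source:
    pipeline:
      name: "my-metrics-pipeline"    # illustrative upstream pipeline
  processor:
    - anomaly_detector:
        keys: [ "latency" ]          # illustrative numeric key to model
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "my-anomalies"        # illustrative dedicated anomaly index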

Log metrics

The following pipeline receives logs from an HTTP source such as FluentBit, extracts important values from the logs by matching the value in the log key against the Grok Apache Common Log Format, and then forwards the grokked logs to both the log-to-metrics-pipeline pipeline and an OpenSearch index named logs.
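On the Fluent Bit side, the HTTP output URI must match this path. The following is a minimal sketch using Fluent Bit's YAML configuration format; the host, port, and format values are assumptions (2021 is the default port of the Data Prepper http source).

pipeline:
  outputs:
    - name: http
      match: '*'
      host: localhost                                # assumption: Data Prepper runs locally
      port: 2021                                     # assumption: default http source port
      uri: /apache-log-pipeline-with-metrics/logs    # must match the http source path
      format: json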

The log-to-metrics-pipeline pipeline receives the grokked logs from the apache-log-pipeline-with-metrics pipeline, aggregates them, and derives histogram metrics based on the values in the clientip and request keys. It then sends the histogram metrics to an OpenSearch index named histogram_metrics as well as to the log-to-metrics-anomaly-detector-pipeline pipeline.

The log-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the log-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor, which detects anomalies using the Random Cut Forest algorithm. If the algorithm detects any anomalies, they are sent to an OpenSearch index named log-metric-anomalies.

apache-log-pipeline-with-metrics:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the pipeline name configured for this pipeline.
      # In this case it would be "/apache-log-pipeline-with-metrics/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
  sink:
    - opensearch:
        ...
        index: "logs"
    - pipeline:
        name: "log-to-metrics-pipeline"

log-to-metrics-pipeline:
  source:
    pipeline:
      name: "apache-log-pipeline-with-metrics"
  processor:
    - aggregate:
        # Specify the required identification keys
        identification_keys: ["clientip", "request"]
        action:
          histogram:
            # Specify the appropriate values for each of the following fields
            key: "bytes"
            record_minmax: true
            units: "bytes"
            buckets: [0, 25000000, 50000000, 75000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "histogram_metrics"
    - pipeline:
        name: "log-to-metrics-anomaly-detector-pipeline"

log-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "log-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # Specify the key on which to run anomaly detection
        keys: [ "bytes" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "log-metric-anomalies"

Trace metrics

You can derive metrics from traces and find anomalies in those metrics. In this example, the entry-pipeline pipeline receives trace data from the OpenTelemetry Collector and forwards it to the following pipelines:

  • span-pipeline – Extracts the raw spans from the traces. The pipeline sends the raw spans to any OpenSearch index prefixed with otel-v1-apm-span.

  • service-map-pipeline – Aggregates and analyzes the traces to create documents that represent connections between services. The pipeline sends these documents to an OpenSearch index named otel-v1-apm-service-map. You can then view a visualization of the service map through the Trace Analytics plugin for OpenSearch Dashboards.

  • trace-to-metrics-pipeline – Aggregates and derives histogram metrics from the traces based on the value of the serviceName key. The pipeline then sends the derived metrics to an OpenSearch index named metrics_for_traces as well as to the trace-to-metrics-anomaly-detector-pipeline pipeline.

The trace-to-metrics-anomaly-detector-pipeline pipeline receives the aggregated histogram metrics from the trace-to-metrics-pipeline pipeline and sends them to the anomaly_detector processor, which detects anomalies using the Random Cut Forest algorithm. If the algorithm detects any anomalies, they are sent to an OpenSearch index named trace-metric-anomalies.

entry-pipeline:
  source:
    otel_trace_source:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the pipeline name configured for this pipeline.
      # In this case it would be "/entry-pipeline/v1/traces". This will be the endpoint URI path in the OpenTelemetry
      # Exporter configuration.
      # path: "/${pipelineName}/v1/traces"
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "span-pipeline"
    - pipeline:
        name: "service-map-pipeline"
    - pipeline:
        name: "trace-to-metrics-pipeline"

span-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - otel_trace_raw:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-raw"

service-map-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - service_map:
  sink:
    - opensearch:
        ...
        index_type: "trace-analytics-service-map"

trace-to-metrics-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - aggregate:
        # Pick the required identification keys
        identification_keys: ["serviceName"]
        action:
          histogram:
            # Pick the appropriate values for each of the following fields
            key: "durationInNanos"
            record_minmax: true
            units: "seconds"
            buckets: [0, 10000000, 50000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "metrics_for_traces"
    - pipeline:
        name: "trace-to-metrics-anomaly-detector-pipeline"

trace-to-metrics-anomaly-detector-pipeline:
  source:
    pipeline:
      name: "trace-to-metrics-pipeline"
  processor:
    - anomaly_detector:
        # The following key will find anomalies in the max value of the histogram generated for durationInNanos.
        keys: [ "max" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: "trace-metric-anomalies"

OpenTelemetry metrics

You can create a pipeline that receives OpenTelemetry metrics and detects anomalies in those metrics. In this example, entry-pipeline receives metrics from the OpenTelemetry Collector. If a metric is of type GAUGE and its name is totalApiBytesSent, it is routed to the ad-pipeline pipeline.

The ad-pipeline pipeline receives the metrics from the entry pipeline and performs anomaly detection on the metric value using the anomaly_detector processor.

entry-pipeline:
  source:
    otel_metrics_source:
  processor:
    - otel_metrics:
  route:
    - gauge_route: '/kind = "GAUGE" and /name = "totalApiBytesSent"'
  sink:
    - pipeline:
        name: "ad-pipeline"
        routes:
          - gauge_route
    - opensearch:
        ...
        index: "otel-metrics"

ad-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - anomaly_detector:
        # Use "value" as the key on which the anomaly detector needs to run
        keys: [ "value" ]
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        ...
        index: otel-metrics-anomalies
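As in the trace example, the OpenTelemetry Collector must export metrics to the entry pipeline. The following is a minimal sketch of the collector side; the endpoint and TLS settings are assumptions (21891 is the default port of the Data Prepper otel_metrics_source).

receivers:
  otlp:
    protocols:
      grpc:
exporters:
  otlp/data-prepper:
    endpoint: localhost:21891   # assumption: default otel_metrics_source port
    tls:
      insecure: true            # assumption: TLS disabled for local testing
service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlp/data-prepper]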
