Link Search Menu Expand Document Documentation Menu

事件聚合

您可以使用 OpenSearch Data Prepper 在一段时间内聚合来自不同事件的数据。事件聚合有助于减少不必要的日志量,并管理诸如作为单独事件接收的多行日志之类的用例。aggregate 处理器是一个有状态处理器,它根据一组指定的识别键的值对事件进行分组,并对每个组执行可配置的操作。

aggregate 处理器状态存储在内存中。例如,为了将四个事件组合成一个,处理器需要保留前三个事件的片段。一个聚合事件组的状态会保留可配置的时间量。根据您的日志、使用的聚合操作以及处理器配置中的内存选项数量,聚合可能会在很长一段时间内发生。

基本用法

以下示例管道使用 grok 处理器提取字段 sourceIpdestinationIpport,然后使用 aggregate 处理器put_all 操作对这些字段进行 30 秒的聚合。在 30 秒周期结束时,聚合日志被发送到 OpenSearch 接收器。

aggregate_pipeline:  
   source:
     http:
      path: "/${pipelineName}/logs"
   processor:
     - grok:
         match: 
           log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"]
     - aggregate:
         group_duration: "30s"
         identification_keys: ["sourceIp", "destinationIp", "port"]
         action:
           put_all:
   sink:
     - opensearch:
         ...
         index: aggregated_logs

例如,考虑以下批日志

{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 }
{ "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 }
{ "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }

grok 处理器将提取键,使日志事件看起来像以下示例。这些事件现在包含了 aggregate 处理器为 identification_keys 所需的数据。

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }

30 秒后,aggregate 处理器将以下聚合日志写入接收器

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }

删除重复项

您可以通过从传入事件中派生键并为 aggregate 处理器指定 remove_duplicates 选项来删除重复条目。此操作会立即处理组中的第一个事件,并丢弃该组中所有后续事件。

在以下示例中,第一个事件使用识别键 sourceIpdestinationIp 进行处理

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }

然后管道将丢弃以下事件,因为它具有相同的键

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }

管道处理此事件并创建一个新组,因为 sourceIp 不同

{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }

日志聚合和条件路由

您可以使用多个插件将日志聚合与条件路由结合起来。在此示例中,管道 log-aggregate-pipeline 使用 HTTP 客户端(如 FluentBit)接收日志,并通过将 log 键中的值与 Apache 通用日志格式匹配来从日志中提取重要值。

管道使用 Grok 模式从日志中提取的两个值包括 responseclientip。然后,aggregate 处理器使用 clientip 值以及 remove_duplicates 选项,丢弃任何在给定 group_duration 内已处理过的包含 clientip 的日志。

管道中存在三个路由或条件语句。这些路由将响应的值分为 2xx3xx4xx5xx 响应。状态为 2xx3xx 的日志被发送到 aggregated_2xx_3xx 索引,状态为 4xx 的日志被发送到 aggregated_4xx index,状态为 5xx 的日志被发送到 aggregated_5xx 索引。

log-aggregate-pipeline:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline.
      # In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
    - aggregate:
        identification_keys: ["clientip"]
        action:
          remove_duplicates:
        group_duration: "180s"
  route:
    - 2xx_status: "/response >= 200 and /response < 300"
    - 3xx_status: "/response >= 300 and /response < 400"
    - 4xx_status: "/response >= 400 and /response < 500"
    - 5xx_status: "/response >= 500 and /response < 600"
  sink:
    - opensearch:
        ...
        index: "aggregated_2xx_3xx"
        routes:
          - 2xx_status
          - 3xx_status
    - opensearch:
        ...
        index: "aggregated_4xx"
        routes:
          - 4xx_status
    - opensearch:
        ...
        index: "aggregated_5xx"
        routes:
          - 5xx_status
剩余 350 字符

有问题?

想贡献?