事件聚合
您可以使用 OpenSearch Data Prepper 在一段时间内聚合来自不同事件的数据。事件聚合有助于减少不必要的日志量,并管理诸如作为单独事件接收的多行日志之类的用例。aggregate
处理器是一个有状态处理器,它根据一组指定的识别键的值对事件进行分组,并对每个组执行可配置的操作。
aggregate
处理器状态存储在内存中。例如,为了将四个事件组合成一个,处理器需要保留前三个事件的片段。一个聚合事件组的状态会保留可配置的时间量。根据您的日志、使用的聚合操作以及处理器配置中的内存选项数量,聚合可能会在很长一段时间内发生。
基本用法
以下示例管道使用 grok
处理器提取字段 sourceIp
、destinationIp
和 port
,然后使用 aggregate
处理器和 put_all
操作对这些字段进行 30 秒的聚合。在 30 秒周期结束时,聚合日志被发送到 OpenSearch 接收器。
aggregate_pipeline:
source:
http:
path: "/${pipelineName}/logs"
processor:
- grok:
match:
log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"]
- aggregate:
group_duration: "30s"
identification_keys: ["sourceIp", "destinationIp", "port"]
action:
put_all:
sink:
- opensearch:
...
index: aggregated_logs
例如,考虑以下批日志
{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 }
{ "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 }
{ "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }
grok
处理器将提取键,使日志事件看起来像以下示例。这些事件现在包含了 aggregate
处理器为 identification_keys
所需的数据。
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }
30 秒后,aggregate
处理器将以下聚合日志写入接收器
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }
删除重复项
您可以通过从传入事件中派生键并为 aggregate
处理器指定 remove_duplicates
选项来删除重复条目。此操作会立即处理组中的第一个事件,并丢弃该组中所有后续事件。
在以下示例中,第一个事件使用识别键 sourceIp
和 destinationIp
进行处理
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
然后管道将丢弃以下事件,因为它具有相同的键
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
管道处理此事件并创建一个新组,因为 sourceIp
不同
{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }
日志聚合和条件路由
您可以使用多个插件将日志聚合与条件路由结合起来。在此示例中,管道 log-aggregate-pipeline
使用 HTTP 客户端(如 FluentBit)接收日志,并通过将 log
键中的值与 Apache 通用日志格式匹配来从日志中提取重要值。
管道使用 Grok 模式从日志中提取的两个值包括 response
和 clientip
。然后,aggregate
处理器使用 clientip
值以及 remove_duplicates
选项,丢弃任何在给定 group_duration
内已处理过的包含 clientip
的日志。
管道中存在三个路由或条件语句。这些路由将响应的值分为 2xx
、3xx
、4xx
和 5xx
响应。状态为 2xx
或 3xx
的日志被发送到 aggregated_2xx_3xx
索引,状态为 4xx
的日志被发送到 aggregated_4xx index
,状态为 5xx
的日志被发送到 aggregated_5xx
索引。
log-aggregate-pipeline:
source:
http:
# Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline.
# In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value.
path: "/${pipelineName}/logs"
processor:
- grok:
match:
log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
- aggregate:
identification_keys: ["clientip"]
action:
remove_duplicates:
group_duration: "180s"
route:
- 2xx_status: "/response >= 200 and /response < 300"
- 3xx_status: "/response >= 300 and /response < 400"
- 4xx_status: "/response >= 400 and /response < 500"
- 5xx_status: "/response >= 500 and /response < 600"
sink:
- opensearch:
...
index: "aggregated_2xx_3xx"
routes:
- 2xx_status
- 3xx_status
- opensearch:
...
index: "aggregated_4xx"
routes:
- 4xx_status
- opensearch:
...
index: "aggregated_5xx"
routes:
- 5xx_status