聚合
aggregate
处理器根据 identification_keys
的值对事件进行分组。然后,处理器对每个组执行一个操作,有助于减少不必要的日志量并随着时间推移创建聚合日志。您可以使用现有操作或使用 Java 代码创建自己的自定义聚合。
配置
下表描述了可用于配置 aggregate
处理器的选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
identification_keys | 是 | 列表 | 用于分组事件的无序列表。具有相同键值的事件将放入同一组中。如果事件不包含 identification_keys 中的任何一个,则该键的值被视为等于 null 。至少需要一个 identification_key(例如,["sourceIp", "destinationIp", "port"] )。 |
action | 是 | AggregateAction | 对每个组执行的操作。必须提供一种可用聚合操作,或者您可以创建自定义聚合操作。remove_duplicates 和 put_all 是可用操作。有关更多信息,请参阅创建新聚合操作。 |
group_duration | 否 | 字符串 | 一个组在自动结束前应该存在的时间量。支持 ISO_8601 符号字符串(“PT20.345S”、“PT15M”等)以及秒("60s" )和毫秒("1500ms" )的简单表示法。默认值为 180s 。 |
local_mode | 否 | 布尔型 | 当 local_mode 设置为 true 时,聚合会在每个 OpenSearch Data Prepper 节点上本地执行,而不是根据 identification_keys 使用哈希函数将事件转发到特定节点。默认值为 false 。 |
可用聚合操作
使用以下聚合操作来确定 aggregate
处理器如何处理每个组中的事件。
remove_duplicates
remove_duplicates
操作立即处理组中的第一个事件,并丢弃源中与第一个事件重复的任何事件。例如,当使用 identification_keys: ["sourceIp", "destination_ip"]
时
remove_duplicates
操作处理源中的第一个事件:{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
。- OpenSearch Data Prepper 丢弃事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
,因为sourceIp
和destinationIp
与源中的第一个事件匹配。 remove_duplicates
操作处理下一个事件{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }
。由于sourceIp
与组中的第一个事件不同,Data Prepper 根据此事件创建一个新组。
put_all
put_all
操作通过覆盖现有键和添加新键来合并属于同一组的事件,类似于 Java 的 Map.putAll
。该操作丢弃构成合并事件的所有事件。例如,当使用 identification_keys: ["sourceIp", "destination_ip"]
时,put_all
操作处理以下三个事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
然后该操作将事件合并为一个。管道将使用以下合并事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200, "bytes": 1000, "http_verb": "GET" }
count
count
事件计算属于同一组的事件,并生成一个新事件,其中包含 identification_keys
的值和计数,表示新事件的数量。您可以使用以下配置选项自定义处理器
count_key
: 用于存储计数的键。默认名称为aggr._count
。start_time_key
: 用于存储开始时间的键。默认名称为aggr._start_time
。output_format
: 聚合事件的格式。otel_metrics
: 默认输出格式。以 OTel metrics SUM 类型输出,并将计数作为值。raw
- 生成一个 JSON 对象,其中count_key
字段为计数,start_time_key
字段为聚合开始时间。
例如,当使用 identification_keys: ["sourceIp", "destination_ip"]
时,count
操作会计算并处理以下事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 503 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 400 }
处理器创建以下事件
{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"}
直方图
histogram
操作聚合属于同一组的事件,并根据配置的 key
生成一个新事件,其中包含 identification_keys
的值和聚合事件的直方图。直方图包含事件数量、总和、桶、桶计数以及可选的与 key
对应的值的最小值和最大值。该操作丢弃构成合并事件的所有事件。
您可以使用以下配置选项自定义处理器
key
: 直方图生成的事件中字段的名称。generated_key_prefix
: 聚合事件中创建的所有字段使用的key_prefix
。拥有前缀可确保直方图事件的名称不与事件中的字段名称冲突。units
:key
中值的单位。record_minmax
: 一个布尔值,指示直方图是否应包含聚合中值的 min 和 max。buckets
: 一个桶列表(double
类型的值),指示直方图中的桶。output_format
: 聚合事件的格式。otel_metrics
: 默认输出格式。以 OTel metrics SUM 类型输出,并将计数作为值。raw
: 生成一个 JSON 对象,其中count_key
字段为计数,start_time_key
字段为聚合开始时间。
例如,当使用 identification_keys: ["sourceIp", "destination_ip", "request"]
、key: latency
和 buckets: [0.0, 0.25, 0.5]
时,histogram
操作处理以下事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.2 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.55}
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.25 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.15 }
然后处理器创建以下事件
{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}
rate_limiter
rate_limiter
操作控制每秒聚合的事件数量。默认情况下,如果 aggregate
处理器收到的事件数量超过允许的配置数量,rate_limiter
将阻止其运行。您可以使用 when_exceeds
配置选项覆盖触发限速的事件数量。
您可以使用以下配置选项自定义处理器
events_per_second
: 每秒允许的事件数量。when_exceeds
: 指示当接收到的事件数量大于每秒允许的事件数量时,rate_limiter
采取什么操作。默认值为block
,它会在达到每秒允许的最大事件数量后阻止处理器运行,直到下一秒。或者,drop
选项会丢弃该秒内收到的多余事件。
例如,如果 events_per_second
设置为 1
且 when_exceeds
设置为 drop
,则在一秒时间间隔内收到以下事件时,该操作会尝试处理它们
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }
以下事件被处理,但所有其他事件都被忽略,因为 rate_limiter
阻止了它们
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
如果 when_exceeds
设置为 drop
,则所有三个事件都会被处理。
percent_sampler
percent_sampler
操作根据事件的百分比控制聚合的事件数量。该操作会丢弃不包含在该百分比中的任何事件。
您可以使用 percent
配置来设置事件的百分比,该配置指示在一秒间隔内处理的事件百分比 (0%–100%)。
例如,如果 percent 设置为 50
,则在一秒间隔内,该操作会尝试处理以下事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
管道处理 50% 的事件,丢弃其他事件,并且不生成新事件
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }
指标
下表描述了常见的抽象处理器指标。
指标名称 | 类型 | 描述 |
---|---|---|
recordsIn | 计数器 | 表示记录进入管道组件的指标。 |
recordsOut | 计数器 | 表示记录从管道组件流出的指标。 |
timeElapsed | 计时器 | 表示管道组件执行期间经过时间的指标。 |
aggregate
处理器包含以下自定义指标。
计数器
actionHandleEventsOut
: 从handleEvent
调用返回到配置的操作的事件数量。actionHandleEventsDropped
: 未从handleEvent
调用返回到配置的操作的事件数量。actionHandleEventsProcessingErrors
: 对配置的操作进行handleEvent
调用并导致错误的次数。actionConcludeGroupEventsOut
: 从concludeGroup
调用返回到配置的操作的事件数量。actionConcludeGroupEventsDropped
: 未从concludeGroup
调用返回到配置的操作的事件数量。actionConcludeGroupEventsProcessingErrors
: 对配置的操作进行concludeGroup
调用并导致错误的次数。
仪表
currentAggregateGroups
: 此仪表表示当前活动的聚合组数量。当聚合组完成并发出其结果时,此值会减少;当新事件启动新聚合组的创建时,此值会增加。