Link Search Menu Expand Document Documentation Menu

聚合

aggregate 处理器根据 identification_keys 的值对事件进行分组。然后,处理器对每个组执行一个操作,有助于减少不必要的日志量并随着时间推移创建聚合日志。您可以使用现有操作或使用 Java 代码创建自己的自定义聚合。

配置

下表描述了可用于配置 aggregate 处理器的选项。

选项 必需 类型 描述
identification_keys 列表 用于分组事件的无序列表。具有相同键值的事件将放入同一组中。如果事件不包含 identification_keys 中的任何一个,则该键的值被视为等于 null。至少需要一个 identification_key(例如,["sourceIp", "destinationIp", "port"])。
action AggregateAction 对每个组执行的操作。必须提供一种可用聚合操作,或者您可以创建自定义聚合操作。remove_duplicatesput_all 是可用操作。有关更多信息,请参阅创建新聚合操作
group_duration 字符串 一个组在自动结束前应该存在的时间量。支持 ISO_8601 符号字符串(“PT20.345S”、“PT15M”等)以及秒("60s")和毫秒("1500ms")的简单表示法。默认值为 180s
local_mode 布尔型 local_mode 设置为 true 时,聚合会在每个 OpenSearch Data Prepper 节点上本地执行,而不是根据 identification_keys 使用哈希函数将事件转发到特定节点。默认值为 false

可用聚合操作

使用以下聚合操作来确定 aggregate 处理器如何处理每个组中的事件。

remove_duplicates

remove_duplicates 操作立即处理组中的第一个事件,并丢弃源中与第一个事件重复的任何事件。例如,当使用 identification_keys: ["sourceIp", "destination_ip"]

  1. remove_duplicates 操作处理源中的第一个事件:{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
  2. OpenSearch Data Prepper 丢弃事件 { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 },因为 sourceIpdestinationIp 与源中的第一个事件匹配。
  3. remove_duplicates 操作处理下一个事件 { "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }。由于 sourceIp 与组中的第一个事件不同,Data Prepper 根据此事件创建一个新组。

put_all

put_all 操作通过覆盖现有键和添加新键来合并属于同一组的事件,类似于 Java 的 Map.putAll。该操作丢弃构成合并事件的所有事件。例如,当使用 identification_keys: ["sourceIp", "destination_ip"] 时,put_all 操作处理以下三个事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }

然后该操作将事件合并为一个。管道将使用以下合并事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200, "bytes": 1000, "http_verb": "GET" }

count

count 事件计算属于同一组的事件,并生成一个新事件,其中包含 identification_keys 的值和计数,表示新事件的数量。您可以使用以下配置选项自定义处理器

  • count_key: 用于存储计数的键。默认名称为 aggr._count
  • start_time_key: 用于存储开始时间的键。默认名称为 aggr._start_time
  • output_format: 聚合事件的格式。
    • otel_metrics: 默认输出格式。以 OTel metrics SUM 类型输出,并将计数作为值。
    • raw - 生成一个 JSON 对象,其中 count_key 字段为计数,start_time_key 字段为聚合开始时间。

例如,当使用 identification_keys: ["sourceIp", "destination_ip"] 时,count 操作会计算并处理以下事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 503 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 400 }

处理器创建以下事件

{"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"}

直方图

histogram 操作聚合属于同一组的事件,并根据配置的 key 生成一个新事件,其中包含 identification_keys 的值和聚合事件的直方图。直方图包含事件数量、总和、桶、桶计数以及可选的与 key 对应的值的最小值和最大值。该操作丢弃构成合并事件的所有事件。

您可以使用以下配置选项自定义处理器

  • key: 直方图生成的事件中字段的名称。
  • generated_key_prefix: 聚合事件中创建的所有字段使用的 key_prefix。拥有前缀可确保直方图事件的名称不与事件中的字段名称冲突。
  • units: key 中值的单位。
  • record_minmax: 一个布尔值,指示直方图是否应包含聚合中值的 min 和 max。
  • buckets: 一个桶列表(double 类型的值),指示直方图中的桶。
  • output_format: 聚合事件的格式。
    • otel_metrics: 默认输出格式。以 OTel metrics SUM 类型输出,并将计数作为值。
    • raw: 生成一个 JSON 对象,其中 count_key 字段为计数,start_time_key 字段为聚合开始时间。

例如,当使用 identification_keys: ["sourceIp", "destination_ip", "request"]key: latencybuckets: [0.0, 0.25, 0.5] 时,histogram 操作处理以下事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.2 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.55}
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.25 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "request" : "/index.html", "latency": 0.15 }

然后处理器创建以下事件

{"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}

rate_limiter

rate_limiter 操作控制每秒聚合的事件数量。默认情况下,如果 aggregate 处理器收到的事件数量超过允许的配置数量,rate_limiter 将阻止其运行。您可以使用 when_exceeds 配置选项覆盖触发限速的事件数量。

您可以使用以下配置选项自定义处理器

  • events_per_second: 每秒允许的事件数量。
  • when_exceeds: 指示当接收到的事件数量大于每秒允许的事件数量时,rate_limiter 采取什么操作。默认值为 block,它会在达到每秒允许的最大事件数量后阻止处理器运行,直到下一秒。或者,drop 选项会丢弃该秒内收到的多余事件。

例如,如果 events_per_second 设置为 1when_exceeds 设置为 drop,则在一秒时间间隔内收到以下事件时,该操作会尝试处理它们

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "http_verb": "GET" }

以下事件被处理,但所有其他事件都被忽略,因为 rate_limiter 阻止了它们

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }

如果 when_exceeds 设置为 drop,则所有三个事件都会被处理。

percent_sampler

percent_sampler 操作根据事件的百分比控制聚合的事件数量。该操作会丢弃不包含在该百分比中的任何事件。

您可以使用 percent 配置来设置事件的百分比,该配置指示在一秒间隔内处理的事件百分比 (0%–100%)。

例如,如果 percent 设置为 50,则在一秒间隔内,该操作会尝试处理以下事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 2500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }

管道处理 50% 的事件,丢弃其他事件,并且不生成新事件

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 500 }
{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 3100 }

指标

下表描述了常见的抽象处理器指标。

指标名称 类型 描述
recordsIn 计数器 表示记录进入管道组件的指标。
recordsOut 计数器 表示记录从管道组件流出的指标。
timeElapsed 计时器 表示管道组件执行期间经过时间的指标。

aggregate 处理器包含以下自定义指标。

计数器

  • actionHandleEventsOut: 从 handleEvent 调用返回到配置的操作的事件数量。
  • actionHandleEventsDropped: 未从 handleEvent 调用返回到配置的操作的事件数量。
  • actionHandleEventsProcessingErrors: 对配置的操作进行 handleEvent 调用并导致错误的次数。
  • actionConcludeGroupEventsOut: 从 concludeGroup 调用返回到配置的操作的事件数量。
  • actionConcludeGroupEventsDropped: 未从 concludeGroup 调用返回到配置的操作的事件数量。
  • actionConcludeGroupEventsProcessingErrors: 对配置的操作进行 concludeGroup 调用并导致错误的次数。

仪表

  • currentAggregateGroups: 此仪表表示当前活动的聚合组数量。当聚合组完成并发出其结果时,此值会减少;当新事件启动新聚合组的创建时,此值会增加。
剩余 350 字符

有问题?

想贡献吗?