Link Search Menu Expand Document Documentation Menu

kinesis

您可以使用 OpenSearch Data Prepper kinesis 源从一个或多个 Amazon Kinesis 数据流中摄取记录。

用法

以下示例管道指定 Kinesis 作为源。该管道从名为 stream1stream2 的多个 Kinesis 数据流中摄取数据,并设置 initial_position 以指示读取流记录的起始点。

version: "2"
kinesis-pipeline:
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
          initial_position: "LATEST"
        - stream_name: "stream2"
          initial_position: "LATEST"
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"

配置选项

kinesis 源支持以下配置选项。

选项 必需 类型 描述
aws AWS 指定 AWS 配置。请参阅 aws
确认 布尔型 当设置为 true 时,启用 kinesis 源在事件被 OpenSearch 接收器接收时接收端到端确认
streams 列表 配置 kinesis 源用于读取记录的多个 Kinesis 数据流列表。您可以配置最多四个流。请参阅
编解码器 编解码器 指定要应用的 编解码器
buffer_timeout 持续时间 设置允许写入事件到 Data Prepper 缓冲区直至超时的时间量。源在指定时间内无法写入缓冲区的任何事件都将被丢弃。默认为 1s
records_to_accumulate 整数 确定在写入缓冲区之前累积的消息数量。默认为 100
consumer_strategy 字符串 选择用于摄取 Kinesis 数据流的消费者策略。默认为 fan-out,但也可以使用 polling。如果启用 polling,则需要额外的配置。
polling polling 请参阅 polling

您可以在 streams 数组中使用以下选项。

选项 必需 类型 描述
stream_name 字符串 定义每个 Kinesis 数据流的名称。
initial_position 字符串 设置 initial_position 以确定 kinesis 源开始读取流记录的时间点。使用 LATEST 从最新记录开始,或使用 EARLIEST 从流的开头开始。默认为 LATEST
checkpoint_interval 持续时间 配置 checkpoint_interval 以定期检查 Kinesis 数据流,避免记录重复处理。默认为 PT2M
compression 字符串 指定压缩格式。要解压缩由 CloudWatch Logs 订阅过滤器添加到 Kinesis 的记录,请使用 gzip 压缩格式。

编解码器

codec 决定了 kinesis 源如何解析每个 Kinesis 流记录。为了提高效率和性能,您可以将 编解码器组合与某些处理器一起使用。

json 编解码器

json 编解码器将每一单行解析为 JSON 数组中的单个 JSON 对象,然后为数组中的每个对象创建一个 Data Prepper 事件。它可用于将嵌套的 CloudWatch 事件解析为单独的日志条目。它还支持以下配置与此编解码器一起使用。

选项 必需 类型 描述
key_name 字符串 要从中提取 JSON 数组并创建 Data Prepper 事件的输入字段名称。
include_keys(包含的键) 列表 要提取并作为附加字段添加到 Data Prepper 事件中的输入字段列表。
include_keys_metadata 列表 要提取并添加到 Data Prepper 事件元数据对象中的输入字段列表。
max_event_length 整数 JSON 编解码器读取的任何单个事件的最大大小。默认为 20,000,000 字符。

newline 编解码器

newline 编解码器将每个 Kinesis 流记录解析为单个日志事件,使其非常适合处理单行记录。它也与 parse_json 处理器很好地配合以解析每一行。

您可以使用以下选项配置 newline 编解码器。

选项 必需 类型 描述
skip_lines 整数 设置在创建事件之前要跳过的行数。您可以使用此配置来跳过常见的标题行。默认为 0
header_destination 字符串 定义一个键值,分配给流事件的标题行。如果指定此选项,则每个事件将包含一个 header_destination 字段。

polling

consumer_strategy 设置为 polling 时,kinesis 源使用基于轮询的方法从 Kinesis 数据流读取记录,而不是默认的 fan-out 方法。

选项 必需 类型 描述
max_polling_records 整数 设置单次调用从 Kinesis 获取的记录数。
idle_time_between_reads 持续时间 定义两次调用之间的空闲时间量。

aws

您可以在 aws 配置中使用以下选项。

选项 必需 类型 描述
region 字符串 设置用于凭据的 AWS 区域。默认为确定区域的标准 SDK 行为
sts_role_arn 字符串 定义要为 Amazon Kinesis Data Streams 和 Amazon DynamoDB 请求承担的 AWS 安全令牌服务 (AWS STS) 角色。默认为 null,此时使用标准 SDK 凭据行为
aws_sts_header_overrides 映射 定义 AWS 身份和访问管理 (IAM) 角色为接收器插件承担的标头覆盖映射。

公开的元数据属性

kinesis 源将以下元数据添加到每个已处理的事件中。您可以使用表达式语法 getMetadata 函数访问元数据属性。

  • stream_name:包含获取事件的 Kinesis 数据流的名称。

权限

要将 kinesis 作为源运行,需要以下最低权限:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:SubscribeToShard"
      ],
      "Resource": [
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream1",
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream2"
      ]
    },
    {
      "Sid": "allowCreateTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:PutItem",
        "dynamodb:DescribeTable",
        "dynamodb:DeleteItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem",
        "dynamodb:Query"
      ],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:{account-id}:table/kinesis-pipeline"
      ]
    }
  ]
}

kinesis 源使用 DynamoDB 表进行多个工作器之间的摄取协调,因此您需要 DynamoDB 权限。

指标

kinesis 源包含以下指标。

计数器

  • recordsProcessed:统计已处理流记录的数量。
  • recordProcessingErrors:统计流记录处理错误的数量。
  • acknowledgementSetSuccesses:统计成功添加到接收器的已处理流记录的数量。
  • acknowledgementSetFailures:统计未能添加到接收器的已处理流记录的数量。
剩余 350 字符

有问题?

想做贡献?