kinesis
您可以使用 OpenSearch Data Prepper kinesis
源从一个或多个 Amazon Kinesis 数据流中摄取记录。
用法
以下示例管道指定 Kinesis 作为源。该管道从名为 stream1
和 stream2
的多个 Kinesis 数据流中摄取数据,并设置 initial_position
以指示读取流记录的起始点。
version: "2"
kinesis-pipeline:
source:
kinesis:
streams:
- stream_name: "stream1"
initial_position: "LATEST"
- stream_name: "stream2"
initial_position: "LATEST"
aws:
region: "us-west-2"
sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
配置选项
kinesis
源支持以下配置选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
aws | 是 | AWS | 指定 AWS 配置。请参阅 aws 。 |
确认 | 否 | 布尔型 | 当设置为 true 时,启用 kinesis 源在事件被 OpenSearch 接收器接收时接收端到端确认。 |
streams | 是 | 列表 | 配置 kinesis 源用于读取记录的多个 Kinesis 数据流列表。您可以配置最多四个流。请参阅 流。 |
编解码器 | 是 | 编解码器 | 指定要应用的 编解码器。 |
buffer_timeout | 否 | 持续时间 | 设置允许写入事件到 Data Prepper 缓冲区直至超时的时间量。源在指定时间内无法写入缓冲区的任何事件都将被丢弃。默认为 1s 。 |
records_to_accumulate | 否 | 整数 | 确定在写入缓冲区之前累积的消息数量。默认为 100 。 |
consumer_strategy | 否 | 字符串 | 选择用于摄取 Kinesis 数据流的消费者策略。默认为 fan-out ,但也可以使用 polling 。如果启用 polling ,则需要额外的配置。 |
polling | 否 | polling | 请参阅 polling。 |
流
您可以在 streams
数组中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
stream_name | 是 | 字符串 | 定义每个 Kinesis 数据流的名称。 |
initial_position | 否 | 字符串 | 设置 initial_position 以确定 kinesis 源开始读取流记录的时间点。使用 LATEST 从最新记录开始,或使用 EARLIEST 从流的开头开始。默认为 LATEST 。 |
checkpoint_interval | 否 | 持续时间 | 配置 checkpoint_interval 以定期检查 Kinesis 数据流,避免记录重复处理。默认为 PT2M 。 |
compression | 否 | 字符串 | 指定压缩格式。要解压缩由 CloudWatch Logs 订阅过滤器添加到 Kinesis 的记录,请使用 gzip 压缩格式。 |
编解码器
codec
决定了 kinesis
源如何解析每个 Kinesis 流记录。为了提高效率和性能,您可以将 编解码器组合与某些处理器一起使用。
json 编解码器
json
编解码器将每一单行解析为 JSON 数组中的单个 JSON 对象,然后为数组中的每个对象创建一个 Data Prepper 事件。它可用于将嵌套的 CloudWatch 事件解析为单独的日志条目。它还支持以下配置与此编解码器一起使用。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
key_name | 否 | 字符串 | 要从中提取 JSON 数组并创建 Data Prepper 事件的输入字段名称。 |
include_keys(包含的键) | 否 | 列表 | 要提取并作为附加字段添加到 Data Prepper 事件中的输入字段列表。 |
include_keys_metadata | 否 | 列表 | 要提取并添加到 Data Prepper 事件元数据对象中的输入字段列表。 |
max_event_length | 否 | 整数 | JSON 编解码器读取的任何单个事件的最大大小。默认为 20,000,000 字符。 |
newline
编解码器
newline
编解码器将每个 Kinesis 流记录解析为单个日志事件,使其非常适合处理单行记录。它也与 parse_json
处理器很好地配合以解析每一行。
您可以使用以下选项配置 newline
编解码器。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
skip_lines | 否 | 整数 | 设置在创建事件之前要跳过的行数。您可以使用此配置来跳过常见的标题行。默认为 0 。 |
header_destination | 否 | 字符串 | 定义一个键值,分配给流事件的标题行。如果指定此选项,则每个事件将包含一个 header_destination 字段。 |
polling
当 consumer_strategy
设置为 polling
时,kinesis
源使用基于轮询的方法从 Kinesis 数据流读取记录,而不是默认的 fan-out
方法。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
max_polling_records | 否 | 整数 | 设置单次调用从 Kinesis 获取的记录数。 |
idle_time_between_reads | 否 | 持续时间 | 定义两次调用之间的空闲时间量。 |
aws
您可以在 aws
配置中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
region | 否 | 字符串 | 设置用于凭据的 AWS 区域。默认为确定区域的标准 SDK 行为。 |
sts_role_arn | 否 | 字符串 | 定义要为 Amazon Kinesis Data Streams 和 Amazon DynamoDB 请求承担的 AWS 安全令牌服务 (AWS STS) 角色。默认为 null ,此时使用标准 SDK 凭据行为。 |
aws_sts_header_overrides | 否 | 映射 | 定义 AWS 身份和访问管理 (IAM) 角色为接收器插件承担的标头覆盖映射。 |
公开的元数据属性
kinesis
源将以下元数据添加到每个已处理的事件中。您可以使用表达式语法 getMetadata
函数访问元数据属性。
stream_name
:包含获取事件的 Kinesis 数据流的名称。
权限
要将 kinesis
作为源运行,需要以下最低权限:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:DescribeStreamConsumer",
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:ListStreamConsumers",
"kinesis:RegisterStreamConsumer",
"kinesis:SubscribeToShard"
],
"Resource": [
"arn:aws:kinesis:us-east-1:{account-id}:stream/stream1",
"arn:aws:kinesis:us-east-1:{account-id}:stream/stream2"
]
},
{
"Sid": "allowCreateTable",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:UpdateItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:{account-id}:table/kinesis-pipeline"
]
}
]
}
kinesis
源使用 DynamoDB 表进行多个工作器之间的摄取协调,因此您需要 DynamoDB 权限。
指标
kinesis
源包含以下指标。
计数器
recordsProcessed
:统计已处理流记录的数量。recordProcessingErrors
:统计流记录处理错误的数量。acknowledgementSetSuccesses
:统计成功添加到接收器的已处理流记录的数量。acknowledgementSetFailures
:统计未能添加到接收器的已处理流记录的数量。