Kafka
kafka
缓冲区将数据缓冲到 Apache Kafka 主题中。它使用 Kafka 主题来持久化传输中的数据。
以下示例展示了如何在 HTTP 管道中运行 Kafka 缓冲区。它针对本地运行的 Kafka 集群运行。
kafka-buffer-pipeline:
source:
http:
buffer:
kafka:
bootstrap_servers: ["localhost:9092"]
encryption:
type: none
topics:
- name: my-buffer-topic
group_id: data-prepper
create_topic: true
processor:
- grok:
match:
message: [ "%{COMMONAPACHELOG}" ]
sink:
- stdout:
配置选项
将以下配置选项与 kafka
缓冲区一起使用。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
身份验证 | 否 | 身份验证 | 设置管道和 Kafka 的身份验证选项。有关更多信息,请参阅身份验证。 |
AWS | 否 | AWS | AWS 配置。有关更多信息,请参阅aws。 |
bootstrap_servers | 是 | 字符串列表 | 与 Kafka 集群建立初始连接的主机和端口。您可以使用每个代理的 IP 地址或端口号来配置多个 Kafka 代理。当使用Amazon Managed Streaming for Apache Kafka (Amazon MSK) 作为 Kafka 集群时,引导服务器信息将使用配置中提供的 Amazon Resource Name (ARN) 从 Amazon MSK 获取。 |
加密 | 否 | 加密 | 传输中加密的加密配置。有关更多信息,请参阅加密。 |
producer_properties | 否 | 生产者属性 | 可配置的 Kafka 生产者属性列表。 |
topics | 是 | 列表 | 缓冲区要使用的主题列表。每个缓冲区必须提供一个主题。 |
topic
topic
选项配置单个 Kafka 主题,并告知 kafka
缓冲区如何使用该主题。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
名称 | 是 | 字符串 | Kafka 主题的名称。 |
group_id | 是 | 字符串 | 设置 Kafka 的 group.id 选项。 |
workers | 否 | 整数 | 与每个主题关联的多线程消费者数量。默认值为 2 。最大值为 200 。 |
encryption_key | 否 | 字符串 | 一种高级加密标准 (AES) 加密密钥,用于在将数据发送到 Kafka 之前,在 OpenSearch Data Prepper 中加密和解密数据。此值必须是纯文本,或者使用 AWS Key Management Service (AWS KMS) 加密。 |
KMS | 否 | AWS KMS 密钥 | 配置后,使用 AWS KMS 密钥加密数据。有关更多信息,请参阅kms 。 |
auto_commit | 否 | 布尔型 | 当设置为 false 时,消费者偏移量将不会在后台定期提交到 Kafka。默认值为 false 。 |
commit_interval | 否 | 整数 | 当 auto_commit 设置为 true 时,设置消费者偏移量通过 Kafka 的 auto.commit.interval.ms 选项自动提交到 Kafka 的频率(以秒为单位)。默认值为 5s 。 |
session_timeout | 否 | 整数 | 在使用 Kafka 的组管理功能时,源检测客户端故障的持续时间,该功能可用于平衡数据流。默认值为 45s 。 |
auto_offset_reset | 否 | 字符串 | 通过 Kafka 的 auto.offset.reset 选项自动将偏移量重置为最早或最新的偏移量。默认值为 earliest 。 |
thread_waiting_time | 否 | 整数 | 线程等待前一个线程完成任务并通知下一个线程的时间量。Kafka 消费者 API 轮询超时值设置为此设置的一半。默认值为 5s 。 |
max_partition_fetch_bytes | 否 | 整数 | 通过 Kafka 的 max.partition.fetch.bytes 设置,设置每个分区返回数据的最大限制(以兆字节为单位)。默认值为 1mb 。 |
heart_beat_interval | 否 | 整数 | 在使用 Kafka 的组管理功能时,通过 Kafka 的 heartbeat.interval.ms 设置,消费者协调器之间心跳的预期时间量。默认值为 5s 。 |
fetch_max_wait | 否 | 整数 | 当没有足够数据满足 fetch_min_bytes 要求时,服务器阻止获取请求的最长时间,通过 Kafka 的 fetch.max.wait.ms 设置。默认值为 500ms 。 |
fetch_max_bytes | 否 | 整数 | 代理通过 Kafka 的 fetch.max.bytes 设置接受的最大记录大小。默认值为 50mb 。 |
fetch_min_bytes | 否 | 整数 | 服务器在获取请求期间返回的最小数据量,通过 Kafka 的 retry.backoff.ms 设置。默认值为 1b 。 |
retry_backoff | 否 | 整数 | 在尝试重试对给定主题分区的失败请求之前等待的时间量。默认值为 10s 。 |
max_poll_interval | 否 | 整数 | 在使用组管理时,poll() 调用之间的最大延迟,通过 Kafka 的 max.poll.interval.ms 选项。默认值为 300s 。 |
consumer_max_poll_records | 否 | 整数 | 通过 Kafka 的 max.poll.records 设置,单个 poll() 调用中返回的最大记录数。默认值为 500 。 |
max_message_bytes | 否 | 整数 | 消息的最大大小(以字节为单位)。默认值为 1 MB。 |
KMS
使用 AWS KMS 时,AWS KMS 密钥可以解密 encryption_key
,使其不以纯文本形式存储。要将 AWS KMS 与 kafka
缓冲区一起配置,请使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
key_id | 是 | 字符串 | AWS KMS 密钥的 ID。可以是完整的密钥 ARN 或密钥别名。 |
区域 | 否 | 字符串 | AWS KMS 密钥所在的 AWS 区域。 |
sts_role_arn | 否 | 字符串 | 用于访问 AWS KMS 密钥的 AWS Security Token Service (AWS STS) 角色 ARN。 |
encryption_context | 否 | 映射 | 提供后,发送到主题的消息将包含此映射作为 AWS KMS 加密上下文。 |
身份验证
authentication
对象中需要以下选项。
选项 | 类型 | 描述 |
---|---|---|
SASL | JSON 对象 | 简单身份验证和安全层 (SASL) 身份验证配置。 |
SASL
配置 SASL 身份验证时使用以下选项之一。
选项 | 类型 | 描述 |
---|---|---|
plaintext | JSON 对象 | PLAINTEXT 身份验证配置。 |
aws_msk_iam | 字符串 | Amazon MSK AWS Identity and Access Management (IAM) 配置。如果设置为 role ,则使用 aws 配置中设置的 sts_role_arn 。默认值为 default 。 |
SASL PLAINTEXT
使用SASL PLAINTEXT 协议时,需要以下选项。
选项 | 类型 | 描述 |
---|---|---|
用户名 | 字符串 | PLAINTEXT 身份验证的用户名。 |
密码 | 字符串 | PLAINTEXT 身份验证的密码。 |
加密
设置 SSL 加密时使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
类型 | 否 | 字符串 | 加密类型。使用 none 禁用加密。默认值为 ssl 。 |
不安全 | 否 | 布尔型 | 一个布尔标志,用于关闭 SSL 证书验证。如果设置为 true ,则关闭证书颁发机构 (CA) 证书验证,并发送不安全的 HTTP 请求。默认值为 false 。 |
producer_properties
使用以下配置选项配置 Kafka 生产者。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
max_request_size | 否 | 整数 | 生产者发送到 Kafka 的请求的最大大小。默认值为 1 MB。 |
AWS
为 aws
服务设置身份验证时使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
区域 | 否 | 字符串 | 用于凭证的 AWS 区域。默认为确定区域的标准 SDK 行为。 |
sts_role_arn | 否 | 字符串 | 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发出请求时要承担的 AWS STS 角色。默认值为 null ,这将使用凭证的标准 SDK 行为。 |
MSK | 否 | JSON 对象 | Amazon MSK 配置设置。 |
MSK
在 msk
对象中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
ARN | 是 | 字符串 | 要使用的Amazon MSK ARN。 |
broker_connection_type 否 | 字符串 | 与 Amazon MSK 代理一起使用的连接器类型,可以是 public 、single_vpc 或 multi_vpc 。默认值为 single_vpc 。 |