kafka
您可以使用 OpenSearch Data Prepper 中的 Apache Kafka 源 (kafka
) 从一个或多个 Kafka 主题读取记录。这些记录包含您的 Data Prepper 管道可以摄取(ingest)的事件。kafka
源使用 Kafka 的 Consumer API 从 Kafka 代理消费消息,然后为 Data Prepper 管道创建 Data Prepper 事件以进行进一步处理。
用法
以下示例展示了 Data Prepper 管道中的 kafka
源。
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- 127.0.0.1:9093
topics:
- name: Topic1
group_id: groupID1
- name: Topic2
group_id: groupID1
配置
将以下配置选项与 kafka
源一起使用。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
bootstrap_servers | 是的,当不使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 作为集群时。 | IP 地址 | Kafka 集群初始连接的主机或端口。您可以使用每个代理的 IP 地址或端口号配置多个 Kafka 代理。当使用 Amazon MSK 作为 Kafka 集群时,引导服务器信息将使用配置中提供的 MSK Amazon 资源名称 (ARN) 从 MSK 获取。 |
topics | 是 | JSON 数组 | Data Prepper kafka 源用于读取消息的 Kafka 主题。您可以配置最多 10 个主题。有关 topics 配置选项的更多信息,请参阅主题。 |
schema | 否 | JSON 对象 | 模式注册表配置。有关更多信息,请参阅模式。 |
身份验证 | 否 | JSON 对象 | 设置管道和 Kafka 的身份验证选项。有关更多信息,请参阅身份验证。 |
encryption | 否 | JSON 对象 | 加密配置。有关更多信息,请参阅加密。 |
aws | 否 | JSON 对象 | AWS 配置。有关更多信息,请参阅aws。 |
确认 | 否 | 布尔型 | 如果为 true ,则当 OpenSearch 输出接收到事件时,启用 kafka 源接收端到端确认。默认值为 false 。 |
client_dns_lookup | 是的,当使用 DNS 别名时。 | 字符串 | 设置 Kafka 的 client.dns.lookup 选项。默认值为 default 。 |
主题
在 topics
数组中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
名称 | 是 | 字符串 | 每个 Kafka 主题的名称。 |
group_id | 是 | 字符串 | 设置 Kafka 的 group.id 选项。 |
workers | 否 | 整数 | 与每个主题关联的多线程消费者数量。默认值为 2 。最大值为 200 。 |
serde_format | 否 | 字符串 | 指示主题中消息的序列化和反序列化格式。默认值为 plaintext 。 |
auto_commit | 否 | 布尔型 | 当为 false 时,消费者的偏移量将不会在后台定期提交到 Kafka。默认值为 false 。 |
commit_interval | 否 | 整数 | 当 auto_commit 设置为 true 时,设置消费者偏移量通过 Kafka 的 auto.commit.interval.ms 选项自动提交到 Kafka 的频率(以秒为单位)。默认值为 5s 。 |
session_timeout | 否 | 整数 | 当使用 Kafka 的组管理功能(可用于平衡数据流)时,源检测客户端故障的时间量。默认值为 45s 。 |
auto_offset_reset | 否 | 字符串 | 通过 Kafka 的 auto.offset.reset 选项自动将偏移量重置为较早或最新的偏移量。默认值为 earliest 。 |
thread_waiting_time | 否 | 整数 | 线程等待前一个线程完成任务并发出信号给下一个线程的时间量。Kafka 消费者 API 轮询超时值设置为此设置的一半。默认值为 5s 。 |
max_partition_fetch_bytes | 否 | 整数 | 通过 Kafka 的 max.partition.fetch.bytes 设置,设置每个分区返回最大数据的兆字节限制。默认值为 1mb 。 |
heart_beat_interval | 否 | 整数 | 当使用 Kafka 的组管理功能时,通过 Kafka 的 heartbeat.interval.ms 设置,消费者协调器之间心跳的预期时间量。默认值为 5s 。 |
fetch_max_wait | 否 | 整数 | 当没有足够的数据满足 fetch_min_bytes 要求时,服务器阻止获取请求的最长时间(通过 Kafka 的 fetch.max.wait.ms 设置)。默认值为 500ms 。 |
fetch_max_bytes | 否 | 整数 | 代理通过 Kafka 的 fetch.max.bytes 设置接受的最大记录大小。默认值为 50mb 。 |
fetch_min_bytes | 否 | 整数 | 服务器在获取请求期间返回的最小数据量(通过 Kafka 的 retry.backoff.ms 设置)。默认值为 1b 。 |
retry_backoff | 否 | 整数 | 在尝试重试对给定主题分区的失败请求之前等待的时间量。默认值为 10s 。 |
max_poll_interval | 否 | 整数 | 当通过 Kafka 的 max.poll.interval.ms 选项使用组管理时,poll() 调用之间的最大延迟。默认值为 300s 。 |
consumer_max_poll_records | 否 | 整数 | 通过 Kafka 的 max.poll.records 设置,单个 poll() 调用中返回的最大记录数。默认值为 500 。 |
key_mode | 否 | 字符串 | 指示 Kafka 消息的键字段应如何处理。默认设置为 include_as_field ,它将键包含在 kafka_key 事件中。include_as_metadata 设置将键包含在事件的元数据中。discard 设置丢弃键。 |
模式
schema
配置中需要以下选项。
选项 | 类型 | 描述 |
---|---|---|
type | 字符串 | 根据您的注册表设置模式类型,可以是 AWS Glue Schema Registry (aws_glue ) 或 Confluent Schema Registry (confluent )。当使用 aws_glue 注册表时,请设置任何 AWS 配置选项。 |
只有在使用 confluent
注册表时才需要以下配置选项。
选项 | 类型 | 描述 |
---|---|---|
registry_url | 字符串 | 将记录值从 bytearray 反序列化为字符串。默认值为 org.apache.kafka.common.serialization.StringDeserializer 。 |
version | 字符串 | 将记录键从 bytearray 反序列化为字符串。默认值为 org.apache.kafka.common.serialization.StringDeserializer 。 |
schema_registry_api_key | 字符串 | 模式注册表 API 密钥。 |
schema_registry_api_secret | 字符串 | 模式注册表 API 密匙。 |
身份验证
authentication
对象中需要以下选项。
选项 | 类型 | 描述 |
---|---|---|
sasl | JSON 对象 | 简单身份验证和安全层 (SASL) 身份验证配置。 |
SASL
配置 SASL 身份验证时,请使用以下选项之一。
选项 | 类型 | 描述 |
---|---|---|
plaintext | JSON 对象 | PLAINTEXT 身份验证配置。 |
aws_msk_iam | 字符串 | Amazon MSK AWS 身份和访问管理 (IAM) 配置。如果设置为 role ,则使用 aws 配置中设置的 sts_role_arm 。默认值为 default 。 |
SASL PLAINTEXT
使用 SASL PLAINTEXT 协议时,需要以下选项。
选项 | 类型 | 描述 |
---|---|---|
username | 字符串 | PLAINTEXT 身份验证的用户名。 |
password | 字符串 | PLAINTEXT 身份验证的密码。 |
加密
设置 SSL 加密时,请使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
type | 否 | 字符串 | 加密类型。使用 none 禁用加密。默认值为 ssl 。 |
insecure | 否 | 布尔型 | 一个布尔标志,用于关闭 SSL 证书验证。如果设置为 true ,则关闭证书颁发机构 (CA) 证书验证,并发送不安全的 HTTP 请求。默认值为 false 。 |
AWS
为 aws
服务设置身份验证时使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
region | 否 | 字符串 | 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域。 |
sts_role_arn | 否 | 字符串 | 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发送请求时所承担的 AWS 安全令牌服务 (AWS STS) 角色。默认值为 null ,这将使用凭证的标准 SDK 行为。 |
msk | 否 | JSON 对象 | MSK 配置设置。 |
MSK
在 msk
对象中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
arn | 是 | 字符串 | 要使用的MSK ARN。 |
broker_connection_type 否 | 字符串 | 与 MSK 代理一起使用的连接器类型,可以是 public 、single_vpc 或 multip_vpc 。默认值为 single_vpc 。 |