Link Search Menu Expand Document Documentation Menu

kafka

您可以使用 OpenSearch Data Prepper 中的 Apache Kafka 源 (kafka) 从一个或多个 Kafka 主题读取记录。这些记录包含您的 Data Prepper 管道可以摄取(ingest)的事件。kafka 源使用 Kafka 的 Consumer API 从 Kafka 代理消费消息,然后为 Data Prepper 管道创建 Data Prepper 事件以进行进一步处理。

用法

以下示例展示了 Data Prepper 管道中的 kafka 源。

kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9093
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1

配置

将以下配置选项与 kafka 源一起使用。

选项 必需 类型 描述
bootstrap_servers 是的,当不使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 作为集群时。 IP 地址 Kafka 集群初始连接的主机或端口。您可以使用每个代理的 IP 地址或端口号配置多个 Kafka 代理。当使用 Amazon MSK 作为 Kafka 集群时,引导服务器信息将使用配置中提供的 MSK Amazon 资源名称 (ARN) 从 MSK 获取。
topics JSON 数组 Data Prepper kafka 源用于读取消息的 Kafka 主题。您可以配置最多 10 个主题。有关 topics 配置选项的更多信息,请参阅主题
schema JSON 对象 模式注册表配置。有关更多信息,请参阅模式
身份验证 JSON 对象 设置管道和 Kafka 的身份验证选项。有关更多信息,请参阅身份验证
encryption JSON 对象 加密配置。有关更多信息,请参阅加密
aws JSON 对象 AWS 配置。有关更多信息,请参阅aws
确认 布尔型 如果为 true,则当 OpenSearch 输出接收到事件时,启用 kafka 源接收端到端确认。默认值为 false
client_dns_lookup 是的,当使用 DNS 别名时。 字符串 设置 Kafka 的 client.dns.lookup 选项。默认值为 default

主题

topics 数组中使用以下选项。

选项 必需 类型 描述
名称 字符串 每个 Kafka 主题的名称。
group_id 字符串 设置 Kafka 的 group.id 选项。
workers 整数 与每个主题关联的多线程消费者数量。默认值为 2。最大值为 200
serde_format 字符串 指示主题中消息的序列化和反序列化格式。默认值为 plaintext
auto_commit 布尔型 当为 false 时,消费者的偏移量将不会在后台定期提交到 Kafka。默认值为 false
commit_interval 整数 auto_commit 设置为 true 时,设置消费者偏移量通过 Kafka 的 auto.commit.interval.ms 选项自动提交到 Kafka 的频率(以秒为单位)。默认值为 5s
session_timeout 整数 当使用 Kafka 的组管理功能(可用于平衡数据流)时,源检测客户端故障的时间量。默认值为 45s
auto_offset_reset 字符串 通过 Kafka 的 auto.offset.reset 选项自动将偏移量重置为较早或最新的偏移量。默认值为 earliest
thread_waiting_time 整数 线程等待前一个线程完成任务并发出信号给下一个线程的时间量。Kafka 消费者 API 轮询超时值设置为此设置的一半。默认值为 5s
max_partition_fetch_bytes 整数 通过 Kafka 的 max.partition.fetch.bytes 设置,设置每个分区返回最大数据的兆字节限制。默认值为 1mb
heart_beat_interval 整数 当使用 Kafka 的组管理功能时,通过 Kafka 的 heartbeat.interval.ms 设置,消费者协调器之间心跳的预期时间量。默认值为 5s
fetch_max_wait 整数 当没有足够的数据满足 fetch_min_bytes 要求时,服务器阻止获取请求的最长时间(通过 Kafka 的 fetch.max.wait.ms 设置)。默认值为 500ms
fetch_max_bytes 整数 代理通过 Kafka 的 fetch.max.bytes 设置接受的最大记录大小。默认值为 50mb
fetch_min_bytes 整数 服务器在获取请求期间返回的最小数据量(通过 Kafka 的 retry.backoff.ms 设置)。默认值为 1b
retry_backoff 整数 在尝试重试对给定主题分区的失败请求之前等待的时间量。默认值为 10s
max_poll_interval 整数 当通过 Kafka 的 max.poll.interval.ms 选项使用组管理时,poll() 调用之间的最大延迟。默认值为 300s
consumer_max_poll_records 整数 通过 Kafka 的 max.poll.records 设置,单个 poll() 调用中返回的最大记录数。默认值为 500
key_mode 字符串 指示 Kafka 消息的键字段应如何处理。默认设置为 include_as_field,它将键包含在 kafka_key 事件中。include_as_metadata 设置将键包含在事件的元数据中。discard 设置丢弃键。

模式

schema 配置中需要以下选项。

选项 类型 描述
type 字符串 根据您的注册表设置模式类型,可以是 AWS Glue Schema Registry (aws_glue) 或 Confluent Schema Registry (confluent)。当使用 aws_glue 注册表时,请设置任何 AWS 配置选项。

只有在使用 confluent 注册表时才需要以下配置选项。

选项 类型 描述
registry_url 字符串 将记录值从 bytearray 反序列化为字符串。默认值为 org.apache.kafka.common.serialization.StringDeserializer
version 字符串 将记录键从 bytearray 反序列化为字符串。默认值为 org.apache.kafka.common.serialization.StringDeserializer
schema_registry_api_key 字符串 模式注册表 API 密钥。
schema_registry_api_secret 字符串 模式注册表 API 密匙。

身份验证

authentication 对象中需要以下选项。

选项 类型 描述
sasl JSON 对象 简单身份验证和安全层 (SASL) 身份验证配置。

SASL

配置 SASL 身份验证时,请使用以下选项之一。

选项 类型 描述
plaintext JSON 对象 PLAINTEXT 身份验证配置。
aws_msk_iam 字符串 Amazon MSK AWS 身份和访问管理 (IAM) 配置。如果设置为 role,则使用 aws 配置中设置的 sts_role_arm。默认值为 default

SASL PLAINTEXT

使用 SASL PLAINTEXT 协议时,需要以下选项。

选项 类型 描述
username 字符串 PLAINTEXT 身份验证的用户名。
password 字符串 PLAINTEXT 身份验证的密码。

加密

设置 SSL 加密时,请使用以下选项。

选项 必需 类型 描述
type 字符串 加密类型。使用 none 禁用加密。默认值为 ssl
insecure 布尔型 一个布尔标志,用于关闭 SSL 证书验证。如果设置为 true,则关闭证书颁发机构 (CA) 证书验证,并发送不安全的 HTTP 请求。默认值为 false

AWS

aws 服务设置身份验证时使用以下选项。

选项 必需 类型 描述
region 字符串 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域
sts_role_arn 字符串 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发送请求时所承担的 AWS 安全令牌服务 (AWS STS) 角色。默认值为 null,这将使用凭证的标准 SDK 行为
msk JSON 对象 MSK 配置设置。

MSK

msk 对象中使用以下选项。

选项 必需 类型 描述
arn 字符串 要使用的MSK ARN
broker_connection_type 字符串 与 MSK 代理一起使用的连接器类型,可以是 publicsingle_vpcmultip_vpc。默认值为 single_vpc  
剩余 350 字符

有问题?

想要贡献?