Link Search Menu Expand Document Documentation Menu

Kafka

kafka 缓冲区将数据缓冲到 Apache Kafka 主题中。它使用 Kafka 主题来持久化传输中的数据。

以下示例展示了如何在 HTTP 管道中运行 Kafka 缓冲区。它针对本地运行的 Kafka 集群运行。

kafka-buffer-pipeline:
  source:
    http:
  buffer:
    kafka:
      bootstrap_servers: ["localhost:9092"]
      encryption:
        type: none
      topics:
        - name: my-buffer-topic
          group_id: data-prepper
          create_topic: true
  processor:
    - grok:
        match:
          message: [ "%{COMMONAPACHELOG}" ]
  sink:
    - stdout:

配置选项

将以下配置选项与 kafka 缓冲区一起使用。

选项 必需 类型 描述
身份验证 身份验证 设置管道和 Kafka 的身份验证选项。有关更多信息,请参阅身份验证
AWS AWS AWS 配置。有关更多信息,请参阅aws
bootstrap_servers 字符串列表 与 Kafka 集群建立初始连接的主机和端口。您可以使用每个代理的 IP 地址或端口号来配置多个 Kafka 代理。当使用Amazon Managed Streaming for Apache Kafka (Amazon MSK) 作为 Kafka 集群时,引导服务器信息将使用配置中提供的 Amazon Resource Name (ARN) 从 Amazon MSK 获取。
加密 加密 传输中加密的加密配置。有关更多信息,请参阅加密
producer_properties 生产者属性 可配置的 Kafka 生产者属性列表。
topics 列表 缓冲区要使用的主题列表。每个缓冲区必须提供一个主题。

topic

topic 选项配置单个 Kafka 主题,并告知 kafka 缓冲区如何使用该主题。

选项 必需 类型 描述
名称 字符串 Kafka 主题的名称。
group_id 字符串 设置 Kafka 的 group.id 选项。
workers 整数 与每个主题关联的多线程消费者数量。默认值为 2。最大值为 200
encryption_key 字符串 一种高级加密标准 (AES) 加密密钥,用于在将数据发送到 Kafka 之前,在 OpenSearch Data Prepper 中加密和解密数据。此值必须是纯文本,或者使用 AWS Key Management Service (AWS KMS) 加密。
KMS AWS KMS 密钥 配置后,使用 AWS KMS 密钥加密数据。有关更多信息,请参阅kms
auto_commit 布尔型 当设置为 false 时,消费者偏移量将不会在后台定期提交到 Kafka。默认值为 false
commit_interval 整数 auto_commit 设置为 true 时,设置消费者偏移量通过 Kafka 的 auto.commit.interval.ms 选项自动提交到 Kafka 的频率(以秒为单位)。默认值为 5s
session_timeout 整数 在使用 Kafka 的组管理功能时,源检测客户端故障的持续时间,该功能可用于平衡数据流。默认值为 45s
auto_offset_reset 字符串 通过 Kafka 的 auto.offset.reset 选项自动将偏移量重置为最早或最新的偏移量。默认值为 earliest
thread_waiting_time 整数 线程等待前一个线程完成任务并通知下一个线程的时间量。Kafka 消费者 API 轮询超时值设置为此设置的一半。默认值为 5s
max_partition_fetch_bytes 整数 通过 Kafka 的 max.partition.fetch.bytes 设置,设置每个分区返回数据的最大限制(以兆字节为单位)。默认值为 1mb
heart_beat_interval 整数 在使用 Kafka 的组管理功能时,通过 Kafka 的 heartbeat.interval.ms 设置,消费者协调器之间心跳的预期时间量。默认值为 5s
fetch_max_wait 整数 当没有足够数据满足 fetch_min_bytes 要求时,服务器阻止获取请求的最长时间,通过 Kafka 的 fetch.max.wait.ms 设置。默认值为 500ms
fetch_max_bytes 整数 代理通过 Kafka 的 fetch.max.bytes 设置接受的最大记录大小。默认值为 50mb
fetch_min_bytes 整数 服务器在获取请求期间返回的最小数据量,通过 Kafka 的 retry.backoff.ms 设置。默认值为 1b
retry_backoff 整数 在尝试重试对给定主题分区的失败请求之前等待的时间量。默认值为 10s
max_poll_interval 整数 在使用组管理时,poll() 调用之间的最大延迟,通过 Kafka 的 max.poll.interval.ms 选项。默认值为 300s
consumer_max_poll_records 整数 通过 Kafka 的 max.poll.records 设置,单个 poll() 调用中返回的最大记录数。默认值为 500
max_message_bytes 整数 消息的最大大小(以字节为单位)。默认值为 1 MB。

KMS

使用 AWS KMS 时,AWS KMS 密钥可以解密 encryption_key,使其不以纯文本形式存储。要将 AWS KMS 与 kafka 缓冲区一起配置,请使用以下选项。

选项 必需 类型 描述
key_id 字符串 AWS KMS 密钥的 ID。可以是完整的密钥 ARN 或密钥别名。
区域 字符串 AWS KMS 密钥所在的 AWS 区域。
sts_role_arn 字符串 用于访问 AWS KMS 密钥的 AWS Security Token Service (AWS STS) 角色 ARN。
encryption_context 映射 提供后,发送到主题的消息将包含此映射作为 AWS KMS 加密上下文。

身份验证

authentication 对象中需要以下选项。

选项 类型 描述
SASL JSON 对象 简单身份验证和安全层 (SASL) 身份验证配置。

SASL

配置 SASL 身份验证时使用以下选项之一。

选项 类型 描述
plaintext JSON 对象 PLAINTEXT 身份验证配置。
aws_msk_iam 字符串 Amazon MSK AWS Identity and Access Management (IAM) 配置。如果设置为 role,则使用 aws 配置中设置的 sts_role_arn。默认值为 default

SASL PLAINTEXT

使用SASL PLAINTEXT 协议时,需要以下选项。

选项 类型 描述
用户名 字符串 PLAINTEXT 身份验证的用户名。
密码 字符串 PLAINTEXT 身份验证的密码。

加密

设置 SSL 加密时使用以下选项。

选项 必需 类型 描述
类型 字符串 加密类型。使用 none 禁用加密。默认值为 ssl
不安全 布尔型 一个布尔标志,用于关闭 SSL 证书验证。如果设置为 true,则关闭证书颁发机构 (CA) 证书验证,并发送不安全的 HTTP 请求。默认值为 false

producer_properties

使用以下配置选项配置 Kafka 生产者。

选项 必需 类型 描述
max_request_size 整数 生产者发送到 Kafka 的请求的最大大小。默认值为 1 MB。

AWS

aws 服务设置身份验证时使用以下选项。

选项 必需 类型 描述
区域 字符串 用于凭证的 AWS 区域。默认为确定区域的标准 SDK 行为
sts_role_arn 字符串 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发出请求时要承担的 AWS STS 角色。默认值为 null,这将使用凭证的标准 SDK 行为
MSK JSON 对象 Amazon MSK 配置设置。

MSK

msk 对象中使用以下选项。

选项 必需 类型 描述
ARN 字符串 要使用的Amazon MSK ARN
broker_connection_type 字符串 与 Amazon MSK 代理一起使用的连接器类型,可以是 publicsingle_vpcmulti_vpc。默认值为 single_vpc  
剩余 350 字符

有问题?

想贡献?