Link Search Menu Expand Document Documentation Menu

拉取式摄入

3.0 版本引入

这是一个实验性功能,不建议在生产环境中使用。有关该功能进展的更新或如果您想提供反馈,请在 OpenSearch 论坛上加入讨论。

拉取式摄取使 OpenSearch 能够从 Apache Kafka 或 Amazon Kinesis 等流式源摄取数据。与客户端通过 REST API 主动将数据推送到 OpenSearch 的传统摄取方法不同,拉取式摄取允许 OpenSearch 通过直接从流式源检索数据来控制数据流。这种方法提供了精确一次的摄取语义和原生反压处理,有助于防止在流量高峰期间服务器过载。

先决条件

在使用拉取式摄取之前,请确保满足以下先决条件:

  • 使用命令 bin/opensearch-plugin install <plugin-name> 为您的流式源安装摄取插件。有关更多信息,请参阅附加插件。OpenSearch 支持以下摄取插件:
    • ingestion-kafka
    • ingestion-kinesis
  • 启用段复制远程支持存储。拉取式摄取与文档复制不兼容。
  • 索引创建期间配置拉取式摄取。您无法将现有的推送式索引转换为拉取式索引。

为拉取式摄取创建索引

要从流式源摄取数据,首先创建一个具有拉取式摄取设置的索引。以下请求创建一个从 Kafka topic 拉取数据的索引:

PUT /my-index
{
  "settings": {
    "ingestion_source": {
      "type": "kafka",
      "pointer.init.reset": "earliest",
      "param": {
        "topic": "test",
        "bootstrap_servers": "localhost:49353"
      }
    },
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1,
    "index": {
      "replication.type": "SEGMENT"
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      }
    }
  }
}

摄取源参数

ingestion_source 参数控制 OpenSearch 如何从流式源拉取数据。*轮询*是 OpenSearch 主动从流式源请求一批数据的操作。下表列出了 ingestion_source 支持的所有参数。

参数 描述
type 流式源类型。必需。有效值为 kafkakinesis
pointer.init.reset 确定从流的哪个位置开始读取。可选。有效值为 earliestlatestreset_by_offsetreset_by_timestampnone。请参阅流位置
pointer.init.reset.value 仅在 reset_by_offsetreset_by_timestamp 时必需。指定偏移值或以毫秒为单位的时间戳。请参阅流位置
error_strategy 如何处理失败的消息。可选。有效值为 DROP(跳过失败的消息并继续摄取)和 BLOCK(当消息失败时,摄取停止)。默认值为 DROP。我们建议在当前的实验性版本中使用 DROP
poll.max_batch_size 每次轮询操作中检索的最大记录数。可选。
poll.timeout 每次轮询操作中等待数据的最长时间。可选。
num_processor_threads 处理摄取数据的线程数。可选。默认值为 1。
internal_queue_size 用于高级调优的内部阻塞队列的大小。有效值为 1 到 100,000(包括)。可选。默认值为 100。
param 源特定的配置参数。必需。
 • ingest-kafka 插件需要
  - topic:要消费的 Kafka topic
  - bootstrap_servers:Kafka 服务器地址
  可选地,您可以提供额外的标准 Kafka 消费者参数(例如 fetch.min.bytes)。这些参数直接传递给 Kafka 消费者。
 • ingest-kinesis 插件需要
  - stream:Kinesis 流名称
  - region:AWS 区域
  - access_key:AWS 访问密钥
  - secret_key:AWS 秘密密钥
  可选地,您可以提供 endpoint_override

流位置

创建索引时,您可以通过在 ingestion_source 参数中配置 pointer.init.resetpointer.init.reset.value 设置来指定 OpenSearch 应从流的何处开始读取。对于现有索引,OpenSearch 将从最后提交的位置恢复读取。

下表提供了有效的 pointer.init.reset 值及其对应的 pointer.init.reset.value 值。

pointer.init.reset 开始摄取点 pointer.init.reset.value
earliest 流的开始
latest 流的当前末尾
reset_by_offset 流中的特定偏移量 一个正整数偏移量。必需。
reset_by_timestamp 特定时间点 以毫秒为单位的 Unix 时间戳。必需。
对于 Kafka 流,如果在给定时间戳未找到消息,则默认为 Kafka 的 auto.offset.reset 策略。
none 现有索引的最后提交位置

流分区

使用分区流(例如 Kafka topic 或 Kinesis 分片)时,请注意流分区和 OpenSearch 分片之间的以下关系:

  • OpenSearch 分片与流分区一一对应。
  • 索引分片的数量必须大于或等于流分区的数量。
  • 超出分区数量的额外分片将保持为空。
  • 文档必须发送到同一分区才能成功更新。

使用拉取式摄取时,传统基于 REST API 的摄取将对该索引禁用。

更新错误策略

您可以使用更新设置 API 通过将 index.ingestion_source.error_strategy 设置为 DROPBLOCK 来动态更新错误策略。

以下示例演示了如何更新错误策略:

PUT /my-index/_settings
{
  "index.ingestion_source.error_strategy": "DROP"
}

消息格式

为了由 OpenSearch 正确处理,流式源中的消息必须具有以下格式:

{"_id":"1", "_version":"1", "_source":{"name": "alice", "age": 30}, "_op_type": "index"}
{"_id":"2", "_version":"2", "_source":{"name": "alice", "age": 30}, "_op_type": "delete"}

流式源中的每个数据单元(Kafka 消息或 Kinesis 记录)必须包含以下字段,这些字段指定如何创建或修改 OpenSearch 文档。

字段 数据类型 必需 描述
_id 字符串 文档的唯一标识符。如果未提供,OpenSearch 会自动生成一个 ID。文档更新或删除时必需。
_version 长整型 文档版本号,必须在外部维护。如果提供,OpenSearch 会丢弃版本早于当前文档版本的消息。如果未提供,则不进行版本检查。
_op_type 字符串 要执行的操作。有效值包括:
- index:创建新文档或更新现有文档。
- create:以追加模式创建新文档。请注意,这不会更新现有文档。
- delete:软删除文档。
_source 对象 包含文档数据的消息负载。

拉取式摄取指标

拉取式摄取提供可用于监控摄取过程的指标。目前支持 polling_ingest_stats 指标,并在分片级别可用。

下表列出了可用的 polling_ingest_stats 指标。

度量 描述
message_processor_stats.total_processed_count 消息处理器处理的消息总数。
message_processor_stats.total_invalid_message_count 遇到的无效消息数量。
message_processor_stats.total_version_conflicts_count 由于版本冲突而导致旧版本消息将被丢弃的数量。
message_processor_stats.total_failed_count 处理过程中出错的失败消息总数。
message_processor_stats.total_failures_dropped_count 在重试耗尽后被丢弃的失败消息总数。请注意,只有在使用 DROP 错误策略时才会丢弃消息。
message_processor_stats.total_processor_thread_interrupt_count 指示处理器线程上的线程中断次数。
consumer_stats.total_polled_count 从流消费者轮询的消息总数。
consumer_stats.total_consumer_error_count 致命消费者读取错误的总数。
consumer_stats.total_poller_message_failure_count 轮询器上失败消息的总数。
consumer_stats.total_poller_message_dropped_count 轮询器上被丢弃的失败消息总数。
consumer_stats.total_duplicate_message_skipped_count 之前已处理而被跳过的消息总数。
consumer_stats.lag_in_millis 毫秒级延迟,计算方式为自上次处理的消息时间戳以来经过的时间。

要检索分片级别的拉取式摄取指标,请使用节点统计 API

GET /_nodes/stats/indices?level=shards&pretty

限制

使用拉取式摄取时,存在以下限制:


相关文章

剩余 350 字符

有问题?

想要贡献?