拉取式摄入
3.0 版本引入
这是一个实验性功能,不建议在生产环境中使用。有关该功能进展的更新或如果您想提供反馈,请在 OpenSearch 论坛上加入讨论。
拉取式摄取使 OpenSearch 能够从 Apache Kafka 或 Amazon Kinesis 等流式源摄取数据。与客户端通过 REST API 主动将数据推送到 OpenSearch 的传统摄取方法不同,拉取式摄取允许 OpenSearch 通过直接从流式源检索数据来控制数据流。这种方法提供了精确一次的摄取语义和原生反压处理,有助于防止在流量高峰期间服务器过载。
先决条件
在使用拉取式摄取之前,请确保满足以下先决条件:
- 使用命令
bin/opensearch-plugin install <plugin-name>
为您的流式源安装摄取插件。有关更多信息,请参阅附加插件。OpenSearch 支持以下摄取插件:ingestion-kafka
ingestion-kinesis
- 启用段复制和远程支持存储。拉取式摄取与文档复制不兼容。
- 在索引创建期间配置拉取式摄取。您无法将现有的推送式索引转换为拉取式索引。
为拉取式摄取创建索引
要从流式源摄取数据,首先创建一个具有拉取式摄取设置的索引。以下请求创建一个从 Kafka topic 拉取数据的索引:
PUT /my-index
{
"settings": {
"ingestion_source": {
"type": "kafka",
"pointer.init.reset": "earliest",
"param": {
"topic": "test",
"bootstrap_servers": "localhost:49353"
}
},
"index.number_of_shards": 1,
"index.number_of_replicas": 1,
"index": {
"replication.type": "SEGMENT"
}
},
"mappings": {
"properties": {
"name": {
"type": "text"
},
"age": {
"type": "integer"
}
}
}
}
摄取源参数
ingestion_source
参数控制 OpenSearch 如何从流式源拉取数据。*轮询*是 OpenSearch 主动从流式源请求一批数据的操作。下表列出了 ingestion_source
支持的所有参数。
参数 | 描述 |
---|---|
type | 流式源类型。必需。有效值为 kafka 或 kinesis 。 |
pointer.init.reset | 确定从流的哪个位置开始读取。可选。有效值为 earliest 、latest 、reset_by_offset 、reset_by_timestamp 或 none 。请参阅流位置。 |
pointer.init.reset.value | 仅在 reset_by_offset 或 reset_by_timestamp 时必需。指定偏移值或以毫秒为单位的时间戳。请参阅流位置。 |
error_strategy | 如何处理失败的消息。可选。有效值为 DROP (跳过失败的消息并继续摄取)和 BLOCK (当消息失败时,摄取停止)。默认值为 DROP 。我们建议在当前的实验性版本中使用 DROP 。 |
poll.max_batch_size | 每次轮询操作中检索的最大记录数。可选。 |
poll.timeout | 每次轮询操作中等待数据的最长时间。可选。 |
num_processor_threads | 处理摄取数据的线程数。可选。默认值为 1。 |
internal_queue_size | 用于高级调优的内部阻塞队列的大小。有效值为 1 到 100,000(包括)。可选。默认值为 100。 |
param | 源特定的配置参数。必需。 • ingest-kafka 插件需要- topic :要消费的 Kafka topic- bootstrap_servers :Kafka 服务器地址可选地,您可以提供额外的标准 Kafka 消费者参数(例如 fetch.min.bytes )。这些参数直接传递给 Kafka 消费者。• ingest-kinesis 插件需要- stream :Kinesis 流名称- region :AWS 区域- access_key :AWS 访问密钥- secret_key :AWS 秘密密钥可选地,您可以提供 endpoint_override 。 |
流位置
创建索引时,您可以通过在 ingestion_source
参数中配置 pointer.init.reset
和 pointer.init.reset.value
设置来指定 OpenSearch 应从流的何处开始读取。对于现有索引,OpenSearch 将从最后提交的位置恢复读取。
下表提供了有效的 pointer.init.reset
值及其对应的 pointer.init.reset.value
值。
pointer.init.reset | 开始摄取点 | pointer.init.reset.value |
---|---|---|
earliest | 流的开始 | 无 |
latest | 流的当前末尾 | 无 |
reset_by_offset | 流中的特定偏移量 | 一个正整数偏移量。必需。 |
reset_by_timestamp | 特定时间点 | 以毫秒为单位的 Unix 时间戳。必需。 对于 Kafka 流,如果在给定时间戳未找到消息,则默认为 Kafka 的 auto.offset.reset 策略。 |
none | 现有索引的最后提交位置 | 无 |
流分区
使用分区流(例如 Kafka topic 或 Kinesis 分片)时,请注意流分区和 OpenSearch 分片之间的以下关系:
- OpenSearch 分片与流分区一一对应。
- 索引分片的数量必须大于或等于流分区的数量。
- 超出分区数量的额外分片将保持为空。
- 文档必须发送到同一分区才能成功更新。
使用拉取式摄取时,传统基于 REST API 的摄取将对该索引禁用。
更新错误策略
您可以使用更新设置 API 通过将 index.ingestion_source.error_strategy
设置为 DROP
或 BLOCK
来动态更新错误策略。
以下示例演示了如何更新错误策略:
PUT /my-index/_settings
{
"index.ingestion_source.error_strategy": "DROP"
}
消息格式
为了由 OpenSearch 正确处理,流式源中的消息必须具有以下格式:
{"_id":"1", "_version":"1", "_source":{"name": "alice", "age": 30}, "_op_type": "index"}
{"_id":"2", "_version":"2", "_source":{"name": "alice", "age": 30}, "_op_type": "delete"}
流式源中的每个数据单元(Kafka 消息或 Kinesis 记录)必须包含以下字段,这些字段指定如何创建或修改 OpenSearch 文档。
字段 | 数据类型 | 必需 | 描述 |
---|---|---|---|
_id | 字符串 | 否 | 文档的唯一标识符。如果未提供,OpenSearch 会自动生成一个 ID。文档更新或删除时必需。 |
_version | 长整型 | 否 | 文档版本号,必须在外部维护。如果提供,OpenSearch 会丢弃版本早于当前文档版本的消息。如果未提供,则不进行版本检查。 |
_op_type | 字符串 | 否 | 要执行的操作。有效值包括: - index :创建新文档或更新现有文档。- create :以追加模式创建新文档。请注意,这不会更新现有文档。- delete :软删除文档。 |
_source | 对象 | 是 | 包含文档数据的消息负载。 |
拉取式摄取指标
拉取式摄取提供可用于监控摄取过程的指标。目前支持 polling_ingest_stats
指标,并在分片级别可用。
下表列出了可用的 polling_ingest_stats
指标。
度量 | 描述 |
---|---|
message_processor_stats.total_processed_count | 消息处理器处理的消息总数。 |
message_processor_stats.total_invalid_message_count | 遇到的无效消息数量。 |
message_processor_stats.total_version_conflicts_count | 由于版本冲突而导致旧版本消息将被丢弃的数量。 |
message_processor_stats.total_failed_count | 处理过程中出错的失败消息总数。 |
message_processor_stats.total_failures_dropped_count | 在重试耗尽后被丢弃的失败消息总数。请注意,只有在使用 DROP 错误策略时才会丢弃消息。 |
message_processor_stats.total_processor_thread_interrupt_count | 指示处理器线程上的线程中断次数。 |
consumer_stats.total_polled_count | 从流消费者轮询的消息总数。 |
consumer_stats.total_consumer_error_count | 致命消费者读取错误的总数。 |
consumer_stats.total_poller_message_failure_count | 轮询器上失败消息的总数。 |
consumer_stats.total_poller_message_dropped_count | 轮询器上被丢弃的失败消息总数。 |
consumer_stats.total_duplicate_message_skipped_count | 之前已处理而被跳过的消息总数。 |
consumer_stats.lag_in_millis | 毫秒级延迟,计算方式为自上次处理的消息时间戳以来经过的时间。 |
要检索分片级别的拉取式摄取指标,请使用节点统计 API
GET /_nodes/stats/indices?level=shards&pretty
限制
使用拉取式摄取时,存在以下限制: