Link Search Menu Expand Document Documentation Menu

流式批量

2.17.0 版本引入

这是一项实验性功能,不建议在生产环境中使用。有关该功能进展的更新或如果您想留下反馈,请参阅相关的 GitHub 问题

流式批量操作允许您通过流式传输请求来添加、更新或删除多个文档,并以流式响应获取结果。与传统的 批量 API 相比,流式摄取消除了估算批处理大小(受集群在任何给定时间的运行状态影响)的需要,并在多个客户端和集群之间自然地施加反压。流式传输通过 HTTP/2 或 HTTP/1.1(使用分块传输编码)进行,具体取决于客户端和集群的功能。

默认的 HTTP 传输方法不支持流式传输。您必须安装 transport-reactor-netty4 HTTP 传输插件并将其用作默认的 HTTP 传输层。transport-reactor-netty4 插件和流式批量 API 都是实验性的。

端点

POST _bulk/stream
POST <index>/_bulk/stream

如果您在路径中指定了索引,则无需将其包含在 请求体分块 中。

OpenSearch 也接受对 _bulk/stream 路径的 PUT 请求,但我们强烈建议使用 POST。PUT 的常见用法是向给定路径添加或替换单个资源,这对于流式批量请求来说没有意义。

查询参数

下表列出了可用的查询参数。所有查询参数都是可选的。

参数 数据类型 描述
pipeline 字符串 用于预处理文档的 pipeline ID。
refresh 枚举 是否在执行索引操作后刷新受影响的分片。默认为 falsetrue 会使更改立即显示在搜索结果中,但会降低集群性能。wait_for 会等待刷新。请求返回时间会更长,但集群性能不会降低。
require_alias 布尔型 设置为 true 要求所有操作都针对索引别名而不是索引。默认为 false
路由 字符串 将请求路由到指定的分片。
timeout 时间 等待请求返回的时间。默认为 1m
type 字符串 (已弃用)未指定类型的文档的默认文档类型。默认为 _doc。我们强烈建议忽略此参数,并对所有索引使用 _doc 类型。
wait_for_active_shards 字符串 指定在 OpenSearch 处理批量请求之前必须可用的活动分片数量。默认为 1(仅主分片)。可设置为 all 或正整数。大于 1 的值需要副本。例如,如果您指定值为 3,则索引必须有 2 个副本分布在另外 2 个节点上,请求才能成功。
batch_interval 时间 指定批量操作在发送到数据节点之前应累积成批的时间。
batch_size 时间 指定批量操作在发送到数据节点之前应累积成批的数量。默认为 1

请求正文字段

流式批量 API 请求体与 批量 API 请求体 完全兼容,其中每个批量操作(创建/索引/更新/删除)都作为单独的分块发送。

请求示例

curl -X POST "https://:9200/_bulk/stream" -H "Transfer-Encoding: chunked" -H "Content-Type: application/json" -d'
{ "delete": { "_index": "movies", "_id": "tt2229499" } }
{ "index": { "_index": "movies", "_id": "tt1979320" } }
{ "title": "Rush", "year": 2013 }
{ "create": { "_index": "movies", "_id": "tt1392214" } }
{ "title": "Prisoners", "year": 2013 }
{ "update": { "_index": "movies", "_id": "tt0816711" } }
{ "doc" : { "title": "World War Z" } }
'

示例响应

根据批处理设置,每个流式响应分块可能会报告一个或多个(批量)批量操作的结果。例如,对于没有批处理(默认)的前述请求,流式响应可能如下所示

{"took": 11, "errors": false, "items": [ { "index": {"_index": "movies", "_id": "tt1979320", "_version": 1, "result": "created", "_shards": { "total": 2 "successful": 1, "failed": 0 }, "_seq_no": 1, "_primary_term": 1, "status": 201 } } ] }
{"took": 2, "errors": true, "items": [ { "create": { "_index": "movies", "_id": "tt1392214", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[tt1392214]: version conflict, document already exists (current version [1])", "index": "movies", "shard": "0", "index_uuid": "yhizhusbSWmP0G7OJnmcLg" } } } ] }
{"took": 4, "errors": true, "items": [ { "update": { "_index": "movies", "_id": "tt0816711", "status": 404, "error": { "type": "document_missing_exception", "reason": "[_doc][tt0816711]: document missing", "index": "movies", "shard": "0", "index_uuid": "yhizhusbSWmP0G7OJnmcLg" } } } ] }
剩余 350 字符

有问题?

想要贡献?