批量(gRPC)
3.0 版本引入
这是一项实验性功能,不建议在生产环境中使用。有关该功能进度的更新或如果您想留下反馈,请参阅相关的 GitHub issue。
gRPC 批量 API 提供了一种高效、二进制编码的替代方案,用于执行单个调用中的多个文档操作(例如索引、更新和删除),以替代 HTTP 批量 API。此服务使用协议缓冲区,并在参数和结构方面与 REST API 保持一致。
先决条件
要提交 gRPC 请求,您必须在客户端拥有一组 protobufs。有关如何获取 protobufs 的信息,请参阅 使用 gRPC API。
gRPC 服务和方法
gRPC 文档 API 位于 DocumentService 中。
您可以通过在 DocumentService
中调用 Bulk
gRPC 方法来提交批量请求。该方法接收一个 BulkRequest
并返回一个 BulkResponse
。
文档格式
在 gRPC 中,文档必须以字节形式提供和返回。请使用 Base64 编码在 gRPC 请求中提供文档。
例如,常规批量 API 请求中的以下文档
"doc": "{\"title\": \"Inception\", \"year\": 2010}"
对于 gRPC 批量 API 请求,请以 Base64 编码提供相同的文档
"doc": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
BulkRequest 字段
BulkRequest
消息是 gRPC 批量操作的顶级容器。它接受以下字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
request_body | repeated BulkRequestBody | 批量操作列表(index /create /update /delete )。必填。 |
index | 字符串 | 所有操作的默认索引,除非在 request_body 中覆盖。在 BulkRequest 中指定 index 意味着您不需要在 BulkRequestBody 中包含它。可选。 |
source | SourceConfigParam | 控制响应中是否返回完整的 _source 、不返回 _source ,或只返回 _source 中的特定字段。可选。 |
source_excludes | repeated string | 要从 source 中排除的字段。可选。 |
source_includes | repeated string | 要从 source 中包含的字段。可选。 |
pipeline | 字符串 | 预处理摄入管道 ID。可选。 |
refresh | 刷新 | 索引后是否刷新分片。可选。 |
require_alias | bool | 如果为 true ,操作必须以别名为目标。可选。 |
路由 | 字符串 | 分片分配的路由值。可选。 |
timeout | 字符串 | 超时持续时间(例如,1m )。可选。 |
type (已弃用) | 字符串 | 文档类型(始终为 _doc )。可选。 |
wait_for_active_shards | WaitForActiveShards | 等待的最小活跃分片数量。可选。 |
BulkRequestBody 字段
BulkRequestBody
消息表示 BulkRequest
中的单个文档级操作。它接受以下字段。所有字段都是可选的,但 BulkRequestBody
中必须且只能设置 index
、create
、update
或 delete
之一。
字段 | Protobuf 类型 | 描述 |
---|---|---|
index | IndexOperation | 索引文档。如果文档已存在,则替换它。可选。 |
create | CreateOperation | 创建新文档。如果文档已存在则失败。可选。 |
update | UpdateOperation | 部分更新文档或使用 upsert/script 选项。可选。 |
删除 | DeleteOperation | 按 ID 删除文档。可选。 |
detect_noop | bool | 如果为 true ,则如果文档内容未更改,则跳过更新。可选。默认值为 true 。 |
doc | bytes(字节) | 用于 update 或 index 操作的部分或完整文档数据。可选。 |
doc_as_upsert | bool | 如果为 true ,则在目标文档不存在时将该文档视为完整的 upsert 文档。仅对 update 操作有效。可选。 |
script(脚本) | 脚本 | 应用于文档的脚本(与 update 一起使用)。可选。 |
scripted_upsert | bool | 如果为 true ,无论文档是否存在都执行脚本。可选。 |
source | SourceConfig | 控制如何获取或过滤文档源。可选。 |
upsert | bytes(字节) | 如果目标不存在,要使用的完整文档。与 script 一起使用。可选。 |
object | bytes(字节) | 与 create 一起使用的完整文档内容。可选。 |
创建
CreateOperation
仅在文档不存在时添加新文档。
文档本身必须在 object
字段中提供,位于 CreateOperation
消息之外。
还可以提供以下可选字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
id | 字符串 | 文档 ID。如果省略,将自动生成。可选。 |
index | 字符串 | 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。 |
路由 | 字符串 | 用于控制分片放置的自定义路由值。可选。 |
if_primary_term | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
if_seq_no | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
version | int64 | 用于并发控制的显式文档版本。可选。 |
version_type | VersionType | 控制版本匹配行为。可选。 |
pipeline | 字符串 | 预处理摄入管道 ID。可选。 |
require_alias | bool | 仅强制使用索引别名。可选。 |
请求示例
以下示例显示了一个包含 create
操作的批量请求。它在 movies
索引中创建了一个 ID 为 tt1375666
的文档。以 Base64 编码提供的文档内容表示 {"title": "Inception", "year": 2010}
。
{
"index": "movies",
"request_body": [
{
"create": {
"index": "movies",
"id": "tt1375666"
},
"object": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
}
]
}
删除
DeleteOperation
按 ID 删除文档。它接受以下字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
id | 字符串 | 要删除的文档 ID。必填。 |
index | 字符串 | 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。 |
路由 | 字符串 | 用于控制分片放置的自定义路由值。可选。 |
if_primary_term | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
if_seq_no | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
version | int64 | 用于并发控制的显式文档版本。可选。 |
version_type | VersionType | 控制版本匹配行为。可选。 |
请求示例
以下示例显示了一个包含 delete
操作的批量请求。它从 movies
索引中删除了 ID 为 tt1392214
的文档。
{
"index": "movies",
"request_body": [
{
"delete": {
"index": "movies",
"id": "tt1392214"
}
}
]
}
索引
IndexOperation
创建或覆盖文档。如果未提供 ID,将自动生成一个。
文档本身在 doc
字段中提供,位于 IndexOperation
消息之外。
还可以提供以下可选字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
id | 字符串 | 文档 ID。如果省略,将自动生成。可选。 |
index | 字符串 | 目标索引。仅当未在 BulkRequest 中全局设置时才必填。 |
路由 | 字符串 | 用于控制分片放置的自定义路由值。可选。 |
if_primary_term | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
if_seq_no | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
op_type | OpType | 操作类型。控制覆盖行为。有效值为 index (默认)和 create 。可选。 |
version | int64 | 用于并发控制的显式文档版本。可选。 |
version_type | VersionType | 控制版本匹配行为。可选。 |
pipeline | 字符串 | 预处理摄入管道 ID。可选。 |
require_alias | bool | 仅强制使用索引别名。可选。 |
请求示例
以下示例显示了一个包含 index
操作的批量请求。它将一个 Base64 编码的文档(ID 为 tt0468569
)索引到 movies
索引中。
{
"index": "movies",
"request_body": [
{
"index": {
"index": "movies",
"id": "tt0468569"
},
"doc": "eyJ0aXRsZSI6ICJUaGUgRGFyayBLbmlnaHQiLCAieWVhciI6IDIwMDh9"
}
]
}
更新
UpdateOperation
执行部分文档更新。
文档本身在 doc
字段中提供,位于 UpdateOperation
消息之外。
UpdateOperation
的所有字段,如下表所示,除了 id
外均为可选。
字段 | Protobuf 类型 | 描述 |
---|---|---|
id | 字符串 | 要更新的文档 ID。必填。 |
index | 字符串 | 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。 |
路由 | 字符串 | 用于控制分片放置的自定义路由值。可选。 |
if_primary_term | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
if_seq_no | int64 | 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。 |
require_alias | bool | 仅强制使用索引别名。可选。 |
retry_on_conflict | int32 | 如果发生版本冲突,操作重试的次数。可选。 |
请求示例
以下示例展示了一个带有 update
操作的批量请求。它将更新 movies
索引中 ID 为 tt1375666
的文档为 {"year": 2011}
。
{
"index": "movies",
"request_body": [
{
"update": {
"index": "movies",
"id": "tt1375666"
},
"doc": "eyJ5ZWFyIjogMjAxMX0=",
"detect_noop": true
}
]
}
更新或插入 (Upsert)
upsert
操作会在文档已存在时更新它。否则,它将使用提供的文档内容创建一个新文档。
要更新或插入一个文档,请提供一个 UpdateOperation
,并将 doc_as_upsert
指定为 true
。要更新或插入的文档应在 UpdateOperation
之外的 doc_as_upsert
字段中提供。
请求示例
以下示例展示了一个带有 upsert
操作的批量请求。它将 movies
索引中 ID 为 tt1375666
的文档的 year
字段更新为 {"year": 2012}
。
{
"index": "movies",
"request_body": [
{
"update": {
"index": "movies",
"id": "tt1375666"
},
"doc": "eyJ5ZWFyIjogMjAxMn0=",
"doc_as_upsert": true
}
]
}
脚本
运行存储或内联脚本来修改文档。
要指定脚本,请提供一个 UpdateOperation
和一个在 UpdateOperation
之外的 script
字段。
请求示例
以下示例展示了一个带有 script
操作的批量请求。它将 movies
索引中 ID 为 tt1375666
的文档的 year
字段增加 1。
{
"index": "movies",
"request_body": [
{
"update": {
"index": "movies",
"id": "tt1375666"
},
"script": {
"source": "ctx._source.year += 1",
"lang": "painless"
}
}
]
}
响应字段
gRPC 批量 API 提供以下响应字段。
BulkResponseBody 字段
BulkResponse
消息包含 BulkResponseBody
(用于成功请求)或 BulkErrorResponse
(用于失败请求)。BulkResponseBody
提供了批量操作的摘要和每个项目的详细结果,并包含以下字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
errors | bool | 指示批量请求中的任何操作是否失败。如果任何操作失败,响应的 errors 字段将为 true 。您可以遍历单独的 Item 操作以获取更详细的信息。 |
items | repeated Item | 批量请求中所有操作的结果,按提交顺序排列。 |
took | int64 | 处理批量请求所花费的时间,单位为毫秒。 |
ingest_took | int64 | 通过摄入管道处理文档所花费的时间,单位为毫秒。 |
Item 字段
响应中的每个 Item
都对应于请求中的一个操作。对于每个操作,仅提供以下字段之一。
字段 | Protobuf 类型 | 描述 |
---|---|---|
create | ResponseItem | CreateOperation 的结果。 |
删除 | ResponseItem | DeleteOperation 的结果。 |
index | ResponseItem | IndexOperation 的结果。 |
update | ResponseItem | UpdateOperation 的结果。 |
ResponseItem 字段
每个 ResponseItem
对应于请求中的一个操作。它包含以下字段。
字段 | Protobuf 类型 | 描述 |
---|---|---|
type | 字符串 | 文档类型。 |
id | ResponseItem.Id | 与操作关联的文档 ID。可以为 null 。 |
index | 字符串 | 与操作关联的索引名称。如果目标是数据流,则这是后备索引。 |
status | int32 | 为操作返回的 HTTP 状态码。(注意:此字段未来可能会被 gRPC 码替换。) |
error | ErrorCause | 包含有关失败操作的附加信息。 |
primary_term | int64 | 分配给文档的主术语(primary term)。 |
result | 字符串 | 操作结果。有效值为 created 、deleted 和 updated 。 |
seq_no | int64 | 分配给文档的序列号,用于维护版本顺序。 |
分片 | ShardInfo | 操作的分片信息(仅在成功操作时返回)。 |
version | int64 | 文档版本(仅在成功操作时返回)。 |
forced_refresh | bool | 如果为 true ,则强制文档在操作后立即可见。 |
获取 | InlineGetDictUserDefined | 如果请求,包含从内联获取返回的文档 source 。 |
示例响应
{
"bulkResponseBody": {
"errors": false,
"items": [
{
"index": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 201,
"primaryTerm": "1",
"result": "created",
"seqNo": "0",
"shards": {
"successful": 1,
"total": 2
},
"version": "1",
"forcedRefresh": true
}
},
{
"create": {
"id": {
"string": "1"
},
"index": "my_index",
"status": 201,
"primaryTerm": "1",
"result": "created",
"seqNo": "0",
"shards": {
"successful": 1,
"total": 2
},
"version": "1",
"forcedRefresh": true
}
},
{
"update": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 200,
"primaryTerm": "1",
"result": "updated",
"seqNo": "1",
"shards": {
"successful": 1,
"total": 2
},
"version": "2",
"forcedRefresh": true,
"get": {
"found": true,
"seqNo": "1",
"primaryTerm": "1",
"source": "e30="
}
}
},
{
"delete": {
"id": {
"string": "2"
},
"index": "my_index",
"status": 200,
"primaryTerm": "1",
"result": "deleted",
"seqNo": "2",
"shards": {
"successful": 1,
"total": 2
},
"version": "3",
"forcedRefresh": true
}
}
],
"took": "87",
"ingestTook": "0"
}
}
Java gRPC 客户端示例
以下示例展示了一个 Java 客户端程序,它提交一个示例批量 gRPC 请求,然后检查批量响应中是否存在任何错误。
import org.opensearch.protobufs.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.google.protobuf.ByteString;
public class BulkClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9400)
.usePlaintext()
.build();
DocumentServiceGrpc.DocumentServiceBlockingStub stub = DocumentServiceGrpc.newBlockingStub(channel);
IndexOperation indexOp = IndexOperation.newBuilder()
.setIndex("my-index")
.setId("1")
.build();
BulkRequestBody indexBody = BulkRequestBody.newBuilder()
.setIndex(indexOp)
.setDoc(ByteString.copyFromUtf8("{\"field\": \"value\"}"))
.build();
DeleteOperation deleteOp = DeleteOperation.newBuilder()
.setIndex("my-index")
.setId("2")
.build();
BulkRequestBody deleteBody = BulkRequestBody.newBuilder()
.setDelete(deleteOp)
.build();
BulkRequest request = BulkRequest.newBuilder()
.setIndex("my-index")
.addRequestBody(indexBody)
.addRequestBody(deleteBody)
.build();
BulkResponse response = stub.bulk(request);
System.out.println("Bulk errors: " + response.getErrors());
channel.shutdown();
}
}