Link Search Menu Expand Document Documentation Menu

批量(gRPC)

3.0 版本引入

这是一项实验性功能,不建议在生产环境中使用。有关该功能进度的更新或如果您想留下反馈,请参阅相关的 GitHub issue

gRPC 批量 API 提供了一种高效、二进制编码的替代方案,用于执行单个调用中的多个文档操作(例如索引、更新和删除),以替代 HTTP 批量 API。此服务使用协议缓冲区,并在参数和结构方面与 REST API 保持一致。

先决条件

要提交 gRPC 请求,您必须在客户端拥有一组 protobufs。有关如何获取 protobufs 的信息,请参阅 使用 gRPC API

gRPC 服务和方法

gRPC 文档 API 位于 DocumentService 中。

您可以通过在 DocumentService 中调用 Bulk gRPC 方法来提交批量请求。该方法接收一个 BulkRequest 并返回一个 BulkResponse

文档格式

在 gRPC 中,文档必须以字节形式提供和返回。请使用 Base64 编码在 gRPC 请求中提供文档。

例如,常规批量 API 请求中的以下文档

"doc":  "{\"title\": \"Inception\", \"year\": 2010}"

对于 gRPC 批量 API 请求,请以 Base64 编码提供相同的文档

"doc": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"

BulkRequest 字段

BulkRequest 消息是 gRPC 批量操作的顶级容器。它接受以下字段。

字段 Protobuf 类型 描述
request_body repeated BulkRequestBody 批量操作列表(index/create/update/delete)。必填。
index 字符串 所有操作的默认索引,除非在 request_body 中覆盖。在 BulkRequest 中指定 index 意味着您不需要在 BulkRequestBody 中包含它。可选。
source SourceConfigParam 控制响应中是否返回完整的 _source、不返回 _source,或只返回 _source 中的特定字段。可选。
source_excludes repeated string 要从 source 中排除的字段。可选。
source_includes repeated string 要从 source 中包含的字段。可选。
pipeline 字符串 预处理摄入管道 ID。可选。
refresh 刷新 索引后是否刷新分片。可选。
require_alias bool 如果为 true,操作必须以别名为目标。可选。
路由 字符串 分片分配的路由值。可选。
timeout 字符串 超时持续时间(例如,1m)。可选。
type(已弃用) 字符串 文档类型(始终为 _doc)。可选。
wait_for_active_shards WaitForActiveShards 等待的最小活跃分片数量。可选。

BulkRequestBody 字段

BulkRequestBody 消息表示 BulkRequest 中的单个文档级操作。它接受以下字段。所有字段都是可选的,但 BulkRequestBody 中必须且只能设置 indexcreateupdatedelete 之一。

字段 Protobuf 类型 描述
index IndexOperation 索引文档。如果文档已存在,则替换它。可选。
create CreateOperation 创建新文档。如果文档已存在则失败。可选。
update UpdateOperation 部分更新文档或使用 upsert/script 选项。可选。
删除 DeleteOperation 按 ID 删除文档。可选。
detect_noop bool 如果为 true,则如果文档内容未更改,则跳过更新。可选。默认值为 true
doc bytes(字节) 用于 updateindex 操作的部分或完整文档数据。可选。
doc_as_upsert bool 如果为 true,则在目标文档不存在时将该文档视为完整的 upsert 文档。仅对 update 操作有效。可选。
script(脚本) 脚本 应用于文档的脚本(与 update 一起使用)。可选。
scripted_upsert bool 如果为 true,无论文档是否存在都执行脚本。可选。
source SourceConfig 控制如何获取或过滤文档源。可选。
upsert bytes(字节) 如果目标不存在,要使用的完整文档。与 script 一起使用。可选。
object bytes(字节) create 一起使用的完整文档内容。可选。

创建

CreateOperation 仅在文档不存在时添加新文档。

文档本身必须在 object 字段中提供,位于 CreateOperation 消息之外。

还可以提供以下可选字段。

字段 Protobuf 类型 描述
id 字符串 文档 ID。如果省略,将自动生成。可选。
index 字符串 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。
路由 字符串 用于控制分片放置的自定义路由值。可选。
if_primary_term int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
if_seq_no int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
version int64 用于并发控制的显式文档版本。可选。
version_type VersionType 控制版本匹配行为。可选。
pipeline 字符串 预处理摄入管道 ID。可选。
require_alias bool 仅强制使用索引别名。可选。

请求示例

以下示例显示了一个包含 create 操作的批量请求。它在 movies 索引中创建了一个 ID 为 tt1375666 的文档。以 Base64 编码提供的文档内容表示 {"title": "Inception", "year": 2010}

{
  "index": "movies",
  "request_body": [
    {
      "create": {
        "index": "movies",
        "id": "tt1375666"
      },
      "object": "eyJ0aXRsZSI6ICJJbmNlcHRpb24iLCAieWVhciI6IDIwMTB9"
    }
  ]
}

删除

DeleteOperation 按 ID 删除文档。它接受以下字段。

字段 Protobuf 类型 描述
id 字符串 要删除的文档 ID。必填。
index 字符串 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。
路由 字符串 用于控制分片放置的自定义路由值。可选。
if_primary_term int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
if_seq_no int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
version int64 用于并发控制的显式文档版本。可选。
version_type VersionType 控制版本匹配行为。可选。

请求示例

以下示例显示了一个包含 delete 操作的批量请求。它从 movies 索引中删除了 ID 为 tt1392214 的文档。

{
  "index": "movies",
  "request_body": [
    {
      "delete": {
        "index": "movies",
        "id": "tt1392214"
      }
    }
  ]
}

索引

IndexOperation 创建或覆盖文档。如果未提供 ID,将自动生成一个。

文档本身在 doc 字段中提供,位于 IndexOperation 消息之外。

还可以提供以下可选字段。

字段 Protobuf 类型 描述
id 字符串 文档 ID。如果省略,将自动生成。可选。
index 字符串 目标索引。仅当未在 BulkRequest 中全局设置时才必填。
路由 字符串 用于控制分片放置的自定义路由值。可选。
if_primary_term int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
if_seq_no int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
op_type OpType 操作类型。控制覆盖行为。有效值为 index(默认)和 create。可选。
version int64 用于并发控制的显式文档版本。可选。
version_type VersionType 控制版本匹配行为。可选。
pipeline 字符串 预处理摄入管道 ID。可选。
require_alias bool 仅强制使用索引别名。可选。

请求示例

以下示例显示了一个包含 index 操作的批量请求。它将一个 Base64 编码的文档(ID 为 tt0468569)索引到 movies 索引中。

{
  "index": "movies",
  "request_body": [
    {
      "index": {
        "index": "movies",
        "id": "tt0468569"
      },
      "doc": "eyJ0aXRsZSI6ICJUaGUgRGFyayBLbmlnaHQiLCAieWVhciI6IDIwMDh9"
    }
  ]
}

更新

UpdateOperation 执行部分文档更新。

文档本身在 doc 字段中提供,位于 UpdateOperation 消息之外。

UpdateOperation 的所有字段,如下表所示,除了 id 外均为可选。

字段 Protobuf 类型 描述
id 字符串 要更新的文档 ID。必填。
index 字符串 目标索引。如果未在 BulkRequest 中全局设置,则为必填。可选。
路由 字符串 用于控制分片放置的自定义路由值。可选。
if_primary_term int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
if_seq_no int64 用于并发控制。只有当文档的主术语(primary term)与此值匹配时,操作才会运行。可选。
require_alias bool 仅强制使用索引别名。可选。
retry_on_conflict int32 如果发生版本冲突,操作重试的次数。可选。

请求示例

以下示例展示了一个带有 update 操作的批量请求。它将更新 movies 索引中 ID 为 tt1375666 的文档为 {"year": 2011}

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "doc": "eyJ5ZWFyIjogMjAxMX0=",
      "detect_noop": true
    }
  ]
}

更新或插入 (Upsert)

upsert 操作会在文档已存在时更新它。否则,它将使用提供的文档内容创建一个新文档。

要更新或插入一个文档,请提供一个 UpdateOperation,并将 doc_as_upsert 指定为 true。要更新或插入的文档应在 UpdateOperation 之外的 doc_as_upsert 字段中提供。

请求示例

以下示例展示了一个带有 upsert 操作的批量请求。它将 movies 索引中 ID 为 tt1375666 的文档的 year 字段更新为 {"year": 2012}

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "doc": "eyJ5ZWFyIjogMjAxMn0=",
      "doc_as_upsert": true
    }
  ]
}

脚本

运行存储或内联脚本来修改文档。

要指定脚本,请提供一个 UpdateOperation 和一个在 UpdateOperation 之外的 script 字段。

请求示例

以下示例展示了一个带有 script 操作的批量请求。它将 movies 索引中 ID 为 tt1375666 的文档的 year 字段增加 1。

{
  "index": "movies",
  "request_body": [
    {
      "update": {
        "index": "movies",
        "id": "tt1375666"
      },
      "script": {
        "source": "ctx._source.year += 1",
        "lang": "painless"
      }
    }
  ]
}

响应字段

gRPC 批量 API 提供以下响应字段。

BulkResponseBody 字段

BulkResponse 消息包含 BulkResponseBody(用于成功请求)或 BulkErrorResponse(用于失败请求)。BulkResponseBody 提供了批量操作的摘要和每个项目的详细结果,并包含以下字段。

字段 Protobuf 类型 描述
errors bool 指示批量请求中的任何操作是否失败。如果任何操作失败,响应的 errors 字段将为 true。您可以遍历单独的 Item 操作以获取更详细的信息。
items repeated Item 批量请求中所有操作的结果,按提交顺序排列。
took int64 处理批量请求所花费的时间,单位为毫秒。
ingest_took int64 通过摄入管道处理文档所花费的时间,单位为毫秒。

Item 字段

响应中的每个 Item 都对应于请求中的一个操作。对于每个操作,仅提供以下字段之一。

字段 Protobuf 类型 描述
create ResponseItem CreateOperation 的结果。
删除 ResponseItem DeleteOperation 的结果。
index ResponseItem IndexOperation 的结果。
update ResponseItem UpdateOperation 的结果。

ResponseItem 字段

每个 ResponseItem 对应于请求中的一个操作。它包含以下字段。

字段 Protobuf 类型 描述
type 字符串 文档类型。
id ResponseItem.Id 与操作关联的文档 ID。可以为 null
index 字符串 与操作关联的索引名称。如果目标是数据流,则这是后备索引。
status int32 为操作返回的 HTTP 状态码。(注意:此字段未来可能会被 gRPC 码替换。)
error ErrorCause 包含有关失败操作的附加信息。
primary_term int64 分配给文档的主术语(primary term)。
result 字符串 操作结果。有效值为 createddeletedupdated
seq_no int64 分配给文档的序列号,用于维护版本顺序。
分片 ShardInfo 操作的分片信息(仅在成功操作时返回)。
version int64 文档版本(仅在成功操作时返回)。
forced_refresh bool 如果为 true,则强制文档在操作后立即可见。
获取 InlineGetDictUserDefined 如果请求,包含从内联获取返回的文档 source

示例响应

{
  "bulkResponseBody": {
    "errors": false,
    "items": [
      {
        "index": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 201,
          "primaryTerm": "1",
          "result": "created",
          "seqNo": "0",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "1",
          "forcedRefresh": true
        }
      },
      {
        "create": {
          "id": {
            "string": "1"
          },
          "index": "my_index",
          "status": 201,
          "primaryTerm": "1",
          "result": "created",
          "seqNo": "0",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "1",
          "forcedRefresh": true
        }
      },
      {
        "update": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 200,
          "primaryTerm": "1",
          "result": "updated",
          "seqNo": "1",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "2",
          "forcedRefresh": true,
          "get": {
            "found": true,
            "seqNo": "1",
            "primaryTerm": "1",
            "source": "e30="
          }
        }
      },
      {
        "delete": {
          "id": {
            "string": "2"
          },
          "index": "my_index",
          "status": 200,
          "primaryTerm": "1",
          "result": "deleted",
          "seqNo": "2",
          "shards": {
            "successful": 1,
            "total": 2
          },
          "version": "3",
          "forcedRefresh": true
        }
      }
    ],
    "took": "87",
    "ingestTook": "0"
  }
}

Java gRPC 客户端示例

以下示例展示了一个 Java 客户端程序,它提交一个示例批量 gRPC 请求,然后检查批量响应中是否存在任何错误。

import org.opensearch.protobufs.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.google.protobuf.ByteString;

public class BulkClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9400)
                .usePlaintext()
                .build();

        DocumentServiceGrpc.DocumentServiceBlockingStub stub = DocumentServiceGrpc.newBlockingStub(channel);

        IndexOperation indexOp = IndexOperation.newBuilder()
                .setIndex("my-index")
                .setId("1")
                .build();

        BulkRequestBody indexBody = BulkRequestBody.newBuilder()
                .setIndex(indexOp)
                .setDoc(ByteString.copyFromUtf8("{\"field\": \"value\"}"))
                .build();

        DeleteOperation deleteOp = DeleteOperation.newBuilder()
                .setIndex("my-index")
                .setId("2")
                .build();

        BulkRequestBody deleteBody = BulkRequestBody.newBuilder()
                .setDelete(deleteOp)
                .build();

        BulkRequest request = BulkRequest.newBuilder()
                .setIndex("my-index")
                .addRequestBody(indexBody)
                .addRequestBody(deleteBody)
                .build();

        BulkResponse response = stub.bulk(request);
        System.out.println("Bulk errors: " + response.getErrors());

        channel.shutdown();
    }
}