Link Search Menu Expand Document Documentation Menu

基于拉取的摄取管理

3.0 版本引入

这是一项实验性功能,不建议在生产环境中使用。有关该功能的最新进展或您希望提供反馈,请加入 OpenSearch 论坛上的讨论。

OpenSearch 提供了以下 API 来管理拉取式数据摄取。

暂停数据摄取

暂停一个或多个索引的数据摄取。暂停后,OpenSearch 将停止从流式源为指定索引中的所有分片消费数据。

端点

POST /<index>/ingestion/_pause

路径参数

下表列出了可用的路径参数。

参数 数据类型 必需/可选 描述
index 字符串 必需 要暂停的索引。可以是多个索引名称的逗号分隔列表。

查询参数

下表列出了可用的查询参数。所有查询参数都是可选的。

参数 数据类型 描述
cluster_manager_timeout 时间单位 等待连接到集群管理器节点的时间量。默认值为 30s
timeout 时间单位 等待集群响应的时间。默认为 30 秒

请求示例

POST /my-index/ingestion/_pause

恢复数据摄取

恢复一个或多个索引的数据摄取。恢复后,OpenSearch 将继续从流式源为指定索引中的所有分片消费数据。

作为恢复操作的一部分,您可以选择重置流消费者,使其从特定偏移量或时间戳开始读取。如果指定了重置设置,则在对索引应用恢复操作之前,将重置所选分片的所有消费者。重置消费者还会触发内部刷新以持久化更改。

端点

POST /<index>/ingestion/_resume

路径参数

下表列出了可用的路径参数。

参数 数据类型 必需/可选 描述
index 字符串 必需 要恢复数据摄取的索引。可以是多个索引名称的逗号分隔列表。

查询参数

下表列出了可用的查询参数。所有查询参数都是可选的。

参数 数据类型 描述
cluster_manager_timeout 时间单位 等待连接到集群管理器节点的时间量。默认值为 30s
timeout 时间单位 等待集群响应的时间。默认为 30 秒

请求正文字段

下表列出了可用的请求正文字段。

字段 数据类型 必需/可选 描述
reset_settings 数组 可选 每个分片的重置设置列表。如果未提供,OpenSearch 将从指定索引中每个分片的当前位置恢复数据摄取。
reset_settings.shard 整数 必需 要重置的分片。
reset_settings.mode 字符串 必需 重置模式。有效值为 offset(一个正整数偏移量)和 timestamp(一个 Unix 时间戳,以毫秒为单位)。
reset_settings.value 字符串 必需  • offset:Apache Kafka 偏移量或 Amazon Kinesis 序列号
 • timestamp:一个 Unix 时间戳,以毫秒为单位。

请求示例

要在不指定重置设置的情况下恢复数据摄取,请发送以下请求

POST /my-index/ingestion/_resume

要在恢复数据摄取时提供重置设置,请发送以下请求

POST /my-index/ingestion/_resume
{
  "reset_settings": [
    {
      "shard": 0,
      "mode": "offset",
      "value": "1"
    }
  ]
}

获取数据摄取状态

返回一个或多个索引的当前数据摄取状态。此 API 支持分页。

端点

GET /<index>/ingestion/_state

路径参数

下表列出了可用的路径参数。

参数 数据类型 必需/可选 描述
index 字符串 必需 要返回数据摄取状态的索引。可以是多个索引名称的逗号分隔列表。

查询参数

下表列出了可用的查询参数。所有查询参数都是可选的。

参数 数据类型 描述
timeout 时间单位 等待集群响应的时间。默认为 30 秒

请求示例

以下是使用默认设置的请求

GET /my-index/ingestion/_state

以下示例显示了页面大小为 20 的请求

GET /my-index/ingestion/_state?size=20

以下示例显示了带有下一页令牌的请求

GET /my-index/ingestion/_state?size=20&next_token=<next_page_token>

示例响应

{
  "_shards": {
    "total": 1,
    "successful": 1,
    "failed": 0,
    "failures": [
      {
        "shard": 0,
        "index": "my-index",
        "status": "INTERNAL_SERVER_ERROR",
        "reason": {
          "type": "timeout_exception",
          "reason": "error message"
        }
      }
    ]
  },
  "next_page_token" : "page token if not on last page",
  "ingestion_state": {
    "indexName": [
      {
        "shard": 0,
        "poller_state": "POLLING",
        "error_policy": "DROP",
        "poller_paused": false
      }
    ]
  }
}

相关文章