基于拉取的摄取管理
3.0 版本引入
这是一项实验性功能,不建议在生产环境中使用。有关该功能的最新进展或您希望提供反馈,请加入 OpenSearch 论坛上的讨论。
OpenSearch 提供了以下 API 来管理拉取式数据摄取。
暂停数据摄取
暂停一个或多个索引的数据摄取。暂停后,OpenSearch 将停止从流式源为指定索引中的所有分片消费数据。
端点
POST /<index>/ingestion/_pause
路径参数
下表列出了可用的路径参数。
参数 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
index | 字符串 | 必需 | 要暂停的索引。可以是多个索引名称的逗号分隔列表。 |
查询参数
下表列出了可用的查询参数。所有查询参数都是可选的。
参数 | 数据类型 | 描述 |
---|---|---|
cluster_manager_timeout | 时间单位 | 等待连接到集群管理器节点的时间量。默认值为 30s 。 |
timeout | 时间单位 | 等待集群响应的时间。默认为 30 秒 。 |
请求示例
POST /my-index/ingestion/_pause
恢复数据摄取
恢复一个或多个索引的数据摄取。恢复后,OpenSearch 将继续从流式源为指定索引中的所有分片消费数据。
作为恢复操作的一部分,您可以选择重置流消费者,使其从特定偏移量或时间戳开始读取。如果指定了重置设置,则在对索引应用恢复操作之前,将重置所选分片的所有消费者。重置消费者还会触发内部刷新以持久化更改。
端点
POST /<index>/ingestion/_resume
路径参数
下表列出了可用的路径参数。
参数 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
index | 字符串 | 必需 | 要恢复数据摄取的索引。可以是多个索引名称的逗号分隔列表。 |
查询参数
下表列出了可用的查询参数。所有查询参数都是可选的。
参数 | 数据类型 | 描述 |
---|---|---|
cluster_manager_timeout | 时间单位 | 等待连接到集群管理器节点的时间量。默认值为 30s 。 |
timeout | 时间单位 | 等待集群响应的时间。默认为 30 秒 。 |
请求正文字段
下表列出了可用的请求正文字段。
字段 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
reset_settings | 数组 | 可选 | 每个分片的重置设置列表。如果未提供,OpenSearch 将从指定索引中每个分片的当前位置恢复数据摄取。 |
reset_settings.shard | 整数 | 必需 | 要重置的分片。 |
reset_settings.mode | 字符串 | 必需 | 重置模式。有效值为 offset (一个正整数偏移量)和 timestamp (一个 Unix 时间戳,以毫秒为单位)。 |
reset_settings.value | 字符串 | 必需 | • offset :Apache Kafka 偏移量或 Amazon Kinesis 序列号• timestamp :一个 Unix 时间戳,以毫秒为单位。 |
请求示例
要在不指定重置设置的情况下恢复数据摄取,请发送以下请求
POST /my-index/ingestion/_resume
要在恢复数据摄取时提供重置设置,请发送以下请求
POST /my-index/ingestion/_resume
{
"reset_settings": [
{
"shard": 0,
"mode": "offset",
"value": "1"
}
]
}
获取数据摄取状态
返回一个或多个索引的当前数据摄取状态。此 API 支持分页。
端点
GET /<index>/ingestion/_state
路径参数
下表列出了可用的路径参数。
参数 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
index | 字符串 | 必需 | 要返回数据摄取状态的索引。可以是多个索引名称的逗号分隔列表。 |
查询参数
下表列出了可用的查询参数。所有查询参数都是可选的。
参数 | 数据类型 | 描述 |
---|---|---|
timeout | 时间单位 | 等待集群响应的时间。默认为 30 秒 。 |
请求示例
以下是使用默认设置的请求
GET /my-index/ingestion/_state
以下示例显示了页面大小为 20 的请求
GET /my-index/ingestion/_state?size=20
以下示例显示了带有下一页令牌的请求
GET /my-index/ingestion/_state?size=20&next_token=<next_page_token>
示例响应
{
"_shards": {
"total": 1,
"successful": 1,
"failed": 0,
"failures": [
{
"shard": 0,
"index": "my-index",
"status": "INTERNAL_SERVER_ERROR",
"reason": {
"type": "timeout_exception",
"reason": "error message"
}
}
]
},
"next_page_token" : "page token if not on last page",
"ingestion_state": {
"indexName": [
{
"shard": 0,
"poller_state": "POLLING",
"error_policy": "DROP",
"poller_paused": false
}
]
}
}