源协调
源协调是在多节点环境中协调和分发 OpenSearch Data Prepper 数据源之间工作的概念。某些数据源,例如 Amazon Kinesis 或 Amazon Simple Queue Service (Amazon SQS),本身就支持协调。而其他数据源,例如 OpenSearch、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 JDBC/ODBC,则不支持源协调。
Data Prepper 源协调决定了 Data Prepper 集群中每个节点执行哪部分工作分区,并防止出现重复的工作分区。
受 Kinesis 客户端库的启发,Data Prepper 利用分布式存储(以租约形式)来处理工作的分发和去重。
分区格式化
源协调将源分成“工作分区”。例如,一个 S3 对象将是 Amazon S3 的一个工作分区,或者一个 OpenSearch 索引将是 OpenSearch 的一个工作分区。
Data Prepper 获取源选择的每个工作分区,并在 Data Prepper 用于源协调的分布式存储中创建相应的项。这些项中的每一个都具有以下标准格式,该格式可以由分布式存储实现进行扩展。
值 | 类型 | 描述 |
---|---|---|
sourceIdentifier | 字符串 | Data Prepper 管道处理此分区的标识符。默认情况下,sourceIdentifier 以子管道名称为前缀,但可以在 data-prepper-config.yaml 文件中通过 partition_prefix 配置额外的`前缀`。 |
sourcePartitionKey | 字符串 | 与此项关联的工作分区的标识符。例如,对于具有扫描功能的 s3 源,此标识符是 S3 存储桶的 objectKey 组合。 |
partitionOwner | 字符串 | 活动拥有并处理此分区的节点的标识符。此 ID 包含节点的主机名,但当此分区未被拥有时为 null 。 |
partitionProgressState | 字符串 | 一个 JSON 字符串对象,表示工作分区的进展或在发生崩溃时源可能需要的任何附加元数据,以供另一个节点从上次停止的地方恢复。 |
partitionOwnershipTimeout | 时间戳 | 每当 Data Prepper 节点获取一个分区时,会给该分区的所有者一个 10 分钟的超时,以处理节点崩溃的情况。当所有者保存分区状态时,所有权将再延长 10 分钟。 |
sourcePartitionStatus | 枚举 | 表示分区的当前状态:ASSIGNED 表示分区正在处理中,UNASSIGNED 表示分区正在等待处理,CLOSED 表示分区正在等待稍后处理,COMPLETED 表示分区已处理完毕。 |
reOpenAt | 时间戳 | 表示 CLOSED 分区重新打开并被认为可用于处理的时间。仅适用于 CLOSED 分区。 |
closedCount | 长整型 | 跟踪分区被标记为 CLOSED 的次数。 |
获取分区
分区按照源提供的 List<PartitionIdentifer>
中返回的顺序获取。当节点尝试获取分区时,Data Prepper 执行以下步骤:
- Data Prepper 查询
ASSIGNED
分区,检查是否有任何ASSIGNED
分区的分区所有者已过期。这旨在优先处理在处理过程中节点崩溃的分区,从而可以使用可能对时间敏感的分区状态。 - 查询
ASSIGNED
分区后,Data Prepper 查询CLOSED
分区,以确定是否已达到任何分区的reOpenAt
时间戳。 - 如果没有可用的
ASSIGNED
或CLOSED
分区,则 Data Prepper 查询UNASSIGNED
分区,直到其中一个分区被ASSIGNED
。
如果发生此流程并且节点未获取任何分区,则 SourceCoordinator
的 getNextPartition
方法中提供的分区供应商函数将创建新分区。供应商函数完成后,Data Prepper 再次查询 ASSIGNED
、CLOSED
和 UNASSIGNED
分区。
全局状态
传递给 getNextPartition
方法的任何函数都会创建具有 Map<String, Object>
全局状态的新分区。此状态在集群中的所有节点之间共享,并且在任何给定时间点仅由单个节点运行,具体取决于源的确定。
配置
下表提供了 source_coordination
的可选配置值。
值 | 类型 | 描述 |
---|---|---|
partition_prefix | 字符串 | 用于 sourceIdentifier 的前缀,用于区分共享相同分布式存储的 Data Prepper 集群。 |
存储 | 对象 | 包含要使用的存储配置的对象,其中键是存储的名称,例如 in_memory 或 dynamodb ,值是该存储类型上可用的任何配置。 |
支持的存储
截至 Data Prepper 2.4,仅支持 in_memory
和 dynamodb
存储。
- 当
data-prepper-config.yaml
文件中未配置source_coordination
设置时,in_memory
存储是默认存储,并且应仅用于单节点配置。 dynamodb
存储用于多节点 Data Prepper 环境。dynamodb
存储可以在需要利用源协调的一个或多个 Data Prepper 集群之间共享。
DynamoDB 存储
除非将 skip_table_creation
标志配置为 true
,否则 Data Prepper 将在启动时尝试创建 dynamodb
表。可以选择配置表的生存时间 (ttl
),这会导致存储随时间自动清理项目。某些源依赖源协调进行数据去重,因此请确保为管道持续时间配置足够大的 ttl
。
如果表中未配置 ttl
,则表中不再需要的任何项目都必须手动清理。
以下显示了 Data Prepper 创建表、启用 ttl
并与表交互所需的完整权限集:
{
"Sid": "ReadWriteSourceCoordinationDynamoStore",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTimeToLive",
"dynamodb:UpdateTimeToLive",
"dynamodb:DescribeTable",
"dynamodb:CreateTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query"
],
"Resource": [
"arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}",
"arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}/index/source-status"
]
}
值 | 必需 | 类型 | 描述 |
---|---|---|---|
table_name | 是 | 字符串 | 用于源协调的表的名称。 |
region | 是 | 字符串 | DynamoDB 表的区域。 |
sts_role_arn | 否 | 字符串 | 包含表权限的 sts 角色。如果未提供,则使用默认凭证。 |
sts_external_id | 否 | 字符串 | 在 API 调用中用于假定 sts_role_arn 的外部 ID。 |
skip_table_creation | 否 | 布尔型 | 如果在使用现有存储时将其设置为 true ,则跳过创建存储的尝试。默认值为 false 。 |
provisioned_write_capacity_units | 否 | 整数 | 要在表上配置的写入容量单位数。默认值为 10 。 |
provisioned_read_capacity_units | 否 | 整数 | 要在表上配置的读取容量单位数。默认值为 10 。 |
ttl | 持续时间 | 可选。表中项目的 TTL 持续时间。当项目更新时,TTL 会延长此持续时间。默认情况下,表中不使用 TTL。 |
以下示例展示了一个 dynamodb
存储:
source_coordination:
store:
dynamodb:
table_name: "DataPrepperSourceCoordinationStore"
region: "us-east-1"
sts_role_arn: "arn:aws:iam::##########:role/SourceCoordinationTableRole"
ttl: "P7D"
skip_table_creation: true
内存存储(默认)
以下示例展示了一个 in_memory
存储,它最适合与单节点集群一起使用:
source_coordination:
store:
in_memory:
指标
源协调指标的解释方式取决于配置的源。源协调指标的格式为 <sub-pipeline-name>_source_coordinator_<metric-name>
。您可以使用子管道名称来识别这些指标的源,因为每个子管道对于每个源都是唯一的。
进度指标
以下是与分区进度相关的指标:
partitionsCreatedCount
:已创建的分区项的数量。对于 S3 扫描,这是已为其创建分区的对象数量。partitionsCompleted
:已完全处理并标记为COMPLETED
的分区数量。对于 S3 扫描,这是已处理的对象数量。noPartitionsAcquired
:节点尝试获取要执行工作的分区,但在存储中未找到可用分区的次数。这用于指示不再有数据进入源。partitionsAcquired
:节点已获取用于执行工作的分区数量。在非错误情况下,这应该等于创建的分区数量。partitionsClosed
:已标记为CLOSED
的分区数量。这仅适用于使用 CLOSED 功能的源。
以下是与分区错误相关的指标:
partitionNotFoundErrors
:表示节点主动拥有的分区项没有相应的存储项。这只应在表中项目被手动删除时发生。partitionNotOwnedErrors
:表示拥有分区的节点因分区所有权超时而失去所有权。除非源能够使用saveState
检查点分区,否则此错误会导致重复的项目处理。partitionUpdateErrors
:更新此分区项的存储失败时收到的错误数量。以saveState
、close
或complete
为前缀,指示哪个更新操作失败。