Link Search Menu Expand Document Documentation Menu

源协调

源协调是在多节点环境中协调和分发 OpenSearch Data Prepper 数据源之间工作的概念。某些数据源,例如 Amazon Kinesis 或 Amazon Simple Queue Service (Amazon SQS),本身就支持协调。而其他数据源,例如 OpenSearch、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 JDBC/ODBC,则不支持源协调。

Data Prepper 源协调决定了 Data Prepper 集群中每个节点执行哪部分工作分区,并防止出现重复的工作分区。

Kinesis 客户端库的启发,Data Prepper 利用分布式存储(以租约形式)来处理工作的分发和去重。

分区格式化

源协调将源分成“工作分区”。例如,一个 S3 对象将是 Amazon S3 的一个工作分区,或者一个 OpenSearch 索引将是 OpenSearch 的一个工作分区。

Data Prepper 获取源选择的每个工作分区,并在 Data Prepper 用于源协调的分布式存储中创建相应的项。这些项中的每一个都具有以下标准格式,该格式可以由分布式存储实现进行扩展。

类型 描述
sourceIdentifier 字符串 Data Prepper 管道处理此分区的标识符。默认情况下,sourceIdentifier 以子管道名称为前缀,但可以在 data-prepper-config.yaml 文件中通过 partition_prefix 配置额外的`前缀`。
sourcePartitionKey 字符串 与此项关联的工作分区的标识符。例如,对于具有扫描功能的 s3 源,此标识符是 S3 存储桶的 objectKey 组合。
partitionOwner 字符串 活动拥有并处理此分区的节点的标识符。此 ID 包含节点的主机名,但当此分区未被拥有时为 null
partitionProgressState 字符串 一个 JSON 字符串对象,表示工作分区的进展或在发生崩溃时源可能需要的任何附加元数据,以供另一个节点从上次停止的地方恢复。
partitionOwnershipTimeout 时间戳 每当 Data Prepper 节点获取一个分区时,会给该分区的所有者一个 10 分钟的超时,以处理节点崩溃的情况。当所有者保存分区状态时,所有权将再延长 10 分钟。
sourcePartitionStatus 枚举 表示分区的当前状态:ASSIGNED 表示分区正在处理中,UNASSIGNED 表示分区正在等待处理,CLOSED 表示分区正在等待稍后处理,COMPLETED 表示分区已处理完毕。
reOpenAt 时间戳 表示 CLOSED 分区重新打开并被认为可用于处理的时间。仅适用于 CLOSED 分区。
closedCount 长整型 跟踪分区被标记为 CLOSED 的次数。

获取分区

分区按照源提供的 List<PartitionIdentifer> 中返回的顺序获取。当节点尝试获取分区时,Data Prepper 执行以下步骤:

  1. Data Prepper 查询 ASSIGNED 分区,检查是否有任何 ASSIGNED 分区的分区所有者已过期。这旨在优先处理在处理过程中节点崩溃的分区,从而可以使用可能对时间敏感的分区状态。
  2. 查询 ASSIGNED 分区后,Data Prepper 查询 CLOSED 分区,以确定是否已达到任何分区的 reOpenAt 时间戳。
  3. 如果没有可用的 ASSIGNEDCLOSED 分区,则 Data Prepper 查询 UNASSIGNED 分区,直到其中一个分区被 ASSIGNED

如果发生此流程并且节点未获取任何分区,则 SourceCoordinatorgetNextPartition 方法中提供的分区供应商函数将创建新分区。供应商函数完成后,Data Prepper 再次查询 ASSIGNEDCLOSEDUNASSIGNED 分区。

全局状态

传递给 getNextPartition 方法的任何函数都会创建具有 Map<String, Object> 全局状态的新分区。此状态在集群中的所有节点之间共享,并且在任何给定时间点仅由单个节点运行,具体取决于源的确定。

配置

下表提供了 source_coordination 的可选配置值。

类型 描述
partition_prefix 字符串 用于 sourceIdentifier 的前缀,用于区分共享相同分布式存储的 Data Prepper 集群。
存储 对象 包含要使用的存储配置的对象,其中键是存储的名称,例如 in_memorydynamodb,值是该存储类型上可用的任何配置。

支持的存储

截至 Data Prepper 2.4,仅支持 in_memorydynamodb 存储。

  • data-prepper-config.yaml 文件中未配置 source_coordination 设置时,in_memory 存储是默认存储,并且应仅用于单节点配置。
  • dynamodb 存储用于多节点 Data Prepper 环境。dynamodb 存储可以在需要利用源协调的一个或多个 Data Prepper 集群之间共享。

DynamoDB 存储

除非将 skip_table_creation 标志配置为 true,否则 Data Prepper 将在启动时尝试创建 dynamodb 表。可以选择配置表的生存时间 (ttl),这会导致存储随时间自动清理项目。某些源依赖源协调进行数据去重,因此请确保为管道持续时间配置足够大的 ttl

如果表中未配置 ttl,则表中不再需要的任何项目都必须手动清理。

以下显示了 Data Prepper 创建表、启用 ttl 并与表交互所需的完整权限集:

{
  "Sid": "ReadWriteSourceCoordinationDynamoStore",
  "Effect": "Allow",
  "Action": [
    "dynamodb:DescribeTimeToLive",
    "dynamodb:UpdateTimeToLive",
    "dynamodb:DescribeTable",
    "dynamodb:CreateTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:Query"
  ],
  "Resource": [
    "arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}",
    "arn:aws:dynamodb:${REGION}:${AWS_ACCOUNT_ID}:table/${TABLE_NAME}/index/source-status"
  ]
}
必需 类型 描述
table_name 字符串 用于源协调的表的名称。
region 字符串 DynamoDB 表的区域。
sts_role_arn 字符串 包含表权限的 sts 角色。如果未提供,则使用默认凭证。
sts_external_id 字符串 在 API 调用中用于假定 sts_role_arn 的外部 ID。
skip_table_creation 布尔型 如果在使用现有存储时将其设置为 true,则跳过创建存储的尝试。默认值为 false
provisioned_write_capacity_units 整数 要在表上配置的写入容量单位数。默认值为 10
provisioned_read_capacity_units 整数 要在表上配置的读取容量单位数。默认值为 10
ttl 持续时间 可选。表中项目的 TTL 持续时间。当项目更新时,TTL 会延长此持续时间。默认情况下,表中不使用 TTL。  

以下示例展示了一个 dynamodb 存储:

source_coordination:
  store:
     dynamodb:
       table_name: "DataPrepperSourceCoordinationStore"
       region: "us-east-1"
       sts_role_arn: "arn:aws:iam::##########:role/SourceCoordinationTableRole"
       ttl: "P7D"
       skip_table_creation: true

内存存储(默认)

以下示例展示了一个 in_memory 存储,它最适合与单节点集群一起使用:

source_coordination:
  store:
    in_memory:

指标

源协调指标的解释方式取决于配置的源。源协调指标的格式为 <sub-pipeline-name>_source_coordinator_<metric-name>。您可以使用子管道名称来识别这些指标的源,因为每个子管道对于每个源都是唯一的。

进度指标

以下是与分区进度相关的指标:

  • partitionsCreatedCount:已创建的分区项的数量。对于 S3 扫描,这是已为其创建分区的对象数量。
  • partitionsCompleted:已完全处理并标记为 COMPLETED 的分区数量。对于 S3 扫描,这是已处理的对象数量。
  • noPartitionsAcquired:节点尝试获取要执行工作的分区,但在存储中未找到可用分区的次数。这用于指示不再有数据进入源。
  • partitionsAcquired:节点已获取用于执行工作的分区数量。在非错误情况下,这应该等于创建的分区数量。
  • partitionsClosed:已标记为 CLOSED 的分区数量。这仅适用于使用 CLOSED 功能的源。

以下是与分区错误相关的指标:

  • partitionNotFoundErrors:表示节点主动拥有的分区项没有相应的存储项。这只应在表中项目被手动删除时发生。
  • partitionNotOwnedErrors:表示拥有分区的节点因分区所有权超时而失去所有权。除非源能够使用 saveState 检查点分区,否则此错误会导致重复的项目处理。
  • partitionUpdateErrors:更新此分区项的存储失败时收到的错误数量。以 saveStateclosecomplete 为前缀,指示哪个更新操作失败。
剩余 350 字符

有问题?

想做贡献?