Link Search Menu Expand Document Documentation Menu

dynamodb

dynamodb 源允许在 Amazon DynamoDB 表上启用变更数据捕获 (CDC)。它可以使用 DynamoDB 流接收表事件,例如 createupdatedelete,并支持使用 时间点恢复 (PITR) 进行初始快照。

该源包含两种用于流式传输 DynamoDB 事件的摄取选项

  1. 使用 PITR 的*完整初始快照*获取 DynamoDB 表当前状态的初始快照。这需要在 DynamoDB 表上启用 PITR 快照和 DynamoDB 选项。
  2. 从 DynamoDB 流中流式传输事件,而无需完整初始快照。如果您在管道中已有快照机制,这将非常有用。这要求在 DynamoDB 表上启用 DynamoDB 流选项。

用法

以下示例管道将 DynamoDB 指定为源。它通过 PITR 快照从名为 table-a 的 DynamoDB 表摄取数据。它还指示 start_position,这会告诉管道如何读取 DynamoDB 流事件

version: "2"
cdc-pipeline:
  source:
    dynamodb:
      tables:
        - table_arn: "arn:aws:dynamodb:us-west-2:123456789012:table/table-a"
          export:
            s3_bucket: "test-bucket"
            s3_prefix: "myprefix"
          stream:
            start_position: "LATEST" # Read latest data from streams (Default)
            view_on_remove: NEW_IMAGE
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"

配置选项

下表描述了 dynamodb 源的配置选项。

选项 必需 类型 描述
aws AWS AWS 配置。有关更多信息,请参阅 aws
确认 布尔型 当为 true 时,启用 s3 源以在事件被 OpenSearch 接收器接收时接收 端到端确认
shared_acknowledgement_timeout 持续时间 与确认结合使用时,从 DynamoDB 流读取的数据在过期前经过的时间量。默认为 10 分钟。
s3_data_file_acknowledgment_timeout 持续时间 与确认结合使用时,从 DynamoDB 导出读取的数据在过期前经过的时间量。默认为 5 分钟。
列表 DynamoDB 表的配置。有关更多信息,请参阅

aws

在 AWS 配置中使用以下选项。

选项 必需 类型 描述
区域 字符串 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域
sts_role_arn 字符串 AWS Security Token Service (AWS STS) 角色,用于承担向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发出请求。默认为 null,这将使用 标准 SDK 凭证行为
aws_sts_header_overrides 映射 AWS Identity and Access Management (IAM) 角色为接收器插件承担的标头覆盖映射。

tables 配置一起使用以下选项。

选项 必需 类型 描述
table_arn 字符串 源 DynamoDB 表的 Amazon Resource Name (ARN)。
导出 导出 确定如何导出 DynamoDB 事件。有关更多信息,请参阅 导出
确定管道如何从 DynamoDB 表读取数据。有关更多信息,请参阅

导出选项

以下选项允许您自定义 DynamoDB 事件的导出目标。

选项 必需 类型 描述
s3_bucket 字符串 存储导出数据文件的目标桶。
s3_prefix 字符串 S3 桶的自定义前缀。
s3_sse_kms_key_id 字符串 用于加密导出数据文件的 AWS Key Management Service (AWS KMS) 密钥。key_id 是 KMS 密钥的 ARN,例如 arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147
s3_region 字符串 S3 桶的区域。

流选项

以下选项允许您自定义管道如何从 DynamoDB 表读取事件。

选项 必需 类型 描述
start_position 字符串 当启用 DynamoDB 流选项时,源开始读取流事件的位置。LATEST 从最新的流记录开始读取事件。
view_on_remove 枚举 用于 DynamoDB 流中 REMOVE 事件的流记录视图。必须是 NEW_IMAGEOLD_IMAGE。默认为 NEW_IMAGE。如果使用 OLD_IMAGE 选项且找不到旧图像,则源将查找 NEW_IMAGE

暴露的元数据属性

以下元数据将被添加到由 dynamodb 源处理的每个事件中。这些元数据属性可以使用表达式语法 getMetadata 函数访问。

  • primary_key:DynamoDB 项的主键。对于仅包含分区键的表,此值提供分区键。对于同时包含分区键和排序键的表,primary_key 属性将等于分区键和排序键,并用 | 分隔,例如 partition_key|sort_key
  • partition_key:DynamoDB 项的分区键。
  • sort_key:DynamoDB 项的排序键。如果表不包含排序键,此项将为 null。
  • dynamodb_timestamp:DynamoDB 项的时间戳。对于导出项,这将是导出时间;对于流项,这将是 DynamoDB 流事件时间。接收器使用此时间戳为 DynamoDB 流事件发出 EndtoEndLatency 指标,该指标跟踪 DynamoDB 表中发生更改与该更改应用于接收器之间的延迟。
  • document_version:使用 dynamodb_timestamp 来修改在同一秒内接收到的流项之间的断开连接。建议与 opensearch 接收器的 document_version 设置一起使用。
  • opensearch_action:将 DynamoDB 事件操作映射到 OpenSearch 操作的默认值。此操作对于导出项将是 index,对于流事件将是 INSERTMODIFY,当 OpenSearch 操作为 delete 时,对于 REMOVE 流事件将是 delete
  • dynamodb_event_name:项的确切事件类型。对于导出项将为 null,对于流事件将为 INSERTMODIFYREMOVE
  • table_name:事件来源的 DynamoDB 表的名称。

权限

以下是运行 DynamoDB 作为源所需的最低权限

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "allowDescribeTable",
        "Effect": "Allow",
        "Action": [
          "dynamodb:DescribeTable"
        ],
        "Resource": [
          "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table"
        ]
      },
       {
            "Sid": "allowRunExportJob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table"
            ]
        },
        {
            "Sid": "allowCheckExportjob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeExport"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/export/*"
            ]
        },
        {
            "Sid": "allowReadFromStream",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/stream/*"
            ]
        },
        {
            "Sid": "allowReadAndWriteToS3ForExport",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::my-bucket/*"
            ]
        }
    ]
}

执行导出时,不需要 "Sid": "allowReadFromStream" 部分。如果仅从 DynamoDB 流读取,则不需要 "Sid": "allowReadAndWriteToS3ForExport""Sid": "allowCheckExportjob"“Sid”: “allowRunExportJob” 部分。

指标

dynamodb 源包含以下指标。

计数器

  • exportJobSuccess:已成功提交的导出作业数量。
  • exportJobFailure:已失败的导出作业提交尝试次数。
  • exportS3ObjectsTotal:S3 中找到的导出数据文件总数。
  • exportS3ObjectsProcessed:已从 S3 成功处理的导出数据文件总数。
  • exportRecordsTotal:导出中找到的记录总数。
  • exportRecordsProcessed:已成功处理的导出记录总数。
  • exportRecordsProcessingErrors:导出记录处理错误数量。
  • changeEventsProcessed:从 DynamoDB 流处理的变更事件数量。
  • changeEventsProcessingErrors:从 DynamoDB 流处理变更事件的错误数量。
  • shardProgress:当 DynamoDB 流被正确读取时,分片进度的增量。如果长时间为 0,则表示启用流的管道存在问题。
剩余 350 字符

有问题?

想做贡献?