dynamodb
的 dynamodb
源允许在 Amazon DynamoDB 表上启用变更数据捕获 (CDC)。它可以使用 DynamoDB 流接收表事件,例如 create
、update
或 delete
,并支持使用 时间点恢复 (PITR) 进行初始快照。
该源包含两种用于流式传输 DynamoDB 事件的摄取选项
- 使用 PITR 的*完整初始快照*获取 DynamoDB 表当前状态的初始快照。这需要在 DynamoDB 表上启用 PITR 快照和 DynamoDB 选项。
- 从 DynamoDB 流中流式传输事件,而无需完整初始快照。如果您在管道中已有快照机制,这将非常有用。这要求在 DynamoDB 表上启用 DynamoDB 流选项。
用法
以下示例管道将 DynamoDB 指定为源。它通过 PITR 快照从名为 table-a
的 DynamoDB 表摄取数据。它还指示 start_position
,这会告诉管道如何读取 DynamoDB 流事件
version: "2"
cdc-pipeline:
source:
dynamodb:
tables:
- table_arn: "arn:aws:dynamodb:us-west-2:123456789012:table/table-a"
export:
s3_bucket: "test-bucket"
s3_prefix: "myprefix"
stream:
start_position: "LATEST" # Read latest data from streams (Default)
view_on_remove: NEW_IMAGE
aws:
region: "us-west-2"
sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
配置选项
下表描述了 dynamodb
源的配置选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
aws | 是 | AWS | AWS 配置。有关更多信息,请参阅 aws。 |
确认 | 否 | 布尔型 | 当为 true 时,启用 s3 源以在事件被 OpenSearch 接收器接收时接收 端到端确认。 |
shared_acknowledgement_timeout | 否 | 持续时间 | 与确认结合使用时,从 DynamoDB 流读取的数据在过期前经过的时间量。默认为 10 分钟。 |
s3_data_file_acknowledgment_timeout | 否 | 持续时间 | 与确认结合使用时,从 DynamoDB 导出读取的数据在过期前经过的时间量。默认为 5 分钟。 |
表 | 是 | 列表 | DynamoDB 表的配置。有关更多信息,请参阅 表。 |
aws
在 AWS 配置中使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
区域 | 否 | 字符串 | 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域。 |
sts_role_arn | 否 | 字符串 | AWS Security Token Service (AWS STS) 角色,用于承担向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon Simple Storage Service (Amazon S3) 发出请求。默认为 null ,这将使用 标准 SDK 凭证行为。 |
aws_sts_header_overrides | 否 | 映射 | AWS Identity and Access Management (IAM) 角色为接收器插件承担的标头覆盖映射。 |
表
与 tables
配置一起使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
table_arn | 是 | 字符串 | 源 DynamoDB 表的 Amazon Resource Name (ARN)。 |
导出 | 否 | 导出 | 确定如何导出 DynamoDB 事件。有关更多信息,请参阅 导出。 |
流 | 否 | 流 | 确定管道如何从 DynamoDB 表读取数据。有关更多信息,请参阅 流。 |
导出选项
以下选项允许您自定义 DynamoDB 事件的导出目标。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
s3_bucket | 是 | 字符串 | 存储导出数据文件的目标桶。 |
s3_prefix | 否 | 字符串 | S3 桶的自定义前缀。 |
s3_sse_kms_key_id | 否 | 字符串 | 用于加密导出数据文件的 AWS Key Management Service (AWS KMS) 密钥。key_id 是 KMS 密钥的 ARN,例如 arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147 。 |
s3_region | 否 | 字符串 | S3 桶的区域。 |
流选项
以下选项允许您自定义管道如何从 DynamoDB 表读取事件。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
start_position | 否 | 字符串 | 当启用 DynamoDB 流选项时,源开始读取流事件的位置。LATEST 从最新的流记录开始读取事件。 |
view_on_remove | 否 | 枚举 | 用于 DynamoDB 流中 REMOVE 事件的流记录视图。必须是 NEW_IMAGE 或 OLD_IMAGE 。默认为 NEW_IMAGE 。如果使用 OLD_IMAGE 选项且找不到旧图像,则源将查找 NEW_IMAGE 。 |
暴露的元数据属性
以下元数据将被添加到由 dynamodb
源处理的每个事件中。这些元数据属性可以使用表达式语法 getMetadata
函数访问。
primary_key
:DynamoDB 项的主键。对于仅包含分区键的表,此值提供分区键。对于同时包含分区键和排序键的表,primary_key
属性将等于分区键和排序键,并用|
分隔,例如partition_key|sort_key
。partition_key
:DynamoDB 项的分区键。sort_key
:DynamoDB 项的排序键。如果表不包含排序键,此项将为 null。dynamodb_timestamp
:DynamoDB 项的时间戳。对于导出项,这将是导出时间;对于流项,这将是 DynamoDB 流事件时间。接收器使用此时间戳为 DynamoDB 流事件发出EndtoEndLatency
指标,该指标跟踪 DynamoDB 表中发生更改与该更改应用于接收器之间的延迟。document_version
:使用dynamodb_timestamp
来修改在同一秒内接收到的流项之间的断开连接。建议与opensearch
接收器的document_version
设置一起使用。opensearch_action
:将 DynamoDB 事件操作映射到 OpenSearch 操作的默认值。此操作对于导出项将是index
,对于流事件将是INSERT
或MODIFY
,当 OpenSearch 操作为delete
时,对于REMOVE
流事件将是delete
。dynamodb_event_name
:项的确切事件类型。对于导出项将为null
,对于流事件将为INSERT
、MODIFY
或REMOVE
。table_name
:事件来源的 DynamoDB 表的名称。
权限
以下是运行 DynamoDB 作为源所需的最低权限
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowDescribeTable",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:{account-id}:table/my-table"
]
},
{
"Sid": "allowRunExportJob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:{account-id}:table/my-table"
]
},
{
"Sid": "allowCheckExportjob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeExport"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/export/*"
]
},
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/stream/*"
]
},
{
"Sid": "allowReadAndWriteToS3ForExport",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": [
"arn:aws:s3:::my-bucket/*"
]
}
]
}
执行导出时,不需要 "Sid": "allowReadFromStream"
部分。如果仅从 DynamoDB 流读取,则不需要 "Sid": "allowReadAndWriteToS3ForExport"
、"Sid": "allowCheckExportjob"
和 “Sid”: “allowRunExportJob”
部分。
指标
的 dynamodb
源包含以下指标。
计数器
exportJobSuccess
:已成功提交的导出作业数量。exportJobFailure
:已失败的导出作业提交尝试次数。exportS3ObjectsTotal
:S3 中找到的导出数据文件总数。exportS3ObjectsProcessed
:已从 S3 成功处理的导出数据文件总数。exportRecordsTotal
:导出中找到的记录总数。exportRecordsProcessed
:已成功处理的导出记录总数。exportRecordsProcessingErrors
:导出记录处理错误数量。changeEventsProcessed
:从 DynamoDB 流处理的变更事件数量。changeEventsProcessingErrors
:从 DynamoDB 流处理变更事件的错误数量。shardProgress
:当 DynamoDB 流被正确读取时,分片进度的增量。如果长时间为0
,则表示启用流的管道存在问题。