s3
s3 接收器将 OpenSearch Data Prepper 事件批次保存并写入到 Amazon Simple Storage Service (Amazon S3) 对象中。配置的 codec
决定了 s3
接收器如何将数据序列化到 Amazon S3 中。
s3 接收器在批量处理事件时使用以下格式
${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}
当一批对象写入 Amazon S3 时,这些对象的格式与以下类似
my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json
有关如何配置对象的更多信息,请参阅对象键。
用法
以下示例创建了一个配置了 s3
接收器的管道。它包含用于自定义管道的事件和大小阈值的其他选项,并将编解码器类型设置为 ndjson
pipeline:
...
sink:
- s3:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
max_retries: 5
bucket: bucket_name
object_key:
path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
threshold:
event_count: 10000
maximum_size: 50mb
event_collect_timeout: 15s
codec:
ndjson:
buffer_type: in_memory
IAM 权限
要使用 s3
接收器,请配置 AWS Identity and Access Management (IAM) 以授予 Data Prepper 写入 Amazon S3 的权限。您可以使用类似于以下 JSON 配置的配置
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3-access",
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
}
]
}
跨账户 S3 访问
当 Data Prepper 从 S3 存储桶获取数据时,它使用存储桶所有者条件验证存储桶所有权。
默认情况下,S3 接收器不需要 bucket_owners
。如果配置了 bucket_owners
且存储桶未包含在其中一个映射配置中,则 default_bucket_owner
默认为 aws.sts_role_arn
中的账户 ID。您可以同时配置 bucket_owners
和 default_bucket_owner
并一起应用这些设置。
当从具有不同账户关联的多个 S3 存储桶摄取数据时,请根据以下条件配置 Data Prepper 以进行跨账户 S3 访问
- 对于属于同一账户的 S3 存储桶,将
default_bucket_owner
设置为该账户的 ID。 - 对于属于多个账户的 S3 存储桶,请使用
bucket_owners
映射。
一个 bucket_owners
映射指定属于多个账户的存储桶的账户 ID。例如,在以下配置中,my-bucket-01
由 123456789012
拥有,而 my-bucket-02
由 999999999999
拥有
sink:
- s3:
default_bucket_owner: 111111111111
bucket_owners:
my-bucket-01: 123456789012
my-bucket-02: 999999999999
配置
自定义 s3
接收器时使用以下选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
bucket | 是 | 字符串 | 指定接收器的 S3 存储桶名称。支持使用Data Prepper 表达式进行动态存储桶命名,例如 test-${/bucket_id} 。如果动态存储桶不可访问且未配置 default_bucket ,则对象数据将被丢弃。 |
default_bucket | 否 | 字符串 | 用于 bucket 中不可访问的动态存储桶的静态存储桶。 |
bucket_owners | 否 | 映射 | 存储桶名称及其账户所有者 ID 的映射,用于跨账户访问。请参阅跨账户 S3 访问。 |
default_bucket_owner | 否 | 字符串 | S3 存储桶所有者的 AWS 账户 ID。请参阅跨账户 S3 访问。 |
编解码器 | 是 | 编解码器 | 序列化 S3 对象中的数据。 |
aws | 是 | AWS | AWS 配置。请参阅aws。 |
阈值 | 是 | 阈值 | 将对象写入 S3 的条件。 |
aggregate_threshold | 否 | 聚合阈值 | 刷新具有动态 path_prefix 的对象的条件。 |
object_key | 否 | 对象键 | 设置对象存储的 path_prefix 和 file_pattern 。文件模式为 events-%{yyyy-MM-dd'T'hh-mm-ss} 。默认情况下,这些对象位于存储桶的根目录中。path_prefix 是可配置的。 |
压缩 | 否 | 字符串 | 压缩算法:none 、gzip 或 snappy 。默认为 none 。 |
buffer_type | 否 | 缓冲区类型 | 缓冲区类型配置。 |
max_retries | 否 | 整数 | S3 摄取请求的最大重试次数。默认为 5 。 |
aws
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
region | 否 | 字符串 | 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域。 |
sts_role_arn | 否 | 字符串 | 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon S3 发送请求时假定的 AWS Security Token Service (AWS STS) 角色。默认为 null ,这表示使用凭据的标准 SDK 行为。 |
sts_header_overrides | 否 | 映射 | IAM 角色为汇流器插件承担的头覆盖映射。 |
sts_external_id | 否 | 字符串 | 当 Data Prepper 承担角色时使用的 AWS STS 外部 ID。有关更多信息,请参阅 AWS STS API 参考中 AssumeRole 下的 ExternalId 部分。 |
阈值配置
使用以下选项为 s3
接收器设置摄取阈值。当满足以下任一条件时,Data Prepper 会将事件写入 S3 对象。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
event_count | 是 | 整数 | 在将对象写入 S3 之前累积的 Data Prepper 事件数量。 |
maximum_size | 否 | 字符串 | 在将对象写入 S3 之前累积的最大字节数。默认为 50mb 。 |
event_collect_timeout | 是 | 字符串 | Data Prepper 将事件写入 S3 之前的最长时间。该值应为 ISO-8601 持续时间(例如 PT2M30S )或简单表示法(例如 60s 或 1500ms )。 |
聚合阈值配置
使用以下选项设置当聚合值超过定义阈值时触发特定操作或行为的规则或限制。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
flush_capacity_ratio | 否 | 浮点型 | 当达到 aggregate_threshold maximum_size 时要强制刷新组的百分比。该百分比表示为 0.0 到 1.0 之间的数字。默认为 0.5 。 |
maximum_size | 是 | 字符串 | 在强制刷新对象之前累积的最大字节数。例如,128mb 。 |
缓冲区类型
buffer_type
是一个可选配置,它决定了 Data Prepper 在将对象写入 S3 之前如何临时存储数据。默认值为 in_memory
。
使用以下选项之一
in_memory
:将记录存储在内存中。local_file
:将记录刷新到本地机器上的文件中。此选项使用您机器的临时目录。multipart
:使用S3 多部分上传写入。每 10 MB 作为一部分写入。
对象键配置
使用以下选项定义如何为存储在 S3 中的对象构建对象键。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
path_prefix | 否 | 字符串 | 用于写入 S3 的对象的 S3 键前缀路径。接受日期时间格式和使用Data Prepper 表达式动态注入值。例如,您可以使用 /${/my_partition_key}/%{yyyy}/%{MM}/%{dd}/%{HH}/ 根据 my_partition_key 值在 S3 中创建每小时文件夹。前缀路径应以 / 结尾。默认情况下,Data Prepper 将对象写入 S3 存储桶根目录。 |
编解码器
codec
决定了 s3
源如何格式化写入每个 S3 对象的数据。
avro
编解码器
avro
编解码器将事件写入为 Apache Avro 文档。由于 Avro 需要模式,您可以定义模式或让 Data Prepper 自动生成它。建议定义自己的模式,因为这将允许根据您的特定用例进行定制。
当您提供自己的 Avro 模式时,该模式定义了数据的最终结构。任何传入事件中未在 Avro 模式中映射的额外值将不会包含在最终目的地中。Data Prepper 不允许自定义模式与 include_keys
或 exclude_keys
一起使用,以避免自定义 Avro 模式与 include_keys
或 exclude_keys
接收器配置之间产生混淆。
在数据统一的情况下,您可能能够自动生成模式。自动生成的模式基于编解码器接收到的第一个事件。该模式将仅包含此事件中的键,并且所有键都必须存在于所有事件中才能自动生成可用的模式。自动生成的模式将所有字段设置为可空。使用 include_keys
和 exclude_keys
接收器配置来控制自动生成的模式中包含哪些数据。
Avro 字段应使用空联合,因为这将允许缺失值。否则,每个事件都必须存在所有必需字段。仅当您确定非空字段存在时才使用它们。
使用以下选项配置编解码器。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
schema | 是 | 字符串 | Avro 模式声明。如果 auto_schema 设置为 true,则不需要。 |
auto_schema | 否 | 布尔型 | 当设置为 true 时,从第一个事件自动生成 Avro 模式声明。 |
ndjson
编解码器
ndjson
编解码器将每一行写入为一个 JSON 对象。ndjson
编解码器不接受任何配置。
json
编解码器
json
编解码器将事件写入单个大型 JSON 文件。每个事件都写入 JSON 数组中的一个对象。
使用以下选项配置编解码器。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
key_name | 否 | 字符串 | JSON 数组的键名。默认为 events 。 |
parquet
编解码器
parquet
编解码器将事件写入 Parquet 文件。使用该编解码器时,将 buffer_type
设置为 in_memory
。
parquet
编解码器使用模式写入数据。由于 Parquet 需要 Avro 模式,您可以自己定义模式,或让 Data Prepper 自动生成。建议定义自己的模式,因为这将允许根据您的特定用例进行定制。
有关 Avro 模式的更多信息,请参阅Avro 编解码器。
使用以下选项配置编解码器。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
schema | 是 | 字符串 | Avro 模式声明。如果 auto_schema 设置为 true,则不需要。 |
auto_schema | 否 | 布尔型 | 当设置为 true 时,从第一个事件自动生成 Avro 模式声明。 |
使用 Parquet 设置模式
以下管道示例展示了如何配置 s3
接收器,以使用 VPC Flow Logs 的模式将 Parquet 数据写入 Parquet 文件
pipeline:
...
sink:
- s3:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
bucket: mys3bucket
object_key:
path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
codec:
parquet:
schema: >
{
"type" : "record",
"namespace" : "org.opensearch.dataprepper.examples",
"name" : "VpcFlowLog",
"fields" : [
{ "name" : "version", "type" : ["null", "string"]},
{ "name" : "srcport", "type": ["null", "int"]},
{ "name" : "dstport", "type": ["null", "int"]},
{ "name" : "accountId", "type" : ["null", "string"]},
{ "name" : "interfaceId", "type" : ["null", "string"]},
{ "name" : "srcaddr", "type" : ["null", "string"]},
{ "name" : "dstaddr", "type" : ["null", "string"]},
{ "name" : "start", "type": ["null", "int"]},
{ "name" : "end", "type": ["null", "int"]},
{ "name" : "protocol", "type": ["null", "int"]},
{ "name" : "packets", "type": ["null", "int"]},
{ "name" : "bytes", "type": ["null", "int"]},
{ "name" : "action", "type": ["null", "string"]},
{ "name" : "logStatus", "type" : ["null", "string"]}
]
}
threshold:
event_count: 500000000
maximum_size: 20mb
event_collect_timeout: PT15M
buffer_type: in_memory