Link Search Menu Expand Document Documentation Menu

s3

s3 接收器将 OpenSearch Data Prepper 事件批次保存并写入到 Amazon Simple Storage Service (Amazon S3) 对象中。配置的 codec 决定了 s3 接收器如何将数据序列化到 Amazon S3 中。

s3 接收器在批量处理事件时使用以下格式

${pathPrefix}events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}-${currentTimeInNanos}-${uniquenessId}.${codecSuppliedExtension}

当一批对象写入 Amazon S3 时,这些对象的格式与以下类似

my-logs/2023/06/09/06/events-2023-06-09T06-00-01-1686290401871214927-ae15b8fa-512a-59c2-b917-295a0eff97c8.json

有关如何配置对象的更多信息,请参阅对象键

用法

以下示例创建了一个配置了 s3 接收器的管道。它包含用于自定义管道的事件和大小阈值的其他选项,并将编解码器类型设置为 ndjson

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        max_retries: 5
        bucket: bucket_name
        object_key:
          path_prefix: my-logs/%{yyyy}/%{MM}/%{dd}/
        threshold:
          event_count: 10000
          maximum_size: 50mb
          event_collect_timeout: 15s
        codec:
          ndjson:
        buffer_type: in_memory

IAM 权限

要使用 s3 接收器,请配置 AWS Identity and Access Management (IAM) 以授予 Data Prepper 写入 Amazon S3 的权限。您可以使用类似于以下 JSON 配置的配置

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3-access",
            "Effect": "Allow",
            "Action": [
              "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        }
    ]
}

跨账户 S3 访问

当 Data Prepper 从 S3 存储桶获取数据时,它使用存储桶所有者条件验证存储桶所有权。

默认情况下,S3 接收器不需要 bucket_owners。如果配置了 bucket_owners 且存储桶未包含在其中一个映射配置中,则 default_bucket_owner 默认为 aws.sts_role_arn 中的账户 ID。您可以同时配置 bucket_ownersdefault_bucket_owner 并一起应用这些设置。

当从具有不同账户关联的多个 S3 存储桶摄取数据时,请根据以下条件配置 Data Prepper 以进行跨账户 S3 访问

  • 对于属于同一账户的 S3 存储桶,将 default_bucket_owner 设置为该账户的 ID。
  • 对于属于多个账户的 S3 存储桶,请使用 bucket_owners 映射。

一个 bucket_owners 映射指定属于多个账户的存储桶的账户 ID。例如,在以下配置中,my-bucket-01123456789012 拥有,而 my-bucket-02999999999999 拥有

sink:
  - s3:
      default_bucket_owner: 111111111111
      bucket_owners:
        my-bucket-01: 123456789012
        my-bucket-02: 999999999999

配置

自定义 s3 接收器时使用以下选项。

选项 必需 类型 描述
bucket 字符串 指定接收器的 S3 存储桶名称。支持使用Data Prepper 表达式进行动态存储桶命名,例如 test-${/bucket_id}。如果动态存储桶不可访问且未配置 default_bucket,则对象数据将被丢弃。
default_bucket 字符串 用于 bucket 中不可访问的动态存储桶的静态存储桶。
bucket_owners 映射 存储桶名称及其账户所有者 ID 的映射,用于跨账户访问。请参阅跨账户 S3 访问
default_bucket_owner 字符串 S3 存储桶所有者的 AWS 账户 ID。请参阅跨账户 S3 访问
编解码器 编解码器 序列化 S3 对象中的数据。
aws AWS AWS 配置。请参阅aws
阈值 阈值 将对象写入 S3 的条件。
aggregate_threshold 聚合阈值 刷新具有动态 path_prefix 的对象的条件。
object_key 对象键 设置对象存储的 path_prefixfile_pattern。文件模式为 events-%{yyyy-MM-dd'T'hh-mm-ss}。默认情况下,这些对象位于存储桶的根目录中。path_prefix 是可配置的。
压缩 字符串 压缩算法:nonegzipsnappy。默认为 none
buffer_type 缓冲区类型 缓冲区类型配置。
max_retries 整数 S3 摄取请求的最大重试次数。默认为 5

aws

选项 必需 类型 描述
region 字符串 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域
sts_role_arn 字符串 用于向 Amazon Simple Queue Service (Amazon SQS) 和 Amazon S3 发送请求时假定的 AWS Security Token Service (AWS STS) 角色。默认为 null,这表示使用凭据的标准 SDK 行为
sts_header_overrides 映射 IAM 角色为汇流器插件承担的头覆盖映射。
sts_external_id 字符串 当 Data Prepper 承担角色时使用的 AWS STS 外部 ID。有关更多信息,请参阅 AWS STS API 参考中 AssumeRole 下的 ExternalId 部分。

阈值配置

使用以下选项为 s3 接收器设置摄取阈值。当满足以下任一条件时,Data Prepper 会将事件写入 S3 对象。

选项 必需 类型 描述
event_count 整数 在将对象写入 S3 之前累积的 Data Prepper 事件数量。
maximum_size 字符串 在将对象写入 S3 之前累积的最大字节数。默认为 50mb
event_collect_timeout 字符串 Data Prepper 将事件写入 S3 之前的最长时间。该值应为 ISO-8601 持续时间(例如 PT2M30S)或简单表示法(例如 60s1500ms)。

聚合阈值配置

使用以下选项设置当聚合值超过定义阈值时触发特定操作或行为的规则或限制。

选项 必需 类型 描述
flush_capacity_ratio 浮点型 当达到 aggregate_threshold maximum_size 时要强制刷新组的百分比。该百分比表示为 0.01.0 之间的数字。默认为 0.5
maximum_size 字符串 在强制刷新对象之前累积的最大字节数。例如,128mb

缓冲区类型

buffer_type 是一个可选配置,它决定了 Data Prepper 在将对象写入 S3 之前如何临时存储数据。默认值为 in_memory

使用以下选项之一

  • in_memory:将记录存储在内存中。
  • local_file:将记录刷新到本地机器上的文件中。此选项使用您机器的临时目录。
  • multipart:使用S3 多部分上传写入。每 10 MB 作为一部分写入。

对象键配置

使用以下选项定义如何为存储在 S3 中的对象构建对象键。

选项 必需 类型 描述
path_prefix 字符串 用于写入 S3 的对象的 S3 键前缀路径。接受日期时间格式和使用Data Prepper 表达式动态注入值。例如,您可以使用 /${/my_partition_key}/%{yyyy}/%{MM}/%{dd}/%{HH}/ 根据 my_partition_key 值在 S3 中创建每小时文件夹。前缀路径应以 / 结尾。默认情况下,Data Prepper 将对象写入 S3 存储桶根目录。

编解码器

codec 决定了 s3 源如何格式化写入每个 S3 对象的数据。

avro 编解码器

avro 编解码器将事件写入为 Apache Avro 文档。由于 Avro 需要模式,您可以定义模式或让 Data Prepper 自动生成它。建议定义自己的模式,因为这将允许根据您的特定用例进行定制。

当您提供自己的 Avro 模式时,该模式定义了数据的最终结构。任何传入事件中未在 Avro 模式中映射的额外值将不会包含在最终目的地中。Data Prepper 不允许自定义模式与 include_keysexclude_keys 一起使用,以避免自定义 Avro 模式与 include_keysexclude_keys 接收器配置之间产生混淆。

在数据统一的情况下,您可能能够自动生成模式。自动生成的模式基于编解码器接收到的第一个事件。该模式将仅包含此事件中的键,并且所有键都必须存在于所有事件中才能自动生成可用的模式。自动生成的模式将所有字段设置为可空。使用 include_keysexclude_keys 接收器配置来控制自动生成的模式中包含哪些数据。

Avro 字段应使用空联合,因为这将允许缺失值。否则,每个事件都必须存在所有必需字段。仅当您确定非空字段存在时才使用它们。

使用以下选项配置编解码器。

选项 必需 类型 描述
schema 字符串 Avro 模式声明。如果 auto_schema 设置为 true,则不需要。
auto_schema 布尔型 当设置为 true 时,从第一个事件自动生成 Avro 模式声明

ndjson 编解码器

ndjson 编解码器将每一行写入为一个 JSON 对象。ndjson 编解码器不接受任何配置。

json 编解码器

json 编解码器将事件写入单个大型 JSON 文件。每个事件都写入 JSON 数组中的一个对象。

使用以下选项配置编解码器。

选项 必需 类型 描述
key_name 字符串 JSON 数组的键名。默认为 events

parquet 编解码器

parquet 编解码器将事件写入 Parquet 文件。使用该编解码器时,将 buffer_type 设置为 in_memory

parquet 编解码器使用模式写入数据。由于 Parquet 需要 Avro 模式,您可以自己定义模式,或让 Data Prepper 自动生成。建议定义自己的模式,因为这将允许根据您的特定用例进行定制。

有关 Avro 模式的更多信息,请参阅Avro 编解码器

使用以下选项配置编解码器。

选项 必需 类型 描述
schema 字符串 Avro 模式声明。如果 auto_schema 设置为 true,则不需要。
auto_schema 布尔型 当设置为 true 时,从第一个事件自动生成 Avro 模式声明

使用 Parquet 设置模式

以下管道示例展示了如何配置 s3 接收器,以使用 VPC Flow Logs 的模式将 Parquet 数据写入 Parquet 文件

pipeline:
  ...
  sink:
    - s3:
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
        bucket: mys3bucket
        object_key:
          path_prefix: vpc-flow-logs/%{yyyy}/%{MM}/%{dd}/%{HH}/
        codec:
          parquet:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
        threshold:
          event_count: 500000000
          maximum_size: 20mb
          event_collect_timeout: PT15M
        buffer_type: in_memory