Link Search Menu Expand Document Documentation Menu

s3 源

s3 是一个源插件,用于从 Amazon Simple Storage Service (Amazon S3) 对象读取事件。您可以将源配置为使用 Amazon Simple Queue Service (Amazon SQS) 队列或扫描 S3 存储桶

  • 要使用 Amazon SQS 通知,请在您的 S3 存储桶上配置 S3 事件通知。配置 Amazon SQS 后,s3 源将从 Amazon SQS 接收消息。当 SQS 消息指示已创建 S3 对象时,s3 源将加载 S3 对象,然后使用配置的 编解码器解析它们。
  • 要使用 S3 存储桶,请配置 s3 源以使用 Amazon S3 Select 而非 OpenSearch Data Prepper 来解析 S3 对象。

IAM 权限

为了使用 s3 源,请配置您的 AWS Identity and Access Management (IAM) 权限,以授予 Data Prepper 访问 Amazon S3 的权限。您可以使用类似于以下 JSON 配置的配置

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "s3-access",
            "Effect": "Allow",
            "Action": [
              "s3:GetObject",
              "s3:ListBucket",
              "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
        },
        {
            "Sid": "sqs-access",
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
        },
        {
            "Sid": "kms-access",
            "Effect": "Allow",
            "Action": "kms:Decrypt",
            "Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
        }
    ]
}

如果您的 S3 对象或 Amazon SQS 队列不使用 AWS Key Management Service (AWS KMS),请移除 kms:Decrypt 权限。

如果您未启用 visibility_duplication_protection,则可以从 SQS 队列的访问权限中移除 sqs:ChangeMessageVisibility 权限。

跨账户 S3 访问

当 Data Prepper 从 S3 存储桶获取数据时,它使用存储桶所有者条件验证存储桶的所有权。默认情况下,Data Prepper 期望 S3 存储桶由拥有相关 SQS 队列的相同账户拥有。如果没有提供 SQS,Data Prepper 将使用 aws 配置中的 Amazon Resource Name (ARN) 角色。

如果您计划从多个 S3 存储桶摄取数据,但每个存储桶都与不同的 S3 账户关联,则需要根据以下条件配置 Data Prepper 以检查跨账户 S3 访问

  • 如果您想要数据的 S3 存储桶都属于 SQS 队列账户以外的账户,请将 default_bucket_owner 设置为存储桶账户持有者的账户 ID。
  • 如果您的 S3 存储桶位于多个账户中,请使用 bucket_owners 映射。

在以下示例中,SQS 队列由账户 000000000000 拥有。SQS 队列包含来自两个 S3 存储桶的数据:my-bucket-01my-bucket-02。由于 my-bucket-01123456789012 拥有,而 my-bucket-02999999999999 拥有,因此 bucket_owners 映射使用其账户 ID 调用这两个存储桶所有者,如以下配置所示

s3:
  sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999

您可以同时使用 bucket_ownersdefault_bucket_owner

配置

您可以使用以下选项配置 s3 源。

选项 必需 类型 描述
notification_type 字符串 必须是 sqs
notification_source 字符串 确定 SQS 如何接收通知。必须是 s3eventbridges3 表示直接从 Amazon S3 发送到 Amazon SQS 的通知,或从 Amazon S3 扇出到 Amazon Simple Notification Service (Amazon SNS) 再到 Amazon SQS 的通知。eventbridge 表示来自 Amazon EventBridgeAmazon Security Lake 的通知。默认值为 s3
compression 字符串 要应用的压缩算法:nonegzipsnappyautomatic。默认值为 none
编解码器 编解码器 要应用的编解码器
sqs SQS SQS 配置。有关更多信息,请参阅 sqs
aws AWS AWS 配置。有关更多信息,请参阅 aws
on_error 字符串 确定如何处理 Amazon SQS 中的错误。可以是 retain_messagesdelete_messagesretain_messages 会将消息留在 Amazon SQS 队列中,并尝试再次发送该消息。建议将其用于死信队列。delete_messages 会删除失败的消息。默认值为 retain_messages
buffer_timeout 持续时间 在发生超时之前,允许将事件写入 Data Prepper 缓冲区的时间量。Amazon S3 源在指定时间内无法写入缓冲区的任何事件都将被丢弃。默认值为 10s
records_to_accumulate 整数 写入缓冲区之前累积的消息数。默认值为 100
metadata_root_key 字符串 用于向每个事件添加 S3 元数据的基本键。元数据包括每个 S3 对象的键和存储桶。默认值为 s3/
default_bucket_owner 字符串 S3 存储桶所有者的 AWS 账户 ID。有关更多信息,请参阅跨账户 S3 访问
bucket_owners 映射 包含拥有存储桶的账户 ID 的存储桶名称映射。有关更多信息,请参阅跨账户 S3 访问
disable_bucket_ownership_validation 布尔型 当设置为 true 时,S3 源不会尝试验证存储桶是否由预期账户拥有。预期账户是拥有 Amazon SQS 队列的同一个账户。默认值为 false
确认 布尔型 当设置为 true 时,启用 s3 源在事件被 OpenSearch 接收器接收时接收端到端确认
s3_select s3_select Amazon S3 Select 配置。
scan scan S3 扫描配置。
delete_s3_objects_on_read 布尔型 当设置为 true 时,S3 扫描会在 S3 对象中的所有事件都被所有接收器成功确认后尝试删除 S3 对象。删除 S3 对象时应启用 acknowledgments。默认值为 false
workers 整数 配置源用于从 S3 读取数据的工作线程数 (1–10)。除非您的 S3 对象大小小于 1 MB,否则请将此值保留为默认值。对于较大的 S3 对象,性能可能会下降。此设置会影响基于 SQS 的源和 S3 扫描源。默认值为 1

sqs

以下参数允许您在 s3 源插件中配置 Amazon SQS 的使用。

选项 必需 类型 描述
queue_url 字符串 接收消息的 Amazon SQS 队列的 URL。
maximum_messages 整数 在任何单个请求中从 Amazon SQS 队列接收的最大消息数。默认值为 10
visibility_timeout 持续时间 应用于从 Amazon SQS 队列读取的消息的可见性超时。此值应设置为 Data Prepper 读取批处理中所有 S3 对象可能花费的时间量。默认值为 30s
wait_time 持续时间 等待 Amazon SQS API 长轮询的时间量。默认值为 20s
poll_delay 持续时间 在读取和处理一批 Amazon SQS 消息以及发出后续请求之间设置的延迟。默认值为 0s
visibility_duplication_protection 布尔型 如果设置为 true,Data Prepper 会尝试通过延长 SQS 消息的可见性超时来避免重复处理。在数据到达接收器之前,Data Prepper 将定期调用 ChangeMessageVisibility 以避免重复读取 S3 对象。要使用此功能,您需要授予 IAM 角色 sqs:ChangeMessageVisibility 权限。默认值为 false
visibility_duplicate_protection_timeout 持续时间 在使用 visibility_duplication_protection 时,设置消息不会被处理的最长总时间。默认为两小时。

aws

选项 必需 类型 描述
region 字符串 用于凭据的 AWS 区域。默认为标准 SDK 行为来确定区域
sts_role_arn 字符串 用于向 Amazon SQS 和 Amazon S3 发出请求时承担的 AWS Security Token Service (AWS STS) 角色。默认值为 null,这将使用标准 SDK 凭证行为
aws_sts_header_overrides 映射 IAM 角色为汇流器插件承担的头覆盖映射。
sts_external_id 字符串 Data Prepper 承担 STS 角色时使用的 STS 外部 ID。有关更多信息,请参阅 STS AssumeRole API 参考中的 ExternalID 文档。

编解码器

编解码器决定 s3 源如何解析每个 Amazon S3 对象。为了提高效率和性能,您可以将编解码器组合与某些处理器一起使用。

newline 编解码器

newline 编解码器将每一单行解析为单个日志事件。这对于大多数应用程序日志是理想的,因为每个事件都按单行解析。它也适用于每行具有独立 JSON 对象的 S3 对象,这在使用 parse_json 处理器解析每行时非常匹配。

使用以下选项配置 newline 编解码器。

选项 必需 类型 描述
skip_lines 整数 在创建事件之前要跳过的行数。您可以使用此配置跳过常见的标题行。默认值为 0
header_destination 字符串 分配给 S3 对象标题行的键值。如果指定此选项,则每个事件将包含一个 header_destination 字段。

json 编解码器

json 编解码器将每个 S3 对象解析为 JSON 数组中的单个 JSON 对象,然后为数组中的每个对象创建一个 Data Prepper 日志事件。

csv 编解码器

csv 编解码器解析逗号分隔值 (CSV) 格式的对象,每行生成一个 Data Prepper 日志事件。使用以下选项配置 csv 编解码器。

选项 必需 类型 描述
delimiter 整数 分隔列的分隔符。默认值为 ,
quote_character 字符串 用作 CSV 数据文本限定符的字符。默认值为 "
header 字符串列表 包含用于解析 CSV 数据的列名的标题。
detect_header 布尔型 Amazon S3 对象的第一行是否应解释为标题。默认值为 true

s3_selects3 源一起使用

配置 s3_select 以解析 Amazon S3 对象时,请使用以下选项

选项 必需 类型 描述
expression 是,在使用 s3_select 字符串 用于查询对象的表达式。直接映射到 expression 属性。
expression_type 字符串 提供的表达式的类型。默认值为 SQL。直接映射到 ExpressionType
input_serialization 是,在使用 s3_select 字符串 提供 S3 Select 文件格式。Amazon S3 使用此格式将对象数据解析为记录,并仅返回与指定 SQL 表达式匹配的记录。可以是 csvjsonparquet
compression_type 字符串 指定对象的压缩格式。直接映射到 CompressionType
csv csv 提供用于处理 CSV 数据的 CSV 配置。
json(JSON) json(JSON) 提供用于处理 JSON 数据的 JSON 配置。

csv

结合 s3_selectcsv 配置使用以下选项,以确定解析后的 CSV 文件应如何格式化。

这些选项直接映射到 S3 Select CSVInput 数据类型中可用的选项。

选项 必需 类型 描述
file_header_info 字符串 描述输入的第一行。直接映射到 FileHeaderInfo 属性。
quote_escape 字符串 用于转义已转义值内部引号字符的单个字符。直接映射到 QuoteEscapeCharacter 属性。
comments 字符串 用于指示当字符出现在行首时应忽略该行的单个字符。直接映射到 Comments 属性。

json

结合 s3_selectjson 使用以下选项,以确定 S3 Select 如何处理 JSON 文件。

选项 必需 类型 描述
type 字符串 JSON 数组的类型。可以是 DOCUMENTLINES。直接映射到 Type 属性。

scans3 源一起使用

以下参数允许您扫描 S3 对象。所有选项都可以在存储桶级别进行配置。

选项 必需 类型 描述
start_time 字符串 从给定 start_time 后开始扫描对象的时间。这应遵循 ISO LocalDateTime 格式,例如 023-01-23T10:00:00。如果 end_timestart_time 一起配置,则 start_time 之后和 end_time 之前的所有对象都将被处理。start_timerange 不能一起使用。
结束时间 字符串 在给定 end_time 后不再扫描对象的时间。这应遵循 ISO LocalDateTime 格式,例如 023-01-23T10:00:00。如果 start_timeend_time 一起配置,则 start_time 之后和 end_time 之前的所有对象都将被处理。end_timerange 不能一起使用。
range 字符串 从所有存储桶扫描对象的时间范围。支持 ISO_8601 表示法字符串,例如 PT20.345SPT15M,以及秒 (60s) 和毫秒 (1600ms) 的表示法字符串。start_timeend_time 不能与 range 一起使用。范围 P12H 会扫描管道启动后过去 12 小时内修改的所有对象。
buckets 列表 要扫描的扫描存储桶列表。
scheduling 列表 配置所有存储桶的定期扫描。如果配置了调度,则不能使用 start_timeend_timerange

扫描存储桶

选项 必需 类型 描述
bucket 映射 为每个存储桶提供选项。

您可以在 bucket 设置映射中配置以下选项。

选项 必需 类型 描述
名称 字符串 表示要扫描的 S3 存储桶名称的字符串。
filter 过滤器 提供过滤器配置。
start_time 字符串 从给定 start_time 后开始扫描对象的时间。这应遵循 ISO LocalDateTime 格式,例如 023-01-23T10:00:00。如果 end_timestart_time 一起配置,则 start_time 之后和 end_time 之前的所有对象都将被处理。start_timerange 不能一起使用。这将覆盖 扫描级别的 start_time
结束时间 字符串 在给定 end_time 后不再扫描对象的时间。这应遵循 ISO LocalDateTime 格式,例如 023-01-23T10:00:00。如果 start_timeend_time 一起配置,则 start_time 之后和 end_time 之前的所有对象都将被处理。这将覆盖 扫描级别的 end_time
range 字符串 从所有存储桶扫描对象的时间范围。支持 ISO_8601 表示法字符串,例如 PT20.345SPT15M,以及秒 (60s) 和毫秒 (1600ms) 的表示法字符串。start_timeend_time 不能与 range 一起使用。范围 P12H 会扫描管道启动后过去 12 小时内修改的所有对象。这将覆盖 扫描级别的 range

filter

filter 配置中使用以下选项。

选项 必需 类型 描述
include_prefix 列表 扫描中包含的 S3 键前缀字符串列表。默认情况下,存储桶中的所有对象都包括在内。
exclude_suffix 列表 从扫描中排除的 S3 键后缀字符串列表。默认情况下,不排除存储桶中的任何对象。

scheduling

选项 必需 类型 描述
interval 字符串 指示每次扫描之间的最小间隔。间隔中的下一次扫描将在上次扫描的间隔持续时间结束且处理完上次扫描的所有对象后开始。支持 ISO 8601 表示法字符串,例如 PT20.345SPT15M,以及秒 (60s) 和毫秒 (1600ms) 的表示法字符串。
count 整数 指定存储桶将被扫描的次数。默认为 Integer.MAX_VALUE

指标

s3 源包括以下指标

计数器

  • s3ObjectsFaileds3 源未能读取的 S3 对象数量。
  • s3ObjectsNotFounds3 源因 S3“未找到”错误而未能读取的 S3 对象数量。这些也计入 s3ObjectsFailed
  • s3ObjectsAccessDenieds3 源因“访问被拒绝”或“禁止”错误而未能读取的 S3 对象数量。这些也计入 s3ObjectsFailed
  • s3ObjectsSucceededs3 源成功读取的 S3 对象数量。
  • s3ObjectNoRecordsFounds3 源向缓冲区添加了 0 条记录的 S3 对象数量。
  • s3ObjectsDeleteds3 源删除的 S3 对象数量。
  • s3ObjectsDeleteFaileds3 源未能删除的 S3 对象数量。
  • s3ObjectsEmpty:被视为为空的 S3 对象数量,因为它们的大小为 0s3 源将跳过这些对象。
  • sqsMessagesReceiveds3 源从队列接收的 Amazon SQS 消息数量。
  • sqsMessagesDeleteds3 源从队列删除的 Amazon SQS 消息数量。
  • sqsMessagesFaileds3 源未能解析的 Amazon SQS 消息数量。
  • sqsMessagesDeleteFaileds3 源未能从 SQS 队列删除的 SQS 消息数量。
  • sqsVisibilityTimeoutChangedCounts3 源更改 SQS 消息可见性超时的次数。这包括同一消息上的多次可见性超时更改。
  • sqsVisibilityTimeoutChangeFailedCounts3 源未能更改 SQS 消息可见性超时的次数。这包括同一消息上的多次可见性超时更改失败。
  • acknowledgementSetCallbackCounters3 源从 Data Prepper 接收到确认的次数。

计时器

  • s3ObjectReadTimeElapsed:测量 s3 源执行获取 S3 对象请求、解析并写入事件到缓冲区所需的时间量。
  • sqsMessageDelay:测量从 S3 创建对象到其完全解析所经过的时间。

分布汇总

  • s3ObjectSizeBytes:测量 S3 对象的大小,由 S3 Content-Length 报告。对于压缩对象,这是压缩后的大小。
  • s3ObjectProcessedBytes:测量 s3 源为给定对象处理的字节数。对于压缩对象,这是未压缩的大小。
  • s3ObjectsEvents:测量 S3 对象生成的事件(有时称为记录)的数量。

示例:使用 SQS 的未压缩日志

以下 pipeline.yaml 文件显示了读取未压缩的换行符分隔日志的最小配置

source:
  s3:
    notification_type: sqs
    codec:
      newline:
    compression: none
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    aws:
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"

示例:使用扫描的未压缩日志

以下 pipeline.yaml 文件显示了扫描具有未压缩的换行符分隔日志的对象的最小配置

source:
  s3:
    codec:
      newline:
    compression: none
    aws:
      region: "us-east-1"
      sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
    scan:
      start_time: 2023-01-01T00:00:00
      range: "P365D"
      buckets:
        - bucket:
            name: "s3-scan-test"
            filter:
              exclude_suffix:
                - "*.log"