S3 日志
OpenSearch Data Prepper 允许您从 Amazon 简单存储服务 (Amazon S3) 加载日志,包括传统日志、JSON 文档和 CSV 日志。
架构
Data Prepper 可以使用 Amazon 简单队列服务 (SQS) (Amazon SQS) 队列和 Amazon S3 事件通知从 S3 存储桶读取对象。
Data Prepper 轮询 Amazon SQS 队列以获取 S3 事件通知。当 Data Prepper 收到 S3 对象已创建的通知时,Data Prepper 将读取并解析该 S3 对象。
下图显示了所涉及组件的整体架构。
组件数据流如下所示
- 系统将日志生成到 S3 存储桶中。
- S3 在 SQS 队列中创建 S3 事件通知。
- Data Prepper 轮询 Amazon SQS 以获取消息,然后接收消息。
- Data Prepper 从 S3 对象下载内容。
- Data Prepper 将 S3 对象中的内容发送到 OpenSearch 作为文档。
管道概述
Data Prepper 支持使用 s3
来源从 S3 读取数据。
下图显示了 Data Prepper 从 S3 读取数据的管道概念性概述。
先决条件
在 Data Prepper 从 S3 读取日志数据之前,您需要满足以下先决条件
- 一个 S3 存储桶。
- 一个将日志写入 S3 的日志生产者。确切的日志生产者将根据您的具体用例而异,但可能包括将日志写入 S3 或 Amazon CloudWatch 等服务。
入门
使用以下步骤开始使用 Data Prepper 从 S3 加载日志。
- 为您的 S3 事件通知创建一个 SQS 标准队列。
- 为 SQS 配置存储桶通知。使用
s3:ObjectCreated:*
事件类型。 - 授予 Data Prepper AWS IAM 权限,以便访问 SQS 和 S3。
- (推荐) 创建一个 SQS 死信队列 (DLQ)。
- (推荐) 配置 SQS 重试策略,将失败消息移入 DLQ。
为 Data Prepper 设置权限
要查看 S3 日志,Data Prepper 需要访问 Amazon SQS 和 S3。使用以下示例设置权限
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3-access",
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::<YOUR-BUCKET>/*"
},
{
"Sid": "sqs-access",
"Effect": "Allow",
"Action": [
"sqs:DeleteMessage",
"sqs:ReceiveMessage"
],
"Resource": "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
},
{
"Sid": "kms-access",
"Effect": "Allow",
"Action": "kms:Decrypt",
"Resource": "arn:aws:kms:<YOUR-REGION>:<123456789012>:key/<YOUR-KMS-KEY>"
}
]
}
如果您的 S3 对象或 SQS 队列不使用 KMS,您可以移除 kms:Decrypt
权限。
SQS 死信队列
以下两种选项可用于处理 S3 对象处理错误
- 使用 SQS 死信队列 (DLQ) 跟踪失败。这是推荐的方法。
- 从 SQS 删除消息。您必须手动找到 S3 对象并更正错误。
下图显示了使用 SQS 和 DLQ 时的系统架构。
要使用 SQS 死信队列,请执行以下步骤
- 创建一个新的 SQS 标准队列作为 DLQ。
- 配置 SQS 重试策略以使用 DLQ。考虑将最大接收次数设置为 2 或 3 等较小值。
- 将 Data Prepper
s3
来源配置为在on_error
时使用retain_messages
。这是默认行为。
管道设计
创建一个管道以从 S3 读取日志,从 s3
来源插件开始。请参考以下示例进行指导。
s3-log-pipeline:
source:
s3:
notification_type: sqs
compression: gzip
codec:
newline:
sqs:
# Change this value to your SQS Queue URL
queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
visibility_timeout: "2m"
根据您的用例配置以下选项
queue_url
:这是 SQS 队列 URL,对您的管道而言始终是唯一的。codec
:编解码器决定如何解析传入数据。visibility_timeout
:将此值配置为足够大,以便 Data Prepper 处理 10 个 S3 对象。但是,如果此值过大,则未能处理的消息将至少需要指定值的时间才能由 Data Prepper 重试。
每个选项的默认值适用于大多数用例。有关 S3 来源的所有可用选项,请参阅 s3
。
s3-log-pipeline:
source:
s3:
notification_type: sqs
compression: gzip
codec:
newline:
sqs:
# Change this value to your SQS Queue URL
queue_url: "arn:aws:sqs:<YOUR-REGION>:<123456789012>:<YOUR-SQS-QUEUE>"
visibility_timeout: "2m"
aws:
# Specify the correct region
region: "<YOUR-REGION>"
# This shows using an STS role, but you can also use your system's default permissions.
sts_role_arn: "arn:aws:iam::<123456789012>:role/<DATA-PREPPER-ROLE>"
processor:
# You can configure a grok pattern to enrich your documents in OpenSearch.
#- grok:
# match:
# message: [ "%{COMMONAPACHELOG}" ]
sink:
- opensearch:
hosts: [ "https://:9200" ]
# Change to your credentials
username: "admin"
password: "admin"
index: s3_logs
多个 Data Prepper 管道
建议每个 Data Prepper 管道拥有一个 SQS 队列。此外,同一个集群中的多个节点可以从同一个 SQS 队列读取数据,这不需要额外的 Data Prepper 配置。
如果您有多个管道,则必须为每个管道创建多个 SQS 队列,即使这两个管道使用相同的 S3 存储桶。
Amazon SNS 扇出模式
为了满足 S3 生成的日志规模,一些用户需要为其日志使用多个 SQS 队列。您可以使用 Amazon 简单通知服务 (Amazon SNS) 将 S3 的事件通知路由到 SQS 扇出模式。使用 SNS,所有 S3 事件通知都直接发送到一个 SNS 主题,您可以在其中订阅多个 SQS 队列。
为确保 Data Prepper 可以直接从 SNS 主题解析事件,请在 SNS 到 SQS 订阅上配置原始消息传递。应用此选项不会影响订阅 SNS 主题的其他 SQS 队列。
使用 Amazon S3 Select 筛选和检索数据
如果管道使用 S3 来源,您可以在将 S3 对象内容摄入管道之前,使用 SQL 表达式对其进行筛选和计算。
s3_select
选项支持 Parquet 文件格式的对象。它还适用于使用 GZIP 或 BZIP2 压缩的对象(仅适用于 CSV 和 JSON 对象),并支持使用 GZIP 和 Snappy 对 Parquet 文件格式进行列式压缩。
有关使用 Amazon S3 Select 的详细信息,请参阅使用 Amazon S3 Select 筛选和检索数据和Amazon S3 Select 的 SQL 参考。
以下示例管道从 Parquet 文件格式编码的 S3 对象中检索所有数据
pipeline:
source:
s3:
s3_select:
expression: "select * from s3object s"
input_serialization: parquet
notification_type: "sqs"
...
以下示例管道仅检索对象中的前 10,000 条记录
pipeline:
source:
s3:
s3_select:
expression: "select * from s3object s LIMIT 10000"
input_serialization: parquet
notification_type: "sqs"
...
以下示例管道从 data_value
在给定范围 200–500 的 S3 对象中检索记录
pipeline:
source:
s3:
s3_select:
expression: "select s.* from s3object s where s.data_value > 200 and s.data_value < 500 "
input_serialization: parquet
notification_type: "sqs"
...