OpenSearch Data Prepper 的 aws_lambda 集成
AWS Lambda 集成允许开发人员在其 OpenSearch Data Prepper 管道中使用无服务器计算功能,实现灵活的事件处理和数据路由。
AWS Lambda 处理器配置
aws_lambda
处理器支持在 Data Prepper 管道中调用 AWS Lambda 函数以处理事件。它根据您的用例支持同步和异步调用。
配置字段
您可以使用以下配置选项来配置处理器。
字段 | 类型 | 必需 | 描述 |
---|---|---|---|
函数名称 | 字符串 | 必需 | 要调用的 AWS Lambda 函数的名称。 |
调用类型 | 字符串 | 必需 | 指定调用类型,可以是 request-response 或 event 。默认值为 request-response 。 |
aws.region | 字符串 | 必需 | Lambda 函数所在的 AWS 区域。 |
aws.sts_role_arn | 字符串 | 可选 | 在调用 Lambda 函数之前要承担的角色的 Amazon 资源名称 (ARN)。 |
最大重试次数 | 整数 | 可选 | 失败调用的最大重试次数。默认值为 3 。 |
批处理 | 对象 | 可选 | Lambda 调用的批处理设置。默认值为 key_name = "events" 。默认阈值为 event_count=100 、maximum_size="5mb" 和 event_collect_timeout = 10s 。 |
lambda_when | 字符串 | 可选 | 一个条件表达式,用于确定何时调用 Lambda 处理器。 |
响应编解码器 | 对象 | 可选 | 用于解析 Lambda 响应的编解码器配置。默认值为 json 。 |
匹配失败时的标签 | 列表 | 可选 | 当 Lambda 匹配失败或遇到意外错误时,要添加到事件的标签列表。 |
SDK 超时 | 持续时间 | 可选 | 配置 SDK 客户端连接超时期限。默认值为 60s 。 |
响应事件匹配 | 布尔型 | 可选 | 指定 Data Prepper 如何解释和处理 Lambda 函数响应。默认值为 false 。 |
示例配置
processors:
- aws_lambda:
function_name: "my-lambda-function"
invocation_type: "request-response"
response_events_match: false
aws:
region: "us-east-1"
sts_role_arn: "arn:aws:iam::123456789012:role/my-lambda-role"
max_retries: 3
batch:
key_name: "events"
threshold:
event_count: 100
maximum_size: "5mb"
event_collect_timeout: PT10S
lambda_when: "event['status'] == 'process'"
用法
处理器支持以下调用类型
request-response
:处理器等待 Lambda 函数完成后再继续。event
:函数异步触发,无需等待响应。batch
:启用后,事件会被聚合并批量发送,以优化 Lambda 调用。批处理阈值控制事件数量、大小限制和超时。codec
:JSON 用于请求和响应编解码器。Lambda 必须返回 JSON 数组输出。tags_on_match_failure
:当 Lambda 处理失败或遇到意外问题时,可将自定义标签应用于事件。
行为
当配置为批处理时,AWS Lambda 处理器将多个事件分组到一个请求中。此分组由批处理阈值控制,批处理阈值可以基于事件计数、大小限制或超时。然后,处理器将整个批处理作为单个有效负载发送到 Lambda 函数。
Lambda 响应处理
response_events_match
设置定义了 Data Prepper 如何处理发送到 Lambda 的批处理事件与收到的响应之间的关系。
true
:Lambda 返回一个 JSON 数组,其中包含每个批处理事件的结果。Data Prepper 将此数组映射回其对应的原始事件,确保批处理中的每个事件从数组中获取相应的响应部分。false
:Lambda 为整个批处理返回一个或多个事件。响应事件与原始事件不相关。原始事件元数据不会保留在响应事件中。例如,当response_events_match
设置为true
时,Lambda 函数预期返回的响应事件数量与原始请求数量相同,并保持原始顺序。
限制
请注意以下限制
- 负载限制:6 MB 负载限制
- 响应编解码器:仅支持 JSON 编解码器
集成测试
此插件的集成测试与主 Data Prepper 构建过程分开执行。使用以下 Gradle 命令运行这些测试
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.processor.lambda.region="us-east-1" -Dtests.processor.lambda.functionName="lambda_test_function" -Dtests.processor.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role