访问管道中的数据
在摄取管道中,您可以使用 ctx
对象访问文档数据。此对象表示已处理的文档,允许您读取、修改或丰富文档字段。管道处理器对文档的 _source
字段及其元数据字段都具有读写访问权限。
访问文档字段
ctx
对象公开所有文档字段。您可以使用点表示法直接访问它们。
示例:访问顶级字段
给定以下示例文档
{
"user": "alice"
}
您可以按如下方式访问 user
"field": "ctx.user"
示例:访问嵌套字段
给定以下示例文档
{
"user": {
"name": "alice"
}
}
您可以按如下方式访问 user.name
"field": "ctx.user.name"
访问源中的字段
要访问文档 _source
中的字段,请通过其名称引用该字段
{
"set": {
"field": "environment",
"value": "production"
}
}
或者,您可以显式使用 _source
{
"set": {
"field": "_source.environment",
"value": "production"
}
}
访问元数据字段
您可以读取或写入以下元数据字段
_index
_type
_id
_routing
示例:动态设置 _routing
{
"set": {
"field": "_routing",
"value": "{{region}}"
}
}
访问摄取元数据字段
_ingest.timestamp
字段表示摄取节点接收文档的时间。要保留此时间戳,请使用 set
处理器
{
"set": {
"field": "received_at",
"value": "{{_ingest.timestamp}}"
}
}
在 Mustache 模板中使用 ctx
使用 Mustache 模板将字段值插入处理器设置中。对于未转义的字段值,请使用三对大括号 ({{{
和 }}}
)。
示例:组合源字段
以下处理器配置将 app
和 env
字段用下划线 (_) 分隔组合起来,并将结果存储在 log_label
字段中
{
"set": {
"field": "log_label",
"value": "{{{app}}}_{{{env}}}"
}
}
示例:使用 set
处理器生成动态问候语
如果文档的 user
字段设置为 alice
,请使用以下语法生成结果 "greeting": "Hello, alice!"
{
"set": {
"field": "greeting",
"value": "Hello, {{{user}}}!"
}
}
动态字段名
您可以使用字段的值作为新字段的名称
{
"set": {
"field": "{{service}}",
"value": "{{code}}"
}
}
示例:根据状态路由到动态索引
以下处理器配置通过在 status
字段的值后附加 -events
来动态设置目标索引
{
"set": {
"field": "_index",
"value": "{{status}}-events"
}
}
在 script
处理器中使用 ctx
使用 script
处理器进行高级转换。
示例:仅当另一个字段缺失时才添加字段
以下处理器仅在文档中缺失 error_message
字段时才添加该字段,并将其值设置为“none”
{
"script": {
"lang": "painless",
"source": "if (ctx.error_message == null) { ctx.error_message = 'none'; }"
}
}
示例:将值从一个字段复制到另一个字段
以下处理器将 timestamp
字段的值复制到名为 event_time
的新字段中
{
"script": {
"lang": "painless",
"source": "ctx.event_time = ctx.timestamp;"
}
}
完整管道示例
以下示例定义了一个完整的摄取管道,该管道使用 source
字段设置标语,从 date
字段中提取 year
,并将文档的摄取时间戳记录在 received_at
字段中
PUT _ingest/pipeline/example-pipeline
{
"description": "Sets tags, log label, and defaults error message",
"processors": [
{
"set": {
"field": "tagline",
"value": "{{{user.first}}} from {{{department}}}"
}
},
{
"script": {
"lang": "painless",
"source": "ctx.year = ctx.date.substring(0, 4);"
}
},
{
"set": {
"field": "received_at",
"value": "{{_ingest.timestamp}}"
}
}
]
}
要测试此管道,请使用以下请求
POST _ingest/pipeline/example-pipeline/_simulate
{
"docs": [
{
"_source": {
"user": {
"first": "Liam"
},
"department": "Engineering",
"date": "2024-12-03T14:05:00Z"
}
}
]
}
响应显示了处理后丰富过的文档,包括新添加的 tagline
、提取的 year
以及由摄取管道生成的 received_at
时间戳
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"user": {
"first": "Liam"
},
"department": "Engineering",
"date": "2024-12-03T14:05:00Z",
"tagline": "Liam from Engineering",
"year": "2024",
"received_at": "2025-04-14T18:40:00.000Z"
},
"_ingest": {
"timestamp": "2025-04-14T18:40:00.000Z"
}
}
}
]
}