Link Search Menu Expand Document Documentation Menu

访问管道中的数据

在摄取管道中,您可以使用 ctx 对象访问文档数据。此对象表示已处理的文档,允许您读取、修改或丰富文档字段。管道处理器对文档的 _source 字段及其元数据字段都具有读写访问权限。

访问文档字段

ctx 对象公开所有文档字段。您可以使用点表示法直接访问它们。

示例:访问顶级字段

给定以下示例文档

{
  "user": "alice"
}

您可以按如下方式访问 user

"field": "ctx.user"

示例:访问嵌套字段

给定以下示例文档

{
  "user": {
    "name": "alice"
  }
}

您可以按如下方式访问 user.name

"field": "ctx.user.name"

访问源中的字段

要访问文档 _source 中的字段,请通过其名称引用该字段

{
  "set": {
    "field": "environment",
    "value": "production"
  }
}

或者,您可以显式使用 _source

{
  "set": {
    "field": "_source.environment",
    "value": "production"
  }
}

访问元数据字段

您可以读取或写入以下元数据字段

  • _index
  • _type
  • _id
  • _routing

示例:动态设置 _routing

{
  "set": {
    "field": "_routing",
    "value": "{{region}}"
  }
}

访问摄取元数据字段

_ingest.timestamp 字段表示摄取节点接收文档的时间。要保留此时间戳,请使用 set 处理器

{
  "set": {
    "field": "received_at",
    "value": "{{_ingest.timestamp}}"
  }
}

在 Mustache 模板中使用 ctx

使用 Mustache 模板将字段值插入处理器设置中。对于未转义的字段值,请使用三对大括号 ({{{}}})。

示例:组合源字段

以下处理器配置将 appenv 字段用下划线 (_) 分隔组合起来,并将结果存储在 log_label 字段中

{
  "set": {
    "field": "log_label",
    "value": "{{{app}}}_{{{env}}}"
  }
}

示例:使用 set 处理器生成动态问候语

如果文档的 user 字段设置为 alice,请使用以下语法生成结果 "greeting": "Hello, alice!"

{
  "set": {
    "field": "greeting",
    "value": "Hello, {{{user}}}!"
  }
}

动态字段名

您可以使用字段的值作为新字段的名称

{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}

示例:根据状态路由到动态索引

以下处理器配置通过在 status 字段的值后附加 -events 来动态设置目标索引

{
  "set": {
    "field": "_index",
    "value": "{{status}}-events"
  }
}

script 处理器中使用 ctx

使用 script 处理器进行高级转换。

示例:仅当另一个字段缺失时才添加字段

以下处理器仅在文档中缺失 error_message 字段时才添加该字段,并将其值设置为“none”

{
  "script": {
    "lang": "painless",
    "source": "if (ctx.error_message == null) { ctx.error_message = 'none'; }"
  }
}

示例:将值从一个字段复制到另一个字段

以下处理器将 timestamp 字段的值复制到名为 event_time 的新字段中

{
  "script": {
    "lang": "painless",
    "source": "ctx.event_time = ctx.timestamp;"
  }
}

完整管道示例

以下示例定义了一个完整的摄取管道,该管道使用 source 字段设置标语,从 date 字段中提取 year,并将文档的摄取时间戳记录在 received_at 字段中

PUT _ingest/pipeline/example-pipeline
{
  "description": "Sets tags, log label, and defaults error message",
  "processors": [
    {
      "set": {
        "field": "tagline",
        "value": "{{{user.first}}} from {{{department}}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.year = ctx.date.substring(0, 4);"
      }
    },
    {
      "set": {
        "field": "received_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

要测试此管道,请使用以下请求

POST _ingest/pipeline/example-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "user": {
          "first": "Liam"
        },
        "department": "Engineering",
        "date": "2024-12-03T14:05:00Z"
      }
    }
  ]
}

响应显示了处理后丰富过的文档,包括新添加的 tagline、提取的 year 以及由摄取管道生成的 received_at 时间戳

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "user": {
            "first": "Liam"
          },
          "department": "Engineering",
          "date": "2024-12-03T14:05:00Z",
          "tagline": "Liam from Engineering",
          "year": "2024",
          "received_at": "2025-04-14T18:40:00.000Z"
        },
        "_ingest": {
          "timestamp": "2025-04-14T18:40:00.000Z"
        }
      }
    }
  ]
}