Link Search Menu Expand Document Documentation Menu

带管道处理器的条件

摄入管道中的 pipeline 处理器允许根据文档内容有条件地执行不同的子管道。当不同类型的文档需要单独的处理逻辑时,这提供了强大的灵活性。您可以使用 pipeline 处理器中的 if 参数,根据字段值、数据类型或内容结构将文档定向到不同的管道。然后,每个管道可以独立应用其自己的处理器集。这种方法通过仅在相关时应用逻辑,使您的管道保持模块化和可维护性。

示例:按服务路由日志

以下示例演示了如何根据文档中的 service.name 字段将日志路由到不同的子管道。

创建名为 webapp_logs 的第一个管道

PUT _ingest/pipeline/webapp_logs
{
  "processors": [
    { "set": { "field": "log_type", "value": "webapp" } }
  ]
}

创建名为 api_logs 的第二个管道

PUT _ingest/pipeline/api_logs
{
  "processors": [
    { "set": { "field": "log_type", "value": "api" } }
  ]
}

创建名为 service_router 的主路由管道,它根据 service.name 将文档路由到相应的管道

PUT _ingest/pipeline/service_router
{
  "processors": [
    {
      "pipeline": {
        "name": "webapp_logs",
        "if": "ctx.service?.name == 'webapp'"
      }
    },
    {
      "pipeline": {
        "name": "api_logs",
        "if": "ctx.service?.name == 'api'"
      }
    }
  ]
}

使用以下请求模拟管道

POST _ingest/pipeline/service_router/_simulate
{
  "docs": [
    { "_source": { "service": { "name": "webapp" }, "message": "Homepage loaded" } },
    { "_source": { "service": { "name": "api" }, "message": "GET /v1/users" } },
    { "_source": { "service": { "name": "worker" }, "message": "Task started" } }
  ]
}

响应确认第一个文档由 webapp_logs 管道处理,第二个文档由 api_logs 管道处理。第三个文档保持不变,因为它不符合任何条件

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "log_type": "webapp",
          "message": "Homepage loaded",
          "service": {
            "name": "webapp"
          }
        },
        "_ingest": {
          "timestamp": "2025-04-24T10:54:12.555447087Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "log_type": "api",
          "message": "GET /v1/users",
          "service": {
            "name": "api"
          }
        },
        "_ingest": {
          "timestamp": "2025-04-24T10:54:12.55548442Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "Task started",
          "service": {
            "name": "worker"
          }
        },
        "_ingest": {
          "timestamp": "2025-04-24T10:54:12.555490754Z"
        }
      }
    }
  ]
}

示例:类型特定处理

您还可以使用管道处理器应用类型特定的管道。如果 code 字段是数字,则以下管道将日志定向到 numeric_handler;如果它是 String 类型,则将其定向到 string_handler

创建名为 numeric_handler 的第一个管道

PUT _ingest/pipeline/numeric_handler
{
  "processors": [
    { "set": { "field": "code_type", "value": "numeric" } }
  ]
}

创建名为 string_handler 的第二个管道

PUT _ingest/pipeline/string_handler
{
  "processors": [
    { "set": { "field": "code_type", "value": "string" } }
  ]
}

创建名为 type_router 的主路由管道,它根据 code 字段将文档路由到相应的管道

PUT _ingest/pipeline/type_router
{
  "processors": [
    {
      "pipeline": {
        "name": "numeric_handler",
        "if": "ctx.code instanceof Integer || ctx.code instanceof Long || ctx.code instanceof Double"
      }
    },
    {
      "pipeline": {
        "name": "string_handler",
        "if": "ctx.code instanceof String"
      }
    }
  ]
}

使用以下请求模拟管道

POST _ingest/pipeline/type_router/_simulate
{
  "docs": [
    { "_source": { "code": 404 } },
    { "_source": { "code": "ERR_NOT_FOUND" } }
  ]
}

返回的文档包含由各个子管道添加的新字段 code_type

{
  "docs": [
    {
      "doc": {
        "_source": {
          "code": 404,
          "code_type": "numeric"
        }
      }
    },
    {
      "doc": {
        "_source": {
          "code": "ERR_NOT_FOUND",
          "code_type": "string"
        }
      }
    }
  ]
}
剩余 350 字符

有问题?

想贡献吗?