带管道处理器的条件
摄入管道中的 pipeline
处理器允许根据文档内容有条件地执行不同的子管道。当不同类型的文档需要单独的处理逻辑时,这提供了强大的灵活性。您可以使用 pipeline
处理器中的 if
参数,根据字段值、数据类型或内容结构将文档定向到不同的管道。然后,每个管道可以独立应用其自己的处理器集。这种方法通过仅在相关时应用逻辑,使您的管道保持模块化和可维护性。
示例:按服务路由日志
以下示例演示了如何根据文档中的 service.name
字段将日志路由到不同的子管道。
创建名为 webapp_logs
的第一个管道
PUT _ingest/pipeline/webapp_logs
{
"processors": [
{ "set": { "field": "log_type", "value": "webapp" } }
]
}
创建名为 api_logs
的第二个管道
PUT _ingest/pipeline/api_logs
{
"processors": [
{ "set": { "field": "log_type", "value": "api" } }
]
}
创建名为 service_router
的主路由管道,它根据 service.name
将文档路由到相应的管道
PUT _ingest/pipeline/service_router
{
"processors": [
{
"pipeline": {
"name": "webapp_logs",
"if": "ctx.service?.name == 'webapp'"
}
},
{
"pipeline": {
"name": "api_logs",
"if": "ctx.service?.name == 'api'"
}
}
]
}
使用以下请求模拟管道
POST _ingest/pipeline/service_router/_simulate
{
"docs": [
{ "_source": { "service": { "name": "webapp" }, "message": "Homepage loaded" } },
{ "_source": { "service": { "name": "api" }, "message": "GET /v1/users" } },
{ "_source": { "service": { "name": "worker" }, "message": "Task started" } }
]
}
响应确认第一个文档由 webapp_logs
管道处理,第二个文档由 api_logs
管道处理。第三个文档保持不变,因为它不符合任何条件
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"log_type": "webapp",
"message": "Homepage loaded",
"service": {
"name": "webapp"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.555447087Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"log_type": "api",
"message": "GET /v1/users",
"service": {
"name": "api"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.55548442Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": "Task started",
"service": {
"name": "worker"
}
},
"_ingest": {
"timestamp": "2025-04-24T10:54:12.555490754Z"
}
}
}
]
}
示例:类型特定处理
您还可以使用管道处理器应用类型特定的管道。如果 code
字段是数字,则以下管道将日志定向到 numeric_handler
;如果它是 String
类型,则将其定向到 string_handler
。
创建名为 numeric_handler
的第一个管道
PUT _ingest/pipeline/numeric_handler
{
"processors": [
{ "set": { "field": "code_type", "value": "numeric" } }
]
}
创建名为 string_handler
的第二个管道
PUT _ingest/pipeline/string_handler
{
"processors": [
{ "set": { "field": "code_type", "value": "string" } }
]
}
创建名为 type_router
的主路由管道,它根据 code
字段将文档路由到相应的管道
PUT _ingest/pipeline/type_router
{
"processors": [
{
"pipeline": {
"name": "numeric_handler",
"if": "ctx.code instanceof Integer || ctx.code instanceof Long || ctx.code instanceof Double"
}
},
{
"pipeline": {
"name": "string_handler",
"if": "ctx.code instanceof String"
}
}
]
}
使用以下请求模拟管道
POST _ingest/pipeline/type_router/_simulate
{
"docs": [
{ "_source": { "code": 404 } },
{ "_source": { "code": "ERR_NOT_FOUND" } }
]
}
返回的文档包含由各个子管道添加的新字段 code_type
{
"docs": [
{
"doc": {
"_source": {
"code": 404,
"code_type": "numeric"
}
}
},
{
"doc": {
"_source": {
"code": "ERR_NOT_FOUND",
"code_type": "string"
}
}
}
]
}