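Complex conditionals
In an ingest pipeline, a processor's if parameter can evaluate complex conditions written as Painless scripts. These conditions let you fine-tune document processing with more advanced logic, such as type checks, regular expressions, and combinations of multiple criteria.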
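Multiple condition checks
You can combine logical operators such as && (and), || (or), and ! (not) to build more complex conditions. The following pipeline tags documents whose message contains OutOfMemoryError as spam, and then drops the tagged documents that also have an error_code higher than 1000.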
PUT _ingest/pipeline/spammy_error_handler
{
"processors": [
{
"set": {
"field": "tags",
"value": ["spam"],
"if": "ctx.message != null && ctx.message.contains('OutOfMemoryError')"
}
},
{
"drop": {
"if": "ctx.tags != null && ctx.tags.contains('spam') && ctx.error_code != null && ctx.error_code > 1000"
}
}
]
}
You can test the pipeline using the following _simulate request:
POST _ingest/pipeline/spammy_error_handler/_simulate
{
"docs": [
{ "_source": { "message": "OutOfMemoryError occurred", "error_code": 1200 } },
{ "_source": { "message": "OutOfMemoryError occurred", "error_code": 800 } },
{ "_source": { "message": "All good", "error_code": 200 } }
]
}
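The first document is dropped because it contains the OutOfMemoryError string and an error_code higher than 1000. The second document is tagged as spam but kept because its error_code is 800, and the third document is returned unchanged: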
{
"docs": [
null,
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"error_code": 800,
"message": "OutOfMemoryError occurred",
"tags": [
"spam"
]
},
"_ingest": {
"timestamp": "2025-04-23T10:20:10.704359884Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"error_code": 200,
"message": "All good"
},
"_ingest": {
"timestamp": "2025-04-23T10:20:10.704369801Z"
}
}
}
]
}
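To make the evaluation order explicit, here is a minimal plain-Python sketch of the same two-step logic. It is not OpenSearch code: run_pipeline is a hypothetical stand-in, and it assumes each document is a simple dict. It shows how the drop condition depends on the tag written by the earlier set processor, and how each null check guards the comparison that follows it.

```python
def run_pipeline(doc):
    """Hypothetical model of spammy_error_handler; returns None for a dropped document."""
    doc = dict(doc)  # work on a copy so the input is left untouched

    # set processor: tag the document when message mentions OutOfMemoryError
    message = doc.get("message")
    if message is not None and "OutOfMemoryError" in message:
        doc["tags"] = ["spam"]

    # drop processor: every clause must hold; `and` short-circuits like &&
    tags = doc.get("tags")
    error_code = doc.get("error_code")
    if tags is not None and "spam" in tags and error_code is not None and error_code > 1000:
        return None

    return doc


docs = [
    {"message": "OutOfMemoryError occurred", "error_code": 1200},
    {"message": "OutOfMemoryError occurred", "error_code": 800},
    {"message": "All good", "error_code": 200},
]
results = [run_pipeline(d) for d in docs]
```

In this model, results holds None for the first document, a tagged copy of the second, and an unchanged copy of the third, matching the shape of the simulated response.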
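Type-safe evaluations
Use instanceof to make sure that a value has the expected data type before you operate on it. The following pipeline adds a processed field set to true only if message is a String and is longer than 10 characters.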
PUT _ingest/pipeline/string_message_check
{
"processors": [
{
"set": {
"field": "processed",
"value": true,
"if": "ctx.message != null && ctx.message instanceof String && ctx.message.length() > 10"
}
}
]
}
Test the pipeline using the following _simulate request:
POST _ingest/pipeline/string_message_check/_simulate
{
"docs": [
{ "_source": { "message": "short" } },
{ "_source": { "message": "This is a longer message" } },
{ "_source": { "message": 1234567890 } }
]
}
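Only the second document has the new field added. The first message is too short, and the third message is a number rather than a String: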
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": "short"
},
"_ingest": {
"timestamp": "2025-04-23T10:28:14.040115261Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"processed": true,
"message": "This is a longer message"
},
"_ingest": {
"timestamp": "2025-04-23T10:28:14.040141469Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": 1234567890
},
"_ingest": {
"timestamp": "2025-04-23T10:28:14.040144844Z"
}
}
}
]
}
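The same guard-then-use pattern can be sketched in plain Python. This is only an illustration under the assumption that documents are dicts; should_mark_processed is a hypothetical name, and isinstance stands in for instanceof.

```python
def should_mark_processed(doc):
    """Hypothetical model of the string_message_check condition."""
    message = doc.get("message")
    # isinstance plays the role of instanceof: len() is only reached for strings,
    # and a missing (None) message fails the type check as well
    return isinstance(message, str) and len(message) > 10


samples = [
    {"message": "short"},
    {"message": "This is a longer message"},
    {"message": 1234567890},
]
flags = [should_mark_processed(d) for d in samples]
```

Without the type guard, len(1234567890) raises a TypeError in Python, which is the kind of failure the instanceof check is meant to prevent in the script.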
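Using regular expressions
Painless scripts support the =~ operator for evaluating regular expressions. The following pipeline flags IP addresses that begin with 192.168. as suspicious.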
PUT _ingest/pipeline/flag_suspicious_ips
{
"processors": [
{
"set": {
"field": "alert",
"value": "suspicious_ip",
"if": "ctx.ip != null && ctx.ip =~ /^192\\.168\\.\\d+\\.\\d+$/"
}
}
]
}
Test the pipeline using the following _simulate request:
POST _ingest/pipeline/flag_suspicious_ips/_simulate
{
"docs": [
{ "_source": { "ip": "192.168.0.1" } },
{ "_source": { "ip": "10.0.0.1" } }
]
}
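The first document has the alert field added. The second document doesn't match the pattern and is returned unchanged: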
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"alert": "suspicious_ip",
"ip": "192.168.0.1"
},
"_ingest": {
"timestamp": "2025-04-23T10:32:45.367916428Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"ip": "10.0.0.1"
},
"_ingest": {
"timestamp": "2025-04-23T10:32:45.36793772Z"
}
}
}
]
}
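As a rough cross-check of the pattern itself, the following Python sketch applies the same regular expression with the standard re module. It assumes that =~ behaves as a find-style operator (a match anywhere in the string), which is why re.search is used; is_suspicious and PRIVATE_RANGE are hypothetical names.

```python
import re

# Same pattern as in the pipeline; a raw string keeps the backslashes literal
PRIVATE_RANGE = re.compile(r"^192\.168\.\d+\.\d+$")


def is_suspicious(doc):
    """Hypothetical model of the flag_suspicious_ips condition."""
    ip = doc.get("ip")
    # re.search looks for a match anywhere in the string; the ^ and $ anchors
    # are what force the whole value to match
    return ip is not None and PRIVATE_RANGE.search(ip) is not None


checks = [is_suspicious({"ip": ip}) for ip in ["192.168.0.1", "10.0.0.1", "1192.168.0.1"]]
```

The third sample shows why the anchors matter: without ^, a find-style match would also accept 1192.168.0.1.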
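Combining fields and null checks
The following pipeline adds a priority field set to high when level is critical and a timestamp is provided. The condition checks that each field is present before testing its value, so documents that are missing either field are left unchanged.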
PUT _ingest/pipeline/critical_log_handler
{
"processors": [
{
"set": {
"field": "priority",
"value": "high",
"if": "ctx.level != null && ctx.level == 'critical' && ctx.timestamp != null"
}
}
]
}
Test the pipeline using the following _simulate request:
POST _ingest/pipeline/critical_log_handler/_simulate
{
"docs": [
{ "_source": { "level": "critical", "timestamp": "2025-04-01T00:00:00Z" } },
{ "_source": { "level": "info", "timestamp": "2025-04-01T00:00:00Z" } },
{ "_source": { "level": "critical" } }
]
}
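Only the first document has the priority field added. The second document has a different level, and the third document has no timestamp: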
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"priority": "high",
"level": "critical",
"timestamp": "2025-04-01T00:00:00Z"
},
"_ingest": {
"timestamp": "2025-04-23T10:39:25.46840371Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"level": "info",
"timestamp": "2025-04-01T00:00:00Z"
},
"_ingest": {
"timestamp": "2025-04-23T10:39:25.46843021Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"level": "critical"
},
"_ingest": {
"timestamp": "2025-04-23T10:39:25.468434835Z"
}
}
}
]
}
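Multi-conditional processing
The following pipeline performs these actions:
- Adds an env field set to production if the env field doesn't already exist.
- Adds a severity field set to major if the value in the status field is greater than or equal to 500.
- Drops the document if the env field is set to test and the message field contains debug.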
PUT _ingest/pipeline/advanced_log_pipeline
{
"processors": [
{
"set": {
"field": "env",
"value": "production",
"if": "!ctx.containsKey('env')"
}
},
{
"set": {
"field": "severity",
"value": "major",
"if": "ctx.status != null && ctx.status >= 500"
}
},
{
"drop": {
"if": "ctx.env == 'test' && ctx.message != null && ctx.message.contains('debug')"
}
}
]
}
Test the pipeline using the following _simulate request:
POST _ingest/pipeline/advanced_log_pipeline/_simulate
{
"docs": [
{
"_source": {
"status": 503,
"message": "Server unavailable"
}
},
{
"_source": {
"env": "test",
"message": "debug log output"
}
},
{
"_source": {
"status": 200,
"message": "OK"
}
}
]
}
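In the response, note that the first document has the env: production and severity: major fields added. The second document is dropped. The third document has the env: production field added: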
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"severity": "major",
"message": "Server unavailable",
"env": "production",
"status": 503
},
"_ingest": {
"timestamp": "2025-04-23T10:51:46.795026554Z"
}
}
},
null,
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": "OK",
"env": "production",
"status": 200
},
"_ingest": {
"timestamp": "2025-04-23T10:51:46.795048304Z"
}
}
}
]
}
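Because the processors run in order, each condition sees the changes made by the processors before it. The following plain-Python sketch models that sequencing; it is an illustration only, advanced_log_pipeline is a hypothetical function name, and the drop step uses an explicit None check on message.

```python
def advanced_log_pipeline(doc):
    """Hypothetical model of advanced_log_pipeline; returns None for a dropped document."""
    doc = dict(doc)  # work on a copy so the input is left untouched

    # 1. default env when the key is absent (mirrors !ctx.containsKey('env'))
    if "env" not in doc:
        doc["env"] = "production"

    # 2. mark server errors; the None check guards the numeric comparison
    status = doc.get("status")
    if status is not None and status >= 500:
        doc["severity"] = "major"

    # 3. drop debug output from the test environment; env is always set by now
    message = doc.get("message")
    if doc["env"] == "test" and message is not None and "debug" in message:
        return None

    return doc


outputs = [advanced_log_pipeline(d) for d in (
    {"status": 503, "message": "Server unavailable"},
    {"env": "test", "message": "debug log output"},
    {"status": 200, "message": "OK"},
)]
```

Note that a document arriving without env can never be dropped in this model, because step 1 has already set env to production by the time step 3 runs.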
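Null-safe notation
The null-safe navigation notation (?.) lets you access a field or call a method on a value that may be null. Note that when the value is null, the expression silently evaluates to null instead of a boolean; therefore, we recommend first checking whether the value is null and only then applying operations like .contains or ==.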
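The difference between the two forms of the condition contrasted in this section can be sketched in plain Python. This is an analogy, not Painless: null_safe_contains is a hypothetical stand-in for the ?. call, and evaluate_condition raises explicitly because Python on its own would quietly treat None as false.

```python
def null_safe_contains(value, needle):
    """Hypothetical stand-in for value?.contains(needle): None in, None out."""
    if value is None:
        return None
    return needle in value


def evaluate_condition(result):
    """Mimic a condition that must be a real boolean, not None."""
    if not isinstance(result, bool):
        raise TypeError(f"condition evaluated to {result!r}, not a boolean")
    return result


doc = {"env": "test"}  # no message field

# Unsafe: the null-safe call yields None, which is not a valid condition result
try:
    evaluate_condition(null_safe_contains(doc.get("message"), "debug"))
    unsafe_failed = False
except TypeError:
    unsafe_failed = True

# Safe: the explicit None check short-circuits to False before the call is made
message = doc.get("message")
safe_result = evaluate_condition(message is not None and "debug" in message)
```

In this sketch the unsafe form fails on the document with no message, while the safe form evaluates to False and lets processing continue.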
Unsafe syntax
"if": "ctx.message?.contains('debug')"
If the message field doesn't exist in the document, the request returns a null_pointer_exception with the message Cannot invoke "Object.getClass()" because "value" is null.
Safe syntax
"if": "ctx.message != null && ctx.message.contains('debug')"