摄取处理器
摄取处理器是摄取管道的核心组件。它们在索引前对文档进行预处理。例如,您可以移除字段、从文本中提取值、转换数据格式或附加额外信息。
OpenSearch 在您的 OpenSearch 安装中提供了一套标准的摄取处理器。要获取 OpenSearch 中可用处理器的列表,请使用 节点信息(Nodes Info) API 操作。
GET /_nodes/ingest?filter_path=nodes.*.ingest.processors
要设置和部署摄取处理器,请确保您拥有必要的权限和访问权限。请参阅安全插件 REST API 以了解更多信息。
支持的处理器
处理器类型及其必需或可选参数因您的具体用例而异。OpenSearch 支持以下摄取处理器。有关在 OpenSearch 管道中使用这些处理器的教程,请参阅每个处理器各自的文档。
处理器类型 | 描述 |
---|---|
append(追加) | 向文档中的字段添加一个或多个值。 |
bytes(字节) | 将人类可读的字节值转换为其字节数值。 |
community_id(社区 ID) | 为网络流元组生成社区 ID 流哈希算法。 |
convert(转换) | 更改文档中字段的数据类型。 |
复制 | 将现有字段中的整个对象复制到另一个字段。 |
csv(CSV) | 提取 CSV 并将其作为单独的字段存储在文档中。 |
date(日期) | 从字段中解析日期,然后使用该日期或时间戳作为文档的时间戳。 |
date_index_name(日期索引名称) | 根据文档中的日期或时间戳字段,将文档索引到基于时间的索引中。 |
dissect(分解) | 使用定义的模式从文本字段中提取结构化字段。 |
dot_expander(点展开器) | 将带有点的字段展开为对象字段。 |
drop(丢弃) | 丢弃文档,不进行索引,也不引发任何错误。 |
fail(失败) | 引发异常并停止管道的执行。 |
fingerprint(指纹) | 为文档中特定字段或所有字段生成哈希值。 |
foreach(遍历) | 允许将另一个处理器应用于文档中数组或对象字段的每个元素。 |
geoip(地理 IP) | 添加有关 IP 地址地理位置的信息。 |
geojson-feature(GeoJSON 特征) | 将 GeoJSON 数据索引到地理空间字段中。 |
grok(模式匹配) | 使用模式匹配解析和结构化非结构化数据。 |
gsub | gsub(字符串替换) |
html_strip(HTML 剥离) | 从文本字段中移除 HTML 标签并返回纯文本内容。 |
ip2geo(IP 到地理位置) | 添加有关 IPv4 或 IPv6 地址地理位置的信息。 |
join(连接) | 使用分隔符将数组的每个元素连接成一个字符串。 |
json(JSON) | 将 JSON 字符串转换为结构化的 JSON 对象。 |
kv(键值对) | 自动解析字段中的键值对。 |
lowercase(小写) | 将特定字段中的文本转换为小写字母。 |
pipeline(管道) | 运行内部管道。 |
remove(移除) | 从文档中移除字段。 |
remove_by_pattern(按模式移除) | 根据字段模式从文档中移除字段。 |
rename(重命名) | 重命名现有字段。 |
script(脚本) | 对传入文档运行内联或存储脚本。 |
set(设置) | 将字段的值设置为指定值。 |
sort(排序) | 按升序或降序对数组元素进行排序。 |
sparse_encoding(稀疏编码) | 从文本字段生成稀疏向量/词元和权重,用于使用稀疏检索的神经稀疏搜索。 |
split(拆分) | 使用分隔符将字段拆分成数组。 |
text_chunking(文本分块) | 将长文档拆分成更小的块。 |
text_embedding(文本嵌入) | 从文本字段生成向量嵌入,用于语义搜索。 |
text_image_embedding(文本图像嵌入) | 从文本和图像字段生成组合向量嵌入,用于多模态神经搜索。 |
trim(修剪) | 移除字符串字段开头和结尾的空白字符。 |
uppercase(大写) | 将特定字段中的文本转换为大写字母。 |
urldecode(URL 解码) | 从 URL 编码格式解码字符串。 |
user_agent(用户代理) | 从浏览器发送到其 Web 请求的用户代理中提取详细信息。 |
处理器限制设置
您可以使用集群设置 cluster.ingest.max_number_processors
来限制摄取处理器的数量。处理器总数包括常规处理器数量和on_failure
处理器数量。
cluster.ingest.max_number_processors
的默认值为 Integer.MAX_VALUE
。如果添加的处理器数量超过 cluster.ingest.max_number_processors
中配置的值,将抛出 IllegalStateException
。
启用批处理的处理器
某些处理器支持批处理摄取——它们可以同时批量处理多个文档。这些启用批处理的处理器在使用批处理时通常能提供更好的性能。对于批处理,请使用批量 API (Bulk API) 并提供 batch_size
参数。所有启用批处理的处理器都具有批处理模式和单文档模式。当您使用 PUT
方法摄取文档时,处理器在单文档模式下运行,并按顺序处理文档。目前,只有 text_embedding
和 sparse_encoding
处理器启用了批处理。所有其他处理器一次处理一个文档。
选择性启用处理器
通过提供 ingest-common.processors.allowed
集群设置,可以选择性地启用由 ingest-common 模块定义的处理器。如果未提供此设置,则默认启用所有处理器。指定空列表将禁用所有处理器。如果更改此设置以移除先前启用的处理器,则任何使用被禁用处理器的管道将在节点重启后失效,因为新设置会生效。