ML 推理处理器
ml_inference
处理器用于调用在 OpenSearch ML Commons 插件中注册的机器学习 (ML) 模型。模型输出将作为新字段添加到摄入的文档中。
先决条件
在使用 ml_inference
处理器之前,您必须在 OpenSearch 集群上托管一个本地 ML 模型,或者通过 ML Commons 插件连接一个外部托管模型到您的 OpenSearch 集群。有关本地模型的更多信息,请参阅在 OpenSearch 中使用 ML 模型。有关外部托管模型的更多信息,请参阅连接到外部托管模型。
语法
以下是 ml-inference
处理器的语法:
{
"ml_inference": {
"model_id": "<model_id>",
"function_name": "<function_name>",
"full_response_path": "<full_response_path>",
"model_config":{
"<model_config_field>": "<config_value>"
},
"model_input": "<model_input>",
"input_map": [
{
"<model_input_field>": "<document_field>"
}
],
"output_map": [
{
"<new_document_field>": "<model_output_field>"
}
],
"override": "<override>"
}
}
配置参数
下表列出了 ml-inference
处理器所需的参数和可选参数。
参数 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
model_id | 字符串 | 必需 | 处理器使用的 ML 模型的 ID。 |
function_name | 字符串 | 外部托管模型可选 本地模型必填 | 在处理器中配置的 ML 模型的功能名称。对于本地模型,有效值为 sparse_encoding 、sparse_tokenize 、text_embedding 和 text_similarity 。对于外部托管模型,有效值为 remote 。默认值为 remote 。 |
model_config | 对象 | 可选 | ML 模型的自定义配置选项。对于外部托管模型,如果设置,此配置将覆盖默认连接器参数。对于本地模型,您可以将 model_config 添加到 model_input 中,以覆盖注册期间设置的模型配置。更多信息,请参阅 model_config 对象。 |
model_input | 字符串 | 外部托管模型可选 本地模型必填 | 定义模型所需输入字段格式的模板。每种本地模型类型可能使用不同的输入集。对于外部托管模型,默认值为 "{ \"parameters\": ${ml_inference.parameters} } 。 |
input_map | 数组 | 外部托管模型可选 本地模型必填 | 一个数组,指定如何将摄入的文档字段映射到模型输入字段。数组的每个元素都是 "<model_input_field>": "<document_field>" 格式的映射,对应于文档字段的一次模型调用。如果未为外部托管模型指定输入映射,则文档中的所有字段将直接作为输入传递给模型。input_map 的大小表示模型被调用的次数(即 Predict API 请求的数量)。 |
<model_input_field> | 字符串 | 外部托管模型可选 本地模型必填 | 模型输入字段名称。 |
<document_field> | 字符串 | 外部托管模型可选 本地模型必填 | 用作模型输入的摄入文档字段的名称或 JSON 路径。 |
output_map | 数组 | 外部托管模型可选 本地模型必填 | 一个数组,指定如何将模型输出字段映射到摄入文档中的新字段。数组的每个元素都是 "<new_document_field>": "<model_output_field>" 格式的映射。 |
<new_document_field> | 字符串 | 外部托管模型可选 本地模型必填 | 摄入文档中用于存储模型输出(由 model_output 指定)的新字段的名称。如果未为外部托管模型指定输出映射,则模型输出中的所有字段都将添加到新的文档字段中。 |
<model_output_field> | 字符串 | 外部托管模型可选 本地模型必填 | 模型输出中要存储在 new_document_field 中的字段的名称或 JSON 路径。 |
full_response_path | 布尔型 | 可选 | 如果 model_output_field 包含字段的完整 JSON 路径而非字段名称,则将此参数设置为 true 。然后将完全解析模型输出以获取字段的值。本地模型默认值为 true ,外部托管模型默认值为 false 。 |
ignore_missing | 布尔型 | 可选 | 如果设置为 true 并且 input_map 或 output_map 中定义的任何输入字段缺失,则忽略缺失字段。否则,缺失字段将导致失败。默认值为 false 。 |
ignore_failure | 布尔型 | 可选 | 指定即使处理器遇到错误是否继续执行。如果为 true ,则忽略任何故障并继续摄入。如果为 false ,则任何故障都会导致摄入被取消。默认值为 false 。 |
override | 布尔型 | 可选 | 如果摄入的文档已包含一个名称与 <new_document_field> 中指定的字段名称相同的字段,则此参数适用。如果 override 为 false ,则跳过该输入字段。如果为 true ,则现有字段值将被新的模型输出覆盖。默认值为 false 。 |
max_prediction_tasks | 整数 | 可选 | 文档摄入期间可运行的并发模型调用最大数量。默认值为 10 。 |
description | 字符串 | 可选 | 处理器的简要描述。 |
tag | 字符串 | 可选 | 处理器的标识符标签。有助于调试以区分相同类型的处理器。 |
input_map
和 output_map
映射支持标准 JSON path 表示法,用于指定复杂数据结构。
使用处理器
按照以下步骤在管道中使用处理器。创建处理器时必须提供模型 ID。在测试管道或使用处理器摄入文档之前,请确保模型已成功部署。您可以使用 Get Model API 检查模型状态。
对于本地模型,您必须提供一个 model_input
字段来指定模型输入格式。将 model_config
中的任何输入字段添加到 model_input
。
对于远程模型,model_input
字段是可选的,其默认值为 "{ \"parameters\": ${ml_inference.parameters} }
。
示例:外部托管模型
以下示例使用外部托管模型配置 ml_inference
处理器。
步骤 1:创建管道
以下示例为外部托管的文本嵌入模型创建摄入管道。该模型需要一个 input
字段,并在 data
字段中生成结果。它将 passage_text
字段中的文本转换为文本嵌入,并将嵌入存储在 passage_embedding
字段中。处理器配置中未明确指定 function_name
,因此其默认值为 remote
,表示这是一个外部托管模型。
PUT /_ingest/pipeline/ml_inference_pipeline
{
"description": "Generate passage_embedding for ingested documents",
"processors": [
{
"ml_inference": {
"model_id": "<your model id>",
"input_map": [
{
"input": "passage_text"
}
],
"output_map": [
{
"passage_embedding": "data"
}
]
}
}
]
}
对于对外部托管模型发出的 Predict API 请求,所有字段通常都嵌套在 parameters
对象中。
POST /_plugins/_ml/models/cleMb4kBJ1eYAeTMFFg4/_predict
{
"parameters": {
"input": [
{
...
}
]
}
}
为外部托管模型指定 input_map
时,您可以直接引用 input
字段,而无需提供其点路径 parameters.input
。
"input_map": [
{
"input": "passage_text"
}
]
步骤 2(可选):测试管道
建议在摄取文档之前测试您的管道。
要测试管道,请运行以下查询
POST _ingest/pipeline/ml_inference_pipeline/_simulate
{
"docs": [
{
"_index": "testindex1",
"_id": "1",
"_source":{
"passage_text": "hello world"
}
}
]
}
响应
响应确认处理器已在 passage_embedding
字段中生成了文本嵌入。现在文档同时包含 passage_text
和 passage_embedding
字段。
{
"docs" : [
{
"doc" : {
"_index" : "testindex1",
"_id" : "1",
"_source" : {
"passage_embedding" : [
0.017304314,
-0.021530833,
0.050184276,
0.08962978,
...
],
"passage_text" : "hello world"
},
"_ingest" : {
"timestamp" : "2023-10-11T22:35:53.654650086Z"
}
}
}
]
}
创建摄入管道后,您需要创建一个用于摄入的索引并将文档摄入到该索引中。
示例:本地模型
以下示例使用本地模型配置 ml_inference
处理器。
步骤 1:创建管道
以下示例为 huggingface/sentence-transformers/all-distilroberta-v1
本地模型创建摄入管道。该模型是一个托管在您的 OpenSearch 集群中的句子转换器预训练模型。
如果您使用 Predict API 调用模型,则请求如下所示:
POST /_plugins/_ml/_predict/text_embedding/cleMb4kBJ1eYAeTMFFg4
{
"text_docs":[ "today is sunny"],
"return_number": true,
"target_response": ["sentence_embedding"]
}
使用此模式,按如下方式指定 model_input
:
"model_input": "{ \"text_docs\": ${input_map.text_docs}, \"return_number\": ${model_config.return_number}, \"target_response\": ${model_config.target_response} }"
在 input_map
中,将 book.*.chunk.text.*.context
文档字段映射到模型所需的 text_docs
字段。
"input_map": [
{
"text_docs": "book.*.chunk.text.*.context"
}
]
由于您将要转换为嵌入的字段指定为 JSON 路径,因此需要将 full_response_path
设置为 true
,以便解析完整的 JSON 文档以获取输入字段。
"full_response_path": true
您将索引的文档将显示如下。 context
字段中的文本将用于生成嵌入。
"book": [
{
"chunk": {
"text": [
{
"chapter": "first chapter",
"context": "this is the first part"
}
]
}
}
]
Predict API 请求返回以下响应:
{
"inference_results" : [
{
"output" : [
{
"name" : "sentence_embedding",
"data_type" : "FLOAT32",
"shape" : [
768
],
"data" : [
0.25517133,
-0.28009856,
0.48519906,
...
]
}
]
}
]
}
模型在 $.inference_results.*.output.*.data
字段中生成嵌入。output_map
将此字段映射到摄入文档中新创建的 book.*.chunk.text.*.context_embedding
字段。
"output_map": [
{
"book.*.chunk.text.*.context_embedding": "$.inference_results.*.output.*.data"
}
]
要使用本地模型配置 ml_inference
处理器,请明确指定 function_name
。在此示例中,function_name
是 text_embedding
。有关有效的 function_name
值的信息,请参阅配置参数。
在此示例中,ml_inference
处理器与本地模型的最终配置如下所示:
PUT /_ingest/pipeline/ml_inference_pipeline_local
{
"description": "ingests reviews and generates embeddings",
"processors": [
{
"ml_inference": {
"function_name": "text_embedding",
"full_response_path": true,
"model_id": "<your model id>",
"model_config": {
"return_number": true,
"target_response": ["sentence_embedding"]
},
"model_input": "{ \"text_docs\": ${input_map.text_docs}, \"return_number\": ${model_config.return_number}, \"target_response\": ${model_config.target_response} }",
"input_map": [
{
"text_docs": "book.*.chunk.text.*.context"
}
],
"output_map": [
{
"book.*.chunk.text.*.context_embedding": "$.inference_results.*.output.*.data"
}
],
"ignore_missing": true,
"ignore_failure": true
}
}
]
}
步骤 2(可选):测试管道
要测试管道,请运行以下查询
POST _ingest/pipeline/ml_inference_pipeline/_simulate
{
"docs": [
{
"_index": "my_books",
"_id": "1",
"_source": {
"book": [
{
"chunk": {
"text": [
{
"chapter": "first chapter",
"context": "this is the first part"
},
{
"chapter": "first chapter",
"context": "this is the second part"
}
]
}
},
{
"chunk": {
"text": [
{
"chapter": "second chapter",
"context": "this is the third part"
},
{
"chapter": "second chapter",
"context": "this is the fourth part"
}
]
}
}
]
}
}
]
}
响应
响应确认处理器已在 context_embedding
字段中生成了文本嵌入。现在文档在相同路径下同时包含 context
和 context_embedding
字段。
{
"docs" : [
{
"doc" : {
"_index": "my_books",
"_id": "1",
"_source": {
"book": [
{
"chunk": {
"text": [
{
"chapter": "first chapter",
"context": "this is the first part",
"context_embedding": [
0.15756914,
0.05150984,
0.25225413,
0.4941875,
...
]
},
{
"chapter": "first chapter",
"context": "this is the second part",
"context_embedding": [
0.10526893,
0.026559234,
0.28763372,
0.4653795,
...
]
}
]
}
},
{
"chunk": {
"text": [
{
"chapter": "second chapter",
"context": "this is the third part",
"context_embedding": [
0.017304314,
-0.021530833,
0.050184276,
0.08962978,
...
]
},
{
"chapter": "second chapter",
"context": "this is the fourth part",
"context_embedding": [
0.37742054,
0.046911318,
1.2053889,
0.04663613,
...
]
}
]
}
}
]
}
}
}
]
}
创建摄入管道后,您需要创建一个用于摄入的索引并将文档摄入到该索引中。