Link Search Menu Expand Document Documentation Menu

ML 推理搜索响应处理器

2.16 版本引入

ml_inference 搜索响应处理器用于调用已注册的机器学习 (ML) 模型,以便将其输出作为新字段合并到搜索结果中的文档中。

先决条件
在使用 ml_inference 搜索响应处理器之前,您的 OpenSearch 集群上必须托管有本地 ML 模型,或者通过 ML Commons 插件连接了外部托管模型。有关本地模型的更多信息,请参阅在 OpenSearch 中使用 ML 模型。有关外部托管模型的更多信息,请参阅连接到外部托管模型

语法

以下是 ml-inference 搜索响应处理器的语法

{
  "ml_inference": {
    "model_id": "<model_id>",
    "function_name": "<function_name>",
    "full_response_path": "<full_response_path>",
    "model_config":{
      "<model_config_field>": "<config_value>"
    },
    "model_input": "<model_input>",
    "input_map": [
      {
        "<model_input_field>": "<document_field>"
      }
    ],
    "output_map": [
      {
        "<new_document_field>": "<model_output_field>"
      }
    ],
    "override": "<override>",
    "one_to_one": false
  }
}

请求正文字段

下表列出了 ml-inference 搜索响应处理器的必需和可选参数。

参数 数据类型 必需/可选 描述
model_id 字符串 必需 处理器使用的 ML 模型的 ID。
function_name 字符串 外部托管模型可选

本地模型必填
处理器中配置的 ML 模型的函数名称。对于本地模型,有效值包括 sparse_encodingsparse_tokenizetext_embeddingtext_similarity。对于外部托管模型,有效值为 remote。默认值为 remote
model_config 对象 可选 ML 模型的自定义配置选项。对于外部托管模型,如果设置此配置,它将覆盖默认连接器参数。对于本地模型,您可以将 model_config 添加到 model_input 以覆盖注册期间设置的模型配置。有关更多信息,请参阅 model_config 对象
model_input 字符串 外部托管模型可选

本地模型必填
定义模型期望的输入字段格式的模板。每种本地模型类型可能使用不同的输入集。对于外部托管模型,默认值为 "{ \"parameters\": ${ml_inference.parameters} }
input_map 数组 外部托管模型可选

本地模型必填
一个数组,用于指定如何将搜索响应中的文档字段映射到模型输入字段。数组的每个元素都是 "<model_input_field>": "<document_field>" 格式的映射,对应于文档字段的一次模型调用。如果未为外部托管模型指定输入映射,则所有文档字段都将直接作为输入传递给模型。input_map 的大小表示模型的调用次数(即 Predict API 请求的数量)。
<model_input_field> 字符串 外部托管模型可选

本地模型必填
模型输入字段名称。
<document_field> 字符串 外部托管模型可选

本地模型必填
搜索响应中文档字段的名称或 JSON 路径,用作模型输入。
output_map 数组 外部托管模型可选

本地模型必填
一个数组,用于指定如何将模型输出字段映射到搜索响应文档中的新字段。数组的每个元素都是 "<new_document_field>": "<model_output_field>" 格式的映射。
<new_document_field> 字符串 外部托管模型可选

本地模型必填
文档中用于存储模型输出(由 model_output 指定)的新字段的名称。如果未为外部托管模型指定输出映射,则模型输出中的所有字段都将添加到新的文档字段中。
<model_output_field> 字符串 外部托管模型可选

本地模型必填
模型输出中要存储在 new_document_field 中的字段的名称或 JSON 路径。
full_response_path 布尔型 可选 如果 model_output_field 包含字段的完整 JSON 路径而不是字段名称,则将此参数设置为 true。然后将完全解析模型输出以获取字段的值。对于本地模型,默认值为 true;对于外部托管模型,默认值为 false
ignore_missing 布尔型 可选 如果为 trueinput_mapoutput_map 中定义的任何输入字段缺失,则此处理器将被忽略。否则,缺失字段将导致失败。默认值为 false
ignore_failure 布尔型 可选 指定即使遇到错误,处理器是否继续执行。如果为 true,则此处理器将被忽略,搜索继续。如果为 false,则任何失败都会导致搜索被取消。默认值为 false
override 布尔型 可选 如果响应中的文档已包含 <new_document_field> 中指定的名称的字段,则此参数相关。如果 overridefalse,则跳过输入字段。如果为 true,则现有字段值将被新的模型输出覆盖。默认值为 false
max_prediction_tasks 整数 可选 在文档搜索期间可以运行的最大并发模型调用数。默认值为 10
one_to_one 布尔型 可选 将此参数设置为 true,表示为每个文档调用模型一次(发出一次 Predict API 请求)。默认值(false)表示使用搜索响应中的所有文档调用模型,只发出一次 Predict API 请求。
description 字符串 可选 处理器的简要描述。
tag 字符串 可选 处理器的标识符标签。有助于调试以区分相同类型的处理器。

input_mapoutput_map 映射支持标准 JSON path 符号来指定复杂数据结构。

设置

创建一个名为 my_index 的索引并索引一个文档以解释映射

POST /my_index/_doc/1
{
  "passage_text": "hello world"
}

使用处理器

请按照以下步骤在管道中使用处理器。创建处理器时必须提供模型ID。在测试使用此处理器的管道之前,请确保模型已成功部署。您可以使用获取模型API检查模型状态。

对于本地模型,您必须提供一个model_input字段,该字段指定模型输入格式。将model_config中的所有输入字段添加到model_input中。

对于远程模型,model_input字段是可选的,其默认值为"{ \"parameters\": ${ml_inference.parameters} }

示例:本地模型

以下示例展示了如何使用本地模型配置ml_inference搜索响应处理器。

步骤 1:创建管道

以下示例展示了如何为huggingface/sentence-transformers/all-distilroberta-v1本地模型创建搜索管道。该模型是托管在您的OpenSearch集群中的预训练句子转换器模型

如果您使用预测API调用模型,则请求如下所示

POST /_plugins/_ml/_predict/text_embedding/cleMb4kBJ1eYAeTMFFg4
{
  "text_docs":[ "today is sunny"],
  "return_number": true,
  "target_response": ["sentence_embedding"]
}

使用此模式,按如下方式指定 model_input

 "model_input": "{ \"text_docs\": ${input_map.text_docs}, \"return_number\": ${model_config.return_number}, \"target_response\": ${model_config.target_response} }"

input_map中,将passage_text文档字段映射到模型期望的text_docs字段

"input_map": [
  {
    "text_docs": "passage_text"
  }
]

因为您将要转换为嵌入的字段指定为JSON路径,所以您需要将full_response_path设置为true。然后解析完整的JSON文档以获取输入字段

"full_response_path": true

passage_text字段中的文本将用于生成嵌入

{
  "passage_text": "hello world"
}

Predict API 请求返回以下响应:

{
  "inference_results" : [
    {
      "output" : [
        {
          "name" : "sentence_embedding",
          "data_type" : "FLOAT32",
          "shape" : [
            768
          ],
          "data" : [
            0.25517133,
            -0.28009856,
            0.48519906,
            ...
          ]
        }
      ]
    }
  ]
}

模型在$.inference_results.*.output.*.data字段中生成嵌入。output_map将此字段映射到搜索响应文档中新创建的passage_embedding字段

"output_map": [
  {
    "passage_embedding": "$.inference_results.*.output.*.data"
  }
]

要使用本地模型配置ml_inference搜索响应处理器,请明确指定function_name。在此示例中,function_nametext_embedding。有关有效function_name值的信息,请参阅请求字段

以下是使用本地模型的ml_inference搜索响应处理器的最终配置

PUT /_search/pipeline/ml_inference_pipeline_local
{
  "description": "search passage and generates embeddings",
  "response_processors": [
    {
      "ml_inference": {
        "function_name": "text_embedding",
        "full_response_path": true,
        "model_id": "<your model id>",
        "model_config": {
          "return_number": true,
          "target_response": ["sentence_embedding"]
        },
        "model_input": "{ \"text_docs\": ${input_map.text_docs}, \"return_number\": ${model_config.return_number}, \"target_response\": ${model_config.target_response} }",
        "input_map": [
          {
            "text_docs": "passage_text"
          }
        ],
        "output_map": [
          {
            "passage_embedding": "$.inference_results.*.output.*.data"
          }
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ]
}

第2步:运行管道

运行以下查询,在请求中提供管道名称

GET /my_index/_search?search_pipeline=ml_inference_pipeline_local
{
"query": {
  "term": {
    "passage_text": {
      "value": "hello"
      }
    }
  }
}

响应

响应确认处理器已在passage_embedding字段中生成了文本嵌入

{
  "took": 288,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.00009405752,
    "hits": [
      {
        "_index": "my_index",
        "_id": "1",
        "_score": 0.00009405752,
        "_source": {
          "passage_text": "hello world",
          "passage_embedding": [
            0.017304314,
            -0.021530833,
            0.050184276,
            0.08962978,
            ...]
        }
      }
    ]
  }
}

示例:外部托管的文本嵌入模型

以下示例展示了如何使用外部托管模型配置ml_inference搜索响应处理器。

步骤 1:创建管道

以下示例展示了如何为外部托管的文本嵌入模型创建搜索管道。该模型需要一个input字段并生成data字段中的结果。它将passage_text字段中的文本转换为文本嵌入,并将嵌入存储在passage_embedding字段中。function_name在处理器配置中未明确指定,因此它默认为remote,表示外部托管模型

PUT /_search/pipeline/ml_inference_pipeline
{
  "description": "Generate passage_embedding when search documents",
  "response_processors": [
    {
      "ml_inference": {
        "model_id": "<your model id>",
        "input_map": [
          {
            "input": "passage_text"
          }
        ],
        "output_map": [
          {
            "passage_embedding": "data"
          }
        ]
      }
    }
  ]
}

向外部托管模型发出预测API请求时,所有必要的字段和参数通常都包含在parameters对象中

POST /_plugins/_ml/models/cleMb4kBJ1eYAeTMFFg4/_predict
{
  "parameters": {
    "input": [
      {
        ...
      }
    ]
  }
}

为外部托管模型指定input_map时,您可以直接引用input字段,而无需提供其点路径parameters.input

"input_map": [
  {
    "input": "passage_text"
  }
]

第2步:运行管道

运行以下查询,在请求中提供管道名称

GET /my_index/_search?search_pipeline=ml_inference_pipeline_local
{
  "query": {
    "match_all": {
    }
  }
}

响应确认处理器已在passage_embedding字段中生成了文本嵌入。_source中的文档现在包含passage_textpassage_embedding字段

{
  "took": 288,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.00009405752,
    "hits": [
      {
        "_index": "my_index",
        "_id": "1",
        "_score": 0.00009405752,
        "_source": {
          "passage_text": "hello world",
          "passage_embedding": [
            0.017304314,
            -0.021530833,
            0.050184276,
            0.08962978,
            ...]
        }
      }
      }
    ]
  }
}

示例:外部托管的大语言模型

此示例演示了如何配置ml_inference搜索响应处理器以与外部托管的大语言模型 (LLM) 配合使用,并将模型的响应映射到搜索扩展对象。使用ml_inference处理器,您可以使LLM直接在响应中总结搜索结果。摘要包含在搜索响应的ext字段中,提供了对AI生成见解的无缝访问,以及原始搜索结果。

先决条件

您必须为此用例配置一个外部托管的LLM。有关外部托管模型的更多信息,请参阅连接到外部托管模型。注册LLM后,您可以使用以下请求进行测试。此请求需要提供promptcontext字段

POST /_plugins/_ml/models/KKne6JIBAs32TwoK-FFR/_predict
{
  "parameters": {
    "prompt":"\n\nHuman: You are a professional data analysist. You will always answer question: Which month had the lowest customer acquisition cost per new customer? based on the given context first. If the answer is not directly shown in the context, you will analyze the data and find the answer. If you don't know the answer, just say I don't know. Context: ${parameters.context.toString()}. \n\n Assistant:",
    "context":"Customer acquisition cost: January: $50, February: $45, March: $40. New customers: January: 500, February: 600, March: 750"
  }
}

响应在inference_results字段中包含模型输出

{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "response": """ Based on the data provided:

                        - Customer acquisition cost in January was $50 and new customers were 500. So cost per new customer was $50/500 = $0.10
                        - Customer acquisition cost in February was $45 and new customers were 600. So cost per new customer was $45/600 = $0.075
                        - Customer acquisition cost in March was $40 and new customers were 750. So cost per new customer was $40/750 = $0.053
            
                        Therefore, the month with the lowest customer acquisition cost per new customer was March, at $0.053."""
          }
        }
      ],
      "status_code": 200
    }
  ]
}

步骤 1:创建管道

为注册的模型创建搜索管道。该模型需要一个context字段作为输入。模型响应会总结review字段中的文本,并将摘要存储在搜索响应的ext.ml_inference.llm_response字段中

PUT /_search/pipeline/my_pipeline_request_review_llm
{
  "response_processors": [
    {
      "ml_inference": {
        "tag": "ml_inference",
        "description": "This processor is going to run llm",
        "model_id": "EOF6wJIBtDGAJRTD4kNg",
        "function_name": "REMOTE",
        "input_map": [
          {
            "context": "review"
          }
        ],
        "output_map": [
          {
            "ext.ml_inference.llm_response": "response"
          }
        ],
        "model_config": {
          "prompt": "\n\nHuman: You are a professional data analysist. You will always answer question: Which month had the lowest customer acquisition cost per new customer? based on the given context first. If the answer is not directly shown in the context, you will analyze the data and find the answer. If you don't know the answer, just say I don't know. Context: ${parameters.context.toString()}. \n\n Assistant:"
        },
        "ignore_missing": false,
        "ignore_failure": false
      }
    }
  ]
}

在此配置中,您提供了以下参数

  • model_id参数指定生成式AI模型的ID。
  • function_name参数设置为REMOTE,表示模型是外部托管的。
  • input_map参数将文档中的review字段映射到模型期望的context字段。
  • output_map参数指定模型响应应存储在搜索响应的ext.ml_inference.llm_response中。
  • model_config参数包含一个提示,该提示告诉模型如何处理输入并生成摘要。

第2步:索引示例文档

索引一些示例文档以测试管道

POST /_bulk
{"index":{"_index":"review_string_index","_id":"1"}}
{"review":"Customer acquisition cost: January: $50, New customers: January: 500."}
{"index":{"_index":"review_string_index","_id":"2"}}
{"review":"Customer acquisition cost: February: $45, New customers: February: 600."}
{"index":{"_index":"review_string_index","_id":"3"}}
{"review":"Customer acquisition cost: March: $40, New customers: March: 750."}

第3步:运行管道

使用管道运行搜索查询

GET /review_string_index/_search?search_pipeline=my_pipeline_request_review_llm
{
  "query": {
    "match_all": {}
  }
}

响应包含原始文档和ext.ml_inference.llm_response字段中生成的摘要

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "review_string_index",
        "_id": "1",
        "_score": 1,
        "_source": {
          "review": "Customer acquisition cost: January: $50, New customers: January: 500."
        }
      },
      {
        "_index": "review_string_index",
        "_id": "2",
        "_score": 1,
        "_source": {
          "review": "Customer acquisition cost: February: $45, New customers: February: 600."
        }
      },
      {
        "_index": "review_string_index",
        "_id": "3",
        "_score": 1,
        "_source": {
          "review": "Customer acquisition cost: March: $40, New customers: March: 750."
        }
      }
    ]
  },
  "ext": {
    "ml_inference": {
      "llm_response": """ Based on the context provided:

      - Customer acquisition cost in January was $50 and new customers were 500. So the cost per new customer was $50/500 = $0.10

      - Customer acquisition cost in February was $45 and new customers were 600. So the cost per new customer was $45/600 = $0.075

      - Customer acquisition cost in March was $40 and new customers were 750. So the cost per new customer was $40/750 = $0.053

      Therefore, the month with the lowest customer acquisition cost per new customer was March, as it had the lowest cost per customer of $0.053."""
    }
  }
}

示例:使用文本相似度模型对搜索结果进行重排

以下示例展示了如何使用文本相似度模型配置ml_inference搜索响应处理器。

先决条件

您必须为此用例配置一个外部托管的文本相似度模型。有关外部托管模型的更多信息,请参阅连接到外部托管模型。注册文本相似度模型后,您可以使用以下请求进行测试。此请求要求您在inputs字段中提供texttext_pair字段

POST /_plugins/_ml/models/Ialx65IBAs32TwoK1lXf/_predict
{
  "parameters": {
    "inputs":
    {
      "text": "I like you",
      "text_pair": "I hate you"
    }
  }
}

模型返回每个输入文档的相似度分数

{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "label": "LABEL_0",
            "score": 0.022704314440488815
          }
        }
      ],
      "status_code": 200
    }
  ]
}

第1步:索引示例文档

创建索引并添加一些示例文档

POST _bulk
{"index":{"_index":"demo-index-0","_id":"1"}}
{"diary":"I hate you"}
{"index":{"_index":"demo-index-0","_id":"2"}}
{"diary":"I love you"}
{"index":{"_index":"demo-index-0","_id":"3"}}
{"diary":"I dislike you"}

第2步:创建搜索管道

在此示例中,您将创建一个搜索管道,该管道在一对一推理模式下使用文本相似度模型,单独处理搜索结果中的每个文档。此设置允许模型为每个文档发出一个预测请求,为每个搜索命中提供特定的相关性见解。当使用input_map将搜索请求映射到查询文本时,JSON路径必须以$._request_request开头

PUT /_search/pipeline/my_rerank_pipeline
{
  "response_processors": [
    {
      "ml_inference": {
        "tag": "ml_inference",
        "description": "This processor runs ml inference during search response",
        "model_id": "Ialx65IBAs32TwoK1lXf",
        "model_input":"""{"parameters":{"inputs":{"text":"${input_map.text}","text_pair":"${input_map.text_pair}"}}}""",
        "function_name": "REMOTE",
        "input_map": [
          {
            "text": "diary",
            "text_pair":"$._request.query.term.diary.value"
          }
        ],
        "output_map": [
          {
            "rank_score": "$.score"
          }
        ],
        "full_response_path": false,
        "model_config": {},
        "ignore_missing": false,
        "ignore_failure": false,
        "one_to_one": true
        },
        "rerank": {
          "by_field": {
            "target_field": "rank_score",
            "remove_target_field": true
          }
        }
    }
  ]
}

在此配置中,您提供了以下参数

  • model_id参数指定文本相似度模型的唯一标识符。
  • function_name参数设置为REMOTE,表示模型是外部托管的。
  • input_map参数将每个文档中的diary字段映射到模型的text输入,以及将搜索查询词映射到text_pair输入。
  • output_map参数将模型的得分映射到每个文档中名为rank_score的字段。
  • model_input参数格式化模型的输入,确保其与预测API期望的结构匹配。
  • one_to_one参数设置为true,确保模型单独处理每个文档,而不是批量处理多个文档。
  • ignore_missing参数设置为false,如果文档中缺少映射字段,则处理器将失败。
  • ignore_failure参数设置为false,如果ML推理处理器遇到错误,则整个管道将失败。

rerank处理器在ML推理之后应用。它根据ML模型生成的rank_score字段重新排序文档,然后从最终结果中删除此字段。

第3步:运行管道

现在使用创建的管道执行搜索

GET /demo-index-0/_search?search_pipeline=my_rerank_pipeline
{
  "query": {
    "term": {
      "dairy": {
        "value": "today"
      }
    }
  }
}

响应包含原始文档及其重排后的分数

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 0.040183373,
    "hits": [
      {
        "_index": "demo-index-0",
        "_id": "1",
        "_score": 0.040183373,
        "_source": {
          "diary": "I hate you"
        }
      },
      {
        "_index": "demo-index-0",
        "_id": "2",
        "_score": 0.022628736,
        "_source": {
          "diary": "I love you"
        }
      },
      {
        "_index": "demo-index-0",
        "_id": "3",
        "_score": 0.0073115323,
        "_source": {
          "diary": "I dislike you"
        }
      }
    ]
  },
  "profile": {
    "shards": []
  }
}

后续步骤