Link Search Menu Expand Document Documentation Menu

异步批量摄取

已弃用 3.0

此功能已弃用。如需类似功能,请使用 OpenSearch Data Prepper。如果您希望此功能恢复,请在 ML Commons 存储库中创建一个议题

批量摄取配置一个摄取管道,该管道逐个处理文档。对于每个文档,批量摄取都会调用一个外部托管模型,从文档文本生成文本嵌入,然后将文档(包括文本和嵌入)摄取到 OpenSearch 索引中。

作为此实时过程的替代方案,*异步*批量摄取会摄取在 OpenSearch 外部生成并存储在远程文件服务器(例如 Amazon Simple Storage Service (Amazon S3) 或 OpenAI)上的文档及其嵌入。异步摄取会返回一个任务 ID,并异步运行以将数据离线摄取到您的 k-NN 集群中,用于神经搜索。您可以将异步批量摄取与 批量预测 API 结合使用,以异步执行推理。批量预测操作接受一个包含文档的输入文件,并调用一个外部托管模型,为这些文档在输出文件中生成嵌入。然后,您可以使用异步批量摄取将包含文档的输入文件和包含其嵌入的输出文件摄取到 OpenSearch 索引中。

从 OpenSearch 2.17 开始,异步批量摄取 API 支持 Amazon SageMaker、Amazon Bedrock 和 OpenAI。

先决条件

在使用异步批量摄取之前,您必须使用自选模型生成文本嵌入,并将输出存储在文件服务器(例如 Amazon S3)上。例如,您可以将批量 API 调用 Amazon SageMaker 文本嵌入模型的输出存储到 Amazon S3 输出路径为 s3://offlinebatch/output/sagemaker_batch.json.out 的文件中。输出为 JSONL 格式,每行代表一个文本嵌入结果。文件内容格式如下

{"SageMakerOutput":[[-0.017166402,0.055771016,...],[-0.06422759,-0.004301484,...],"content":["this is chapter 1","harry potter"],"id":1}
{"SageMakerOutput":[[-0.017455402,0.023771016,...],[-0.02322759,-0.009101284,...],"content":["this is chapter 2","draco malfoy"],"id":1}
...

从单个文件摄取数据

首先,创建一个 k-NN 索引,您将数据摄取到其中。k-NN 索引中的字段表示源数据文件中的数据结构。

在此示例中,源文件包含带有标题和章节的文档,以及它们相应的嵌入。因此,您将创建一个包含字段 idchapter_embeddingchaptertitle_embeddingtitle 的 k-NN 索引。

PUT /my-nlp-index
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "text"
      },
      "chapter_embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "faiss",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "chapter": {
        "type": "text"
      },
      "title_embedding": {
        "type": "knn_vector",
        "dimension": 384,
        "method": {
          "engine": "faiss",
          "space_type": "cosinesimil",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "title": {
        "type": "text"
      }
    }
  }
}

当使用 S3 文件作为异步批量摄取的源时,您必须将源文件中的字段映射到索引中的字段,以指示每条数据将被摄取到哪个索引中。如果未为某个字段提供 JSON 路径,则该字段在 k-NN 索引中将被设置为 null

field_map 中,指示源文件中每个字段的数据位置。您还可以通过将其 JSON 路径添加到 ingest_fields 数组中,指定无需对源文件进行任何更改即可直接摄取到索引中的字段。例如,在以下异步批量摄取请求中,源文件中 JSON 路径为 $.id 的元素将直接摄取到索引的 id 字段中。要从 Amazon S3 文件摄取此数据,请向您的 OpenSearch 端点发送以下请求

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index",
  "field_map": {
    "chapter": "$.content[0]",
    "title": "$.content[1]",
    "chapter_embedding": "$.SageMakerOutput[0]",
    "title_embedding": "$.SageMakerOutput[1]",
    "_id": "$.id"
  },
  "ingest_fields": ["$.id"],
  "credential": {
    "region": "us-east-1",
    "access_key": "<your access key>",
    "secret_key": "<your secret key>",
    "session_token": "<your session token>"
  },
  "data_source": {
    "type": "s3",
    "source": ["s3://offlinebatch/output/sagemaker_batch.json.out"]
  }
}

响应包含摄取任务的任务 ID

{
  "task_id": "cbsPlpEBMHcagzGbOQOx",
  "task_type": "BATCH_INGEST",
  "status": "CREATED"
}

要检查操作状态,请将任务 ID 提供给 任务 API。摄取完成后,任务 state 将变为 COMPLETED

从多个文件摄取数据

您还可以通过在 source 中指定文件位置来从多个文件摄取数据。以下示例从三个 OpenAI 文件中摄取数据。

OpenAI 批量 API 输入文件的格式如下

{"custom_id": "request-1", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of life?", "The food was delicious and the waiter..."]}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of work?", "The travel was fantastic and the view..."]}}
{"custom_id": "request-3", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-ada-002", "input": [ "What is the meaning of friend?", "The old friend was far away and the time..."]}}
...

OpenAI 批量 API 输出文件的格式如下

{"id": "batch_req_ITKQn29igorXCAGp6wzYs5IS", "custom_id": "request-1", "response": {"status_code": 200, "request_id": "10845755592510080d13054c3776aef4", "body": {"object": "list", "data": [{"object": "embedding", "index": 0, "embedding": [0.0044326545, ... ...]}, {"object": "embedding", "index": 1, "embedding": [0.002297497, ... ... ]}], "model": "text-embedding-ada-002", "usage": {"prompt_tokens": 15, "total_tokens": 15}}}, "error": null}
...

如果您已在 OpenAI 中运行批量 API 进行文本嵌入,并希望将模型输入和输出文件以及一些元数据摄取到您的索引中,请发送以下异步摄取请求。确保使用 source[file-index] 来识别请求正文中源数组中的文件位置。例如,source[0] 指的是 data_source.source 数组中的第一个文件。

以下请求将七个字段摄取到您的索引中:五个在 field_map 部分中指定,两个在 ingest_fields 中指定。格式遵循 sourcefile.jsonPath 模式,指示每个文件的 JSON 路径。在 field_map 中,$.body.input[0] 用作 JSON 路径,用于将数据从 source 数组中的第二个文件摄取到 question 字段中。ingest_fields 数组列出了所有将直接摄取到您的索引中的 source 文件中的元素

POST /_plugins/_ml/_batch_ingestion
{
  "index_name": "my-nlp-index-openai",
  "field_map": {
    "question": "source[1].$.body.input[0]",
    "answer": "source[1].$.body.input[1]",
    "question_embedding":"source[0].$.response.body.data[0].embedding",
    "answer_embedding":"source[0].$.response.body.data[1].embedding",
    "_id": ["source[0].$.custom_id", "source[1].$.custom_id"]
  },
  "ingest_fields": ["source[2].$.custom_field1", "source[2].$.custom_field2"],
  "credential": {
    "openAI_key": "<you openAI key>"
  },
  "data_source": {
    "type": "openAI",
    "source": ["file-<your output file id>", "file-<your input file id>", "file-<your other file>"]
  }
}

在请求中,请务必在 field_map 中定义 _id 字段。这是将来自三个单独文件中的每个数据条目进行映射所必需的。

响应包含摄取任务的任务 ID

{
  "task_id": "cbsPlpEBMHcagzGbOQOx",
  "task_type": "BATCH_INGEST",
  "status": "CREATED"
}

要检查操作状态,请将任务 ID 提供给 任务 API。摄取完成后,任务 state 将变为 COMPLETED

有关请求字段说明,请参阅 异步批量摄取 API

剩余 350 字符

有问题?

想做贡献?