创建或更新工作流
创建工作流会将工作流模板的内容添加到流框架系统索引。您可以提供 JSON 格式(通过指定 Content-Type: application/json
)或 YAML 格式(通过指定 Content-Type: application/yaml
)的工作流。默认情况下,工作流会进行验证以帮助识别无效配置,包括:
- 需要未安装的 OpenSearch 插件的工作流步骤。
- 依赖于这些步骤提供的先前节点输入的工作流步骤。
- 包含无效值的工作流步骤字段。
- 包含循环或重复 ID 的工作流图(节点/边)配置。
要获取工作流步骤的验证模板,请调用 获取工作流步骤 API。
您可以在工作流步骤字段的值中包含占位符表达式。例如,您可以在模板中将凭据字段指定为 openAI_key: '${{ openai_key }}'
。在配置期间,表达式将使用 ${{ <value> }}
格式替换为用户提供的值。您可以使用 配置工作流 API 或通过将此 API 的 provision
参数设置为 true
来传递实际密钥作为参数。
工作流创建后,将其 workflow_id
提供给其他 API。
POST
方法创建新工作流。PUT
方法更新现有工作流。您可以指定 update_fields
参数来更新特定字段。
如果完整工作流尚未配置,则只能更新它。
端点
POST /_plugins/_flow_framework/workflow
PUT /_plugins/_flow_framework/workflow/<workflow_id>
路径参数
下表列出了可用的路径参数。
参数 | 数据类型 | 描述 |
---|---|---|
workflow_id | 字符串 | 要更新的工作流的 ID。PUT 方法必需。 |
查询参数
工作流通常在单独的步骤中创建和配置。但是,在您彻底测试工作流后,可以通过包含 provision
查询参数来合并创建和配置步骤:
POST /_plugins/_flow_framework/workflow?provision=true
当设置为 true
时,配置工作流 API 将在创建后立即执行。
默认情况下,工作流在创建时会进行验证,以确保语法有效且图中不包含循环。此行为可通过 validation
查询参数控制。如果 validation
设置为 all
,OpenSearch 将执行完整的模板验证。任何其他 validation
参数值将抑制验证,允许保存不完整/进行中的模板。要禁用模板验证,请将 validation
设置为 none
。
POST /_plugins/_flow_framework/workflow?validation=none
在未配置的工作流中,您可以更新 workflows
字段之外的其他字段。例如,您可以按如下方式更新 name
和 description
字段:
PUT /_plugins/_flow_framework/workflow/<workflow_id>?update_fields=true
{
"name": "new-template-name",
"description": "A new description for the existing template"
}
您不能同时指定 provision
和 update_fields
参数。
如果工作流已配置,您可以更新并重新配置完整模板:
PUT /_plugins/_flow_framework/workflow/<workflow_id>?reprovision=true
{
<updated complete template>
}
您可以向工作流添加新步骤,但不能删除它们。目前只有索引设置、搜索管道和摄取管道步骤可以更新。
要控制请求等待配置和重新配置过程完成的时间,请使用 wait_for_completion_timeout
参数:
POST /_plugins/_flow_framework/workflow/?provision=true&wait_for_completion_timeout=2s
PUT /_plugins/_flow_framework/workflow/<workflow_id>/?reprovision=true&wait_for_completion_timeout=2s
如果操作未在指定时间内完成,响应将返回当前工作流状态,而执行将异步继续。
wait_for_completion_timeout
参数仅在 provision
或 reprovision
设置为 true
时使用。
例如,以下请求配置一个工作流并等待最多 2 秒完成:您可以使用 工作流模板 创建和配置工作流,如下所示:
POST /_plugins/_flow_framework/workflow?use_case=<use_case>&provision=true
{
"create_connector.credential.key" : "<YOUR API KEY>"
}
下表列出了可用的查询参数。所有查询参数都是可选的。仅当 provision
参数设置为 true
时才允许用户提供的参数。
参数 | 数据类型 | 描述 |
---|---|---|
provision | 布尔型 | 是否将工作流配置为请求的一部分。默认为 false 。 |
update_fields | 布尔型 | 是否仅更新请求正文中包含的字段。默认为 false 。 |
reprovision | 布尔型 | 如果已配置,是否重新配置整个模板。请求正文中必须提供完整的模板。默认为 false 。 |
validation | 字符串 | 是否验证工作流。有效值为 all (验证模板)和 none (不验证模板)。默认为 all 。 |
use_case | 字符串 | 创建工作流时使用的 工作流模板 的名称。 |
wait_for_completion_timeout | 时间值 | 指定同步配置或重新配置的最大等待时间。如果超时,请求将返回当前工作流状态,而执行将异步继续。 |
用户提供的替换表达式 | 字符串 | 与模板中的替换表达式匹配的参数。仅当 provision 设置为 true 时才允许。可选。如果 provision 设置为 false ,您可以在 配置工作流 API 查询参数 中传递这些参数。 |
请求正文字段
下表列出了可用的请求字段。
字段 | 数据类型 | 必需/可选 | 描述 |
---|---|---|---|
名称 | 字符串 | 必需 | 工作流的名称。 |
description | 字符串 | 可选 | 工作流的描述。 |
use_case | 字符串 | 可选 | 用户提供的用例,可与 搜索工作流 API 一起使用以查找相关工作流。您可以使用此字段指定自定义值。这与 use_case 查询参数不同。 |
version | 对象 | 可选 | 包含两个字段的键值映射:template ,用于标识模板版本;compatibility ,用于标识所需 OpenSearch 最低版本列表。 |
workflows | 对象 | 可选 | 工作流映射。目前,仅支持 provision 键。工作流键的值是一个键值映射,包含 user_params 字段以及 nodes 和 edges 列表。 |
示例请求:注册和部署外部托管模型 (YAML)
要提供 YAML 格式的模板,请在请求头中指定 Content-Type: application/yaml
。
curl -XPOST "https://:9200/_plugins/_flow_framework/workflow" -H 'Content-Type: application/yaml'
YAML 模板允许注释。
以下是用于注册和部署外部托管模型的 YAML 模板示例:
# This name is required
name: createconnector-registerremotemodel-deploymodel
# Other fields are optional but useful
description: This template creates a connector to a remote model, registers it, and
deploys that model
# Other templates with a similar use case can be searched
use_case: REMOTE_MODEL_DEPLOYMENT
version:
# Templates may be versioned by their authors
template: 1.0.0
# Compatibility with OpenSearch 2.12.0 and higher and 3.0.0 and higher
compatibility:
- 2.12.0
- 3.0.0
# One or more workflows can be included, presently only provision is supported
workflows:
provision:
# These nodes are the workflow steps corresponding to ML Commons APIs
nodes:
# This ID must be unique to this workflow
- id: create_connector_1
# There may be multiple steps with the same type
type: create_connector
# These inputs match the Create Connector API body
user_inputs:
name: OpenAI Chat Connector
description: The connector to public OpenAI model service for GPT 3.5
version: '1'
protocol: http
parameters:
endpoint: api.openai.com
model: gpt-3.5-turbo
credential:
openAI_key: '12345'
actions:
- action_type: predict
method: POST
url: https://${parameters.endpoint}/v1/chat/completions
# This ID must be unique to this workflow
- id: register_model_2
type: register_remote_model
# This step needs the connector_id produced as an output of the previous step
previous_node_inputs:
create_connector_1: connector_id
# These inputs match the Register Model API body
user_inputs:
name: openAI-gpt-3.5-turbo
function_name: remote
description: test model
# This ID must be unique to this workflow
- id: deploy_model_3
type: deploy_model
# This step needs the model_id produced as an output of the previous step
previous_node_inputs:
register_model_2: model_id
# Since the nodes include previous_node_inputs these are optional to define
# They will be added automatically and included in the stored template
# Additional edges may also be added here if required for sequencing
edges:
- source: create_connector_1
dest: register_model_2
- source: register_model_2
dest: deploy_model_3
示例请求:注册和部署远程模型 (JSON)
要提供 JSON 格式的模板,请在请求头中指定 Content-Type: application/json
。
curl -XPOST "https://:9200/_plugins/_flow_framework/workflow" -H 'Content-Type: application/json'
以下 JSON 模板等效于上一节中提供的 YAML 模板:
{
"name": "createconnector-registerremotemodel-deploymodel",
"description": "This template creates a connector to a remote model, registers it, and deploys that model",
"use_case": "REMOTE_MODEL_DEPLOYMENT",
"version": {
"template": "1.0.0",
"compatibility": [
"2.12.0",
"3.0.0"
]
},
"workflows": {
"provision": {
"nodes": [
{
"id": "create_connector_1",
"type": "create_connector",
"user_inputs": {
"name": "OpenAI Chat Connector",
"description": "The connector to public OpenAI model service for GPT 3.5",
"version": "1",
"protocol": "http",
"parameters": {
"endpoint": "api.openai.com",
"model": "gpt-3.5-turbo"
},
"credential": {
"openAI_key": "12345"
},
"actions": [
{
"action_type": "predict",
"method": "POST",
"url": "https://${parameters.endpoint}/v1/chat/completions"
}
]
}
},
{
"id": "register_model_2",
"type": "register_remote_model",
"previous_node_inputs": {
"create_connector_1": "connector_id"
},
"user_inputs": {
"name": "openAI-gpt-3.5-turbo",
"function_name": "remote",
"description": "test model"
}
},
{
"id": "deploy_model_3",
"type": "deploy_model",
"previous_node_inputs": {
"register_model_2": "model_id"
}
}
],
"edges": [
{
"source": "create_connector_1",
"dest": "register_model_2"
},
{
"source": "register_model_2",
"dest": "deploy_model_3"
}
]
}
}
}
示例响应
OpenSearch 将返回 workflow_id
。
{
"workflow_id" : "8xL8bowB8y25Tqfenm50"
}
创建工作流后,您可以使用 workflow_id
与其他工作流 API 配合使用。
启用了 wait_for_completion_timeout 的示例响应
{
"workflow_id": "K13IR5QBEpCfUu_-AQdU",
"state": "COMPLETED",
"resources_created": [
{
"workflow_step_name": "create_connector",
"workflow_step_id": "create_connector_1",
"resource_id": "LF3IR5QBEpCfUu_-Awd_",
"resource_type": "connector_id"
},
{
"workflow_step_id": "register_model_2",
"workflow_step_name": "register_remote_model",
"resource_id": "L13IR5QBEpCfUu_-BQdI",
"resource_type": "model_id"
},
{
"workflow_step_name": "deploy_model",
"workflow_step_id": "deploy_model_3",
"resource_id": "L13IR5QBEpCfUu_-BQdI",
"resource_type": "model_id"
}
]
}