
Rust client

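The OpenSearch Rust client lets you connect your Rust application to the data in your OpenSearch cluster. For the client's complete API documentation and more examples, see the OpenSearch docs.rs documentation.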

This getting started guide shows how to connect to OpenSearch, index documents, and run queries. For the client source code, see the opensearch-rs repo.

Setup

If you're starting a new project, add the opensearch crate to Cargo.toml:

[dependencies]
opensearch = "1.0.0"

You may also want to add the following serde dependencies, which help serialize types to JSON and deserialize JSON responses:

serde = "~1"
serde_json = "~1"

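The Rust client uses the higher-level reqwest HTTP client library for HTTP requests, and reqwest uses the tokio runtime to support asynchronous requests. If you plan to use asynchronous functions, add the tokio dependency to Cargo.toml: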

tokio = { version = "*", features = ["full"] }

For the complete Cargo.toml file, see the Sample program section.

To use the Rust client API, import the modules, structs, and enums you need:

use opensearch::OpenSearch;

Connecting to OpenSearch

To connect to the default OpenSearch host, create a default client object, which connects to OpenSearch at the address https://localhost:9200:

let client = OpenSearch::default();

To connect to an OpenSearch host running at a different address, create a client with the specified address:

let transport = Transport::single_node("https://localhost:9200")?;
let client = OpenSearch::new(transport);

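Alternatively, you can customize the URL and use a connection pool by creating a TransportBuilder struct and passing the transport it builds to OpenSearch::new to create a new client instance: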

let url = Url::parse("https://localhost:9200")?;
let conn_pool = SingleNodeConnectionPool::new(url);
let transport = TransportBuilder::new(conn_pool).disable_proxy().build()?;
let client = OpenSearch::new(transport);
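
When the address differs between environments, it can help to resolve it in one place before building the transport. The following is a minimal sketch using only the standard library. The environment variable name OPENSEARCH_URL, the helper names, and the fallback address are all assumptions made for illustration; none of them are defined by the client.

```rust
use std::env;

/// Fallback address used when no override is set.
/// Assumption: a local development node on port 9200.
const DEFAULT_URL: &str = "https://localhost:9200";

/// Picks the node address: the override if present and non-blank,
/// otherwise the fallback.
pub fn resolve_url(override_url: Option<String>) -> String {
    match override_url {
        Some(url) if !url.trim().is_empty() => url,
        _ => DEFAULT_URL.to_string(),
    }
}

/// Reads the hypothetical OPENSEARCH_URL environment variable.
pub fn url_from_env() -> String {
    resolve_url(env::var("OPENSEARCH_URL").ok())
}
```

The resulting string can then be passed to Transport::single_node or Url::parse in the same way as the literal addresses in the examples above.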

Connecting to Amazon OpenSearch Service

The following example shows how to connect to Amazon OpenSearch Service:

let url = Url::parse("https://...");
let service_name = "es";
let conn_pool = SingleNodeConnectionPool::new(url?);
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
let transport = TransportBuilder::new(conn_pool)
    .auth(aws_config.clone().try_into()?)
    .service_name(service_name)
    .build()?;
let client = OpenSearch::new(transport);

Connecting to Amazon OpenSearch Serverless

The following example shows how to connect to Amazon OpenSearch Serverless:

let url = Url::parse("https://...");
let service_name = "aoss";
let conn_pool = SingleNodeConnectionPool::new(url?);
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
let transport = TransportBuilder::new(conn_pool)
    .auth(aws_config.clone().try_into()?)
    .service_name(service_name)
    .build()?;
let client = OpenSearch::new(transport);
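
The two Amazon examples differ only in the service name passed to the transport builder ("es" versus "aoss"). One way to keep that choice explicit is a small enum, sketched below. AwsTarget is a hypothetical helper type introduced here for illustration; it is not part of the opensearch crate.

```rust
/// The two Amazon deployment types shown in the examples above.
/// `AwsTarget` is a hypothetical helper, not part of the opensearch crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AwsTarget {
    /// Amazon OpenSearch Service
    Service,
    /// Amazon OpenSearch Serverless
    Serverless,
}

impl AwsTarget {
    /// Returns the service name string used in the examples above.
    pub fn service_name(self) -> &'static str {
        match self {
            AwsTarget::Service => "es",
            AwsTarget::Serverless => "aoss",
        }
    }
}
```

With this in place, the value of target.service_name() would be passed to the builder's service_name call instead of a string literal.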

Creating an index

To create an OpenSearch index, use the create function of the opensearch::indices::Indices struct. The following code passes a JSON object with custom mappings as the request body:

let response = client
    .indices()
    .create(IndicesCreateParts::Index("movies"))
    .body(json!({
        "mappings" : {
            "properties" : {
                "title" : { "type" : "text" }
            }
        }
    }))
    .send()
    .await?;

Indexing a document

You can index a document into OpenSearch using the client's index function:

let response = client
    .index(IndexParts::IndexId("movies", "1"))
    .body(json!({
        "id": 1,
        "title": "Moneyball",
        "director": "Bennett Miller",
        "year": "2011"
    }))
    .send()
    .await?;

Performing bulk operations

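You can perform several operations in one request using the client's bulk function. First, create the JSON body of the Bulk API call, and then pass it to the bulk function: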

let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);

// add the first operation and document
body.push(json!({"index": {"_id": "2"}}).into());
body.push(json!({
    "id": 2,
    "title": "Interstellar",
    "director": "Christopher Nolan",
    "year": "2014"
}).into());

// add the second operation and document
body.push(json!({"index": {"_id": "3"}}).into());
body.push(json!({
    "id": 3,
    "title": "Star Trek Beyond",
    "director": "Justin Lin",
    "year": "2015"
}).into());

let response = client
    .bulk(BulkParts::Index("movies"))
    .body(body)
    .send()
    .await?;
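
The Bulk API expects newline-delimited JSON in which each action line is followed by its document line, which is why the vector above alternates action and document entries. The sketch below builds that layout by hand with the standard library only, purely to show the shape of the body. The helper names to_ndjson and index_action are hypothetical, and the ID is assumed to need no JSON escaping.

```rust
/// Joins already-serialized JSON lines into a newline-delimited body.
/// Every line, including the last, is terminated by '\n'.
pub fn to_ndjson(lines: &[&str]) -> String {
    let mut body = String::new();
    for line in lines {
        body.push_str(line);
        body.push('\n');
    }
    body
}

/// Builds the action line that indexes a document under the given ID.
/// Assumption: the ID contains no characters that require JSON escaping.
pub fn index_action(id: &str) -> String {
    format!(r#"{{"index":{{"_id":"{}"}}}}"#, id)
}
```

In practice the client builds this body for you from the JsonBody values; the sketch is only meant to make the action and document pairing visible.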

Searching for documents

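The simplest way to search for documents is to construct a query and send it as a JSON body. The following code uses a multi_match query to search for "miller" in the title and director fields. The ^2 suffix boosts documents in which "miller" appears in the title field: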

let response = client
    .search(SearchParts::Index(&["movies"]))
    .from(0)
    .size(10)
    .body(json!({
        "query": {
            "multi_match": {
                "query": "miller",
                "fields": ["title^2", "director"]
            }
        }
    }))
    .send()
    .await?;
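
The "title^2" entry in the fields array combines a field name with a boost factor. OpenSearch interprets this syntax on the server, so the client never needs to parse it; the sketch below only illustrates how the notation reads. The function name parse_field_boost is hypothetical, and treating a missing suffix as a boost of 1.0 reflects the default of no boosting.

```rust
/// Splits a field spec such as "title^2" into the field name and its boost.
/// A spec without '^' is treated as a boost of 1.0.
/// Returns None when the text after '^' is not a number.
pub fn parse_field_boost(spec: &str) -> Option<(&str, f32)> {
    match spec.split_once('^') {
        Some((name, boost)) => boost.parse::<f32>().ok().map(|b| (name, b)),
        None => Some((spec, 1.0)),
    }
}
```

Read this way, the query above weights matches in title twice as heavily as matches in director.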

You can then read the response body as JSON and iterate over the hits array to read each _source document:

let response_body = response.json::<Value>().await?;
for hit in response_body["hits"]["hits"].as_array().unwrap() {
    // print the source document
    println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
}

Deleting a document

You can delete a document using the client's delete function:

let response = client
    .delete(DeleteParts::IndexId("movies", "2"))
    .send()
    .await?;

Deleting an index

You can delete an index using the delete function of the opensearch::indices::Indices struct:

let response = client
    .indices()
    .delete(IndicesDeleteParts::Index(&["movies"]))
    .send()
    .await?;

Sample program

The sample program uses the following Cargo.toml file, which contains all the dependencies described in the Setup section:

[package]
name = "os_rust_project"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
opensearch = "1.0.0"
tokio = { version = "*", features = ["full"] }
serde = "~1"
serde_json = "~1"

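The following sample program creates a client, adds an index with non-default mappings, inserts a document, performs bulk operations, searches for the document, deletes the document, and then deletes the index: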

use opensearch::{DeleteParts, OpenSearch, IndexParts, http::request::JsonBody, BulkParts, SearchParts};
use opensearch::{indices::{IndicesDeleteParts, IndicesCreateParts}};
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = OpenSearch::default();

    // Create an index
    let mut response = client
        .indices()
        .create(IndicesCreateParts::Index("movies"))
        .body(json!({
            "mappings" : {
                "properties" : {
                    "title" : { "type" : "text" }
                }
            }
        }))
        .send()
        .await?;

    let mut successful = response.status_code().is_success();

    if successful {
        println!("Successfully created an index");
    } else {
        println!("Could not create an index");
    }

    // Index a single document
    println!("Indexing a single document...");
    response = client
        .index(IndexParts::IndexId("movies", "1"))
        .body(json!({
            "id": 1,
            "title": "Moneyball",
            "director": "Bennett Miller",
            "year": "2011"
        }))
        .send()
        .await?;

    successful = response.status_code().is_success();

    if successful {
        println!("Successfully indexed a document");
    } else {
        println!("Could not index document");
    }

    // Index multiple documents using the bulk operation

    println!("Indexing multiple documents...");

    let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);

    // add the first operation and document
    body.push(json!({"index": {"_id": "2"}}).into());
    body.push(json!({
        "id": 2,
        "title": "Interstellar",
        "director": "Christopher Nolan",
        "year": "2014"
    }).into());

    // add the second operation and document
    body.push(json!({"index": {"_id": "3"}}).into());
    body.push(json!({
        "id": 3,
        "title": "Star Trek Beyond",
        "director": "Justin Lin",
        "year": "2015"
    }).into());

    response = client
        .bulk(BulkParts::Index("movies"))
        .body(body)
        .send()
        .await?;

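    let mut response_body = response.json::<Value>().await?;
    // The bulk response reports per-item failures through the "errors" flag;
    // treat a missing flag as a failure instead of panicking.
    successful = response_body["errors"].as_bool() == Some(false);

    if successful {
        println!("Successfully performed bulk operations");
    } else {
        println!("Could not perform bulk operations");
    }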

    // Search for a document

    println!("Searching for a document...");
    response = client
        .search(SearchParts::Index(&["movies"]))
        .from(0)
        .size(10)
        .body(json!({
            "query": {
                "multi_match": {
                    "query": "miller",
                    "fields": ["title^2", "director"]
                }
            }
        }))
        .send()
        .await?;

    response_body = response.json::<Value>().await?;
    for hit in response_body["hits"]["hits"].as_array().unwrap() {
        // print the source document
        println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
    }

    // Delete a document

    response = client
        .delete(DeleteParts::IndexId("movies", "2"))
        .send()
        .await?;

    successful = response.status_code().is_success();

    if successful {
        println!("Successfully deleted a document");
    } else {
        println!("Could not delete document");
    }

    // Delete the index

    response = client
        .indices()
        .delete(IndicesDeleteParts::Index(&["movies"]))
        .send()
        .await?;

    successful = response.status_code().is_success();

    if successful {
        println!("Successfully deleted the index");
    } else {
        println!("Could not delete the index");
    }
    
    Ok(())
}
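
The sample program repeats the same if/else pattern after every request. As one possible tidy-up, the message selection could be factored into a small function. This is only a sketch; outcome_message is a hypothetical helper and not part of the opensearch crate.

```rust
/// Picks the message to print for an operation's outcome.
/// `outcome_message` is a hypothetical helper, not part of the opensearch crate.
pub fn outcome_message<'a>(
    successful: bool,
    on_success: &'a str,
    on_failure: &'a str,
) -> &'a str {
    if successful {
        on_success
    } else {
        on_failure
    }
}
```

Each status check in the program could then become a single println! call that passes response.status_code().is_success() and the two messages to this function.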