Rust 客户端
OpenSearch Rust 客户端允许您将 Rust 应用程序与 OpenSearch 集群中的数据连接。有关客户端的完整 API 文档和更多示例,请参阅OpenSearch docs.rs 文档。
本快速入门指南演示了如何连接到 OpenSearch、索引文档和运行查询。有关客户端源代码,请参阅 opensearch-rs 仓库。
设置
如果您正在启动一个新项目,请将 opensearch
crate 添加到 Cargo.toml
[dependencies]
opensearch = "1.0.0"
此外,您可能需要添加以下 serde
依赖项,它们有助于将类型序列化为 JSON 并反序列化 JSON 响应
serde = "~1"
serde_json = "~1"
Rust 客户端使用更高级别的 reqwest
HTTP 客户端库进行 HTTP 请求,而 reqwest
使用 tokio
平台支持异步请求。如果您计划使用异步函数,则需要将 tokio
依赖项添加到 Cargo.toml
tokio = { version = "*", features = ["full"] }
有关完整的 Cargo.toml 文件,请参阅示例程序部分。
要使用 Rust 客户端 API,请导入您需要的模块、结构体和枚举
use opensearch::OpenSearch;
连接到 OpenSearch
要连接到默认的 OpenSearch 主机,请创建一个默认客户端对象,该对象连接到地址 https://:9200
处的 OpenSearch
let client = OpenSearch::default();
要连接到在不同地址运行的 OpenSearch 主机,请使用指定地址创建客户端
let transport = Transport::single_node("https://:9200")?;
let client = OpenSearch::new(transport);
另外,您可以通过创建 TransportBuilder
结构体并将其传递给 OpenSearch::new
来创建客户端的新实例,从而自定义 URL 并使用连接池
let url = Url::parse("https://:9200")?;
let conn_pool = SingleNodeConnectionPool::new(url);
let transport = TransportBuilder::new(conn_pool).disable_proxy().build()?;
let client = OpenSearch::new(transport);
连接到 Amazon OpenSearch 服务
以下示例演示了连接到 Amazon OpenSearch 服务
let url = Url::parse("https://...");
let service_name = "es";
let conn_pool = SingleNodeConnectionPool::new(url?);
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
let transport = TransportBuilder::new(conn_pool)
.auth(aws_config.clone().try_into()?)
.service_name(service_name)
.build()?;
let client = OpenSearch::new(transport);
连接到 Amazon OpenSearch Serverless
以下示例演示了连接到 Amazon OpenSearch 无服务器服务
let url = Url::parse("https://...");
let service_name = "aoss";
let conn_pool = SingleNodeConnectionPool::new(url?);
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
let transport = TransportBuilder::new(conn_pool)
.auth(aws_config.clone().try_into()?)
.service_name(service_name)
.build()?;
let client = OpenSearch::new(transport);
创建索引
要创建 OpenSearch 索引,请使用 opensearch::indices::Indices
结构体的 create
函数。您可以使用以下代码构造一个带有自定义映射的 JSON 对象
let response = client
.indices()
.create(IndicesCreateParts::Index("movies"))
.body(json!({
"mappings" : {
"properties" : {
"title" : { "type" : "text" }
}
}
}))
.send()
.await?;
索引文档
您可以使用客户端的 index
函数将文档索引到 OpenSearch 中
let response = client
.index(IndexParts::IndexId("movies", "1"))
.body(json!({
"id": 1,
"title": "Moneyball",
"director": "Bennett Miller",
"year": "2011"
}))
.send()
.await?;
执行批量操作
您可以使用客户端的 bulk
函数同时执行多个操作。首先,创建 Bulk API 调用的 JSON 正文,然后将其传递给 bulk
函数
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// add the first operation and document
body.push(json!({"index": {"_id": "2"}}).into());
body.push(json!({
"id": 2,
"title": "Interstellar",
"director": "Christopher Nolan",
"year": "2014"
}).into());
// add the second operation and document
body.push(json!({"index": {"_id": "3"}}).into());
body.push(json!({
"id": 3,
"title": "Star Trek Beyond",
"director": "Justin Lin",
"year": "2015"
}).into());
let response = client
.bulk(BulkParts::Index("movies"))
.body(body)
.send()
.await?;
搜索文档
搜索文档最简单的方法是构造一个查询字符串。以下代码使用 multi_match
查询在标题和导演字段中搜索“miller”。它会提升“miller”出现在标题字段中的文档
response = client
.search(SearchParts::Index(&["movies"]))
.from(0)
.size(10)
.body(json!({
"query": {
"multi_match": {
"query": "miller",
"fields": ["title^2", "director"]
}
}
}))
.send()
.await?;
然后您可以将响应正文读取为 JSON,并遍历 hits
数组以读取所有 _source
文档
let response_body = response.json::<Value>().await?;
for hit in response_body["hits"]["hits"].as_array().unwrap() {
// print the source document
println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
}
删除文档
您可以使用客户端的 delete
函数删除文档
let response = client
.delete(DeleteParts::IndexId("movies", "2"))
.send()
.await?;
删除索引
您可以使用 opensearch::indices::Indices
结构体的 delete
函数删除索引
let response = client
.indices()
.delete(IndicesDeleteParts::Index(&["movies"]))
.send()
.await?;
示例程序
示例程序使用以下 Cargo.toml 文件,其中包含“设置”部分中描述的所有依赖项
[package]
name = "os_rust_project"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.net.cn/cargo/reference/manifest.html
[dependencies]
opensearch = "1.0.0"
tokio = { version = "*", features = ["full"] }
serde = "~1"
serde_json = "~1"
以下示例程序创建一个客户端,添加一个带有非默认映射的索引,插入一个文档,执行批量操作,搜索文档,删除文档,然后删除索引
use opensearch::{DeleteParts, OpenSearch, IndexParts, http::request::JsonBody, BulkParts, SearchParts};
use opensearch::{indices::{IndicesDeleteParts, IndicesCreateParts}};
use serde_json::{json, Value};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = OpenSearch::default();
// Create an index
let mut response = client
.indices()
.create(IndicesCreateParts::Index("movies"))
.body(json!({
"mappings" : {
"properties" : {
"title" : { "type" : "text" }
}
}
}))
.send()
.await?;
let mut successful = response.status_code().is_success();
if successful {
println!("Successfully created an index");
}
else {
println!("Could not create an index");
}
// Index a single document
println!("Indexing a single document...");
response = client
.index(IndexParts::IndexId("movies", "1"))
.body(json!({
"id": 1,
"title": "Moneyball",
"director": "Bennett Miller",
"year": "2011"
}))
.send()
.await?;
successful = response.status_code().is_success();
if successful {
println!("Successfully indexed a document");
}
else {
println!("Could not index document");
}
// Index multiple documents using the bulk operation
println!("Indexing multiple documents...");
let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
// add the first operation and document
body.push(json!({"index": {"_id": "2"}}).into());
body.push(json!({
"id": 2,
"title": "Interstellar",
"director": "Christopher Nolan",
"year": "2014"
}).into());
// add the second operation and document
body.push(json!({"index": {"_id": "3"}}).into());
body.push(json!({
"id": 3,
"title": "Star Trek Beyond",
"director": "Justin Lin",
"year": "2015"
}).into());
response = client
.bulk(BulkParts::Index("movies"))
.body(body)
.send()
.await?;
let mut response_body = response.json::<Value>().await?;
successful = response_body["errors"].as_bool().unwrap() == false;
if successful {
println!("Successfully performed bulk operations");
}
else {
println!("Could not perform bulk operations");
}
// Search for a document
println!("Searching for a document...");
response = client
.search(SearchParts::Index(&["movies"]))
.from(0)
.size(10)
.body(json!({
"query": {
"multi_match": {
"query": "miller",
"fields": ["title^2", "director"]
}
}
}))
.send()
.await?;
response_body = response.json::<Value>().await?;
for hit in response_body["hits"]["hits"].as_array().unwrap() {
// print the source document
println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
}
// Delete a document
response = client
.delete(DeleteParts::IndexId("movies", "2"))
.send()
.await?;
successful = response.status_code().is_success();
if successful {
println!("Successfully deleted a document");
}
else {
println!("Could not delete document");
}
// Delete the index
response = client
.indices()
.delete(IndicesDeleteParts::Index(&["movies"]))
.send()
.await?;
successful = response.status_code().is_success();
if successful {
println!("Successfully deleted the index");
}
else {
println!("Could not delete the index");
}
Ok(())
}