高级 .NET 客户端 (OpenSearch.Client) 的更多高级功能
以下示例展示了 OpenSearch.Client 的更多高级功能。有关简单示例,请参阅入门指南。此示例使用以下 Student 类。
public class Student
{
public int Id { get; init; }
public string FirstName { get; init; }
public string LastName { get; init; }
public int GradYear { get; init; }
public double Gpa { get; init; }
}
映射
OpenSearch 使用动态映射来推断已索引文档的字段类型。但是,为了更好地控制文档的架构,您可以将显式映射传递给 OpenSearch。您可以在此映射中为文档的部分或所有字段定义数据类型。
类似地,OpenSearch.Client 使用自动映射根据类的属性类型推断字段数据类型。要使用自动映射,请使用 AutoMap 的默认构造函数创建 students
索引
var createResponse = await osClient.Indices.CreateAsync("students",
c => c.Map(m => m.AutoMap<Student>()));
如果使用自动映射,Id 和 GradYear 将被映射为整数,Gpa 被映射为双精度浮点数,FirstName 和 LastName 被映射为带有关键字子字段的文本。如果要搜索 FirstName 和 LastName 并仅允许区分大小写的完全匹配,可以通过将这些字段仅映射为关键字来抑制分析。在 Query DSL 中,您可以使用以下查询来实现此目的
PUT students
{
"mappings" : {
"properties" : {
"firstName" : {
"type" : "keyword"
},
"lastName" : {
"type" : "keyword"
}
}
}
}
在 OpenSearch.Client 中,您可以使用流畅的 lambda 语法将这些字段标记为关键字。
var createResponse = await osClient.Indices.CreateAsync(index,
c => c.Map(m => m.AutoMap<Student>()
.Properties<Student>(p => p
.Keyword(k => k.Name(f => f.FirstName))
.Keyword(k => k.Name(f => f.LastName)))));
设置
除了映射之外,您还可以在创建索引时指定设置,例如主分片和副本分片的数量。以下查询将主分片数量设置为 1,副本分片数量设置为 2
PUT students
{
"mappings" : {
"properties" : {
"firstName" : {
"type" : "keyword"
},
"lastName" : {
"type" : "keyword"
}
}
},
"settings": {
"number_of_shards": 1,
"number_of_replicas": 2
}
}
在 OpenSearch.Client 中,上述查询的等效项如下
var createResponse = await osClient.Indices.CreateAsync(index,
c => c.Map(m => m.AutoMap<Student>()
.Properties<Student>(p => p
.Keyword(k => k.Name(f => f.FirstName))
.Keyword(k => k.Name(f => f.LastName))))
.Settings(s => s.NumberOfShards(1).NumberOfReplicas(2)));
使用 Bulk API 索引多个文档
除了使用 Index
和 IndexDocument
索引单个文档以及使用 IndexMany
索引多个文档之外,您还可以通过使用 Bulk
或 BulkAll
对文档索引获得更多控制权。单独索引文档效率低下,因为它为每个发送的文档创建一个 HTTP 请求。BulkAll 辅助程序使您无需处理重试、分块或退避请求功能。如果请求失败,它会自动重试;如果服务器关闭,它会退避;它还会控制一次 HTTP 请求中发送的文档数量。
在以下示例中,BulkAll
配置了索引名称、退避重试次数和退避时间。此外,最大并行度设置控制包含数据的并行 HTTP 请求的数量。最后,size 参数表示一次 HTTP 请求中发送的文档数量。
我们建议在生产环境中将大小设置为 100-1000 个文档。
BulkAll
接收数据流并返回一个 Observable,您可以使用它来观察后台操作。
var bulkAll = osClient.BulkAll(ReadData(), r => r
.Index(index)
.BackOffRetries(2)
.BackOffTime("30s")
.MaxDegreeOfParallelism(4)
.Size(100));
使用布尔查询进行搜索
OpenSearch.Client 提供了完整的 OpenSearch 查询功能。除了使用匹配查询的简单搜索之外,您还可以创建更复杂的布尔查询来搜索 2022 年毕业的学生并按姓氏对他们进行排序。在下面的示例中,搜索限制为 10 个文档,并使用 scroll API 控制结果的分页。
var gradResponse = await osClient.SearchAsync<Student>(s => s
.Index(index)
.From(0)
.Size(10)
.Scroll("1m")
.Query(q => q
.Bool(b => b
.Filter(f => f
.Term(t => t.Field(fld => fld.GradYear).Value(2022)))))
.Sort(srt => srt.Ascending(f => f.LastName)));
响应包含 Documents 属性,其中包含来自 OpenSearch 的匹配文档。数据是 Student 类型的反序列化 JSON 对象形式,因此您可以以强类型方式访问它们的属性。所有序列化和反序列化均由 OpenSearch.Client 处理。
聚合
OpenSearch.Client 包含完整的 OpenSearch 查询功能,包括聚合。除了将搜索结果分组到存储桶中(例如,按 GPA 范围对学生进行分组)之外,您还可以计算总和或平均值等指标。以下查询计算索引中所有学生的平均 GPA。
将 Size 设置为 0 意味着 OpenSearch 只会返回聚合结果,而不是实际文档。
var aggResponse = await osClient.SearchAsync<Student>(s => s
.Index(index)
.Size(0)
.Aggregations(a => a
.Average("average gpa",
avg => avg.Field(fld => fld.Gpa))));
创建索引和索引数据的示例程序
以下程序创建一个索引,从逗号分隔文件中读取学生记录流,并将此数据索引到 OpenSearch 中。
using OpenSearch.Client;
namespace NetClientProgram;
internal class Program
{
private const string index = "students";
public static IOpenSearchClient osClient = new OpenSearchClient();
public static async Task Main(string[] args)
{
// Check if the index with the name "students" exists
var existResponse = await osClient.Indices.ExistsAsync(index);
if (!existResponse.Exists) // There is no index with this name
{
// Create an index "students"
// Map FirstName and LastName as keyword
var createResponse = await osClient.Indices.CreateAsync(index,
c => c.Map(m => m.AutoMap<Student>()
.Properties<Student>(p => p
.Keyword(k => k.Name(f => f.FirstName))
.Keyword(k => k.Name(f => f.LastName))))
.Settings(s => s.NumberOfShards(1).NumberOfReplicas(1)));
if (!createResponse.IsValid && !createResponse.Acknowledged)
{
throw new Exception("Create response is invalid.");
}
// Take a stream of data and send it to OpenSearch
var bulkAll = osClient.BulkAll(ReadData(), r => r
.Index(index)
.BackOffRetries(2)
.BackOffTime("20s")
.MaxDegreeOfParallelism(4)
.Size(10));
// Wait until the data upload is complete.
// FromMinutes specifies a timeout.
// r is a response object that is returned as the data is indexed.
bulkAll.Wait(TimeSpan.FromMinutes(10), r =>
Console.WriteLine("Data chunk indexed"));
}
}
// Reads student data in the form "Id,FirsName,LastName,GradYear,Gpa"
public static IEnumerable<Student> ReadData()
{
var file = new StreamReader("C:\\search\\students.csv");
string s;
while ((s = file.ReadLine()) is not null)
{
yield return new Student(s);
}
}
}
搜索示例程序
以下程序按姓名和毕业日期搜索学生并计算平均 GPA。
using OpenSearch.Client;
namespace NetClientProgram;
internal class Program
{
private const string index = "students";
public static IOpenSearchClient osClient = new OpenSearchClient();
public static async Task Main(string[] args)
{
await SearchByName();
await SearchByGradDate();
await CalculateAverageGpa();
}
private static async Task SearchByName()
{
Console.WriteLine("Searching for name......");
var nameResponse = await osClient.SearchAsync<Student>(s => s
.Index(index)
.Query(q => q
.Match(m => m
.Field(fld => fld.FirstName)
.Query("Zhang"))));
if (!nameResponse.IsValid)
{
throw new Exception("Aggregation query response is not valid.");
}
foreach (var s in nameResponse.Documents)
{
Console.WriteLine($"{s.Id} {s.LastName} " +
$"{s.FirstName} {s.Gpa} {s.GradYear}");
}
}
private static async Task SearchByGradDate()
{
Console.WriteLine("Searching for grad date......");
// Search for all students who graduated in 2022
var gradResponse = await osClient.SearchAsync<Student>(s => s
.Index(index)
.From(0)
.Size(2)
.Scroll("1m")
.Query(q => q
.Bool(b => b
.Filter(f => f
.Term(t => t.Field(fld => fld.GradYear).Value(2022)))))
.Sort(srt => srt.Ascending(f => f.LastName))
.Size(10));
if (!gradResponse.IsValid)
{
throw new Exception("Grad date query response is not valid.");
}
while (gradResponse.Documents.Any())
{
foreach (var data in gradResponse.Documents)
{
Console.WriteLine($"{data.Id} {data.LastName} {data.FirstName} " +
$"{data.Gpa} {data.GradYear}");
}
gradResponse = osClient.Scroll<Student>("1m", gradResponse.ScrollId);
}
}
public static async Task CalculateAverageGpa()
{
Console.WriteLine("Calculating average GPA......");
// Search and aggregate
// Size 0 means documents are not returned, only aggregation is returned
var aggResponse = await osClient.SearchAsync<Student>(s => s
.Index(index)
.Size(0)
.Aggregations(a => a
.Average("average gpa",
avg => avg.Field(fld => fld.Gpa))));
if (!aggResponse.IsValid) throw new Exception("Aggregation response not valid");
var avg = aggResponse.Aggregations.Average("average gpa").Value;
Console.WriteLine($"Average GPA is {avg}");
}
}