Link Search Menu Expand Document Documentation Menu

高级 .NET 客户端 (OpenSearch.Client) 的更多高级功能

以下示例展示了 OpenSearch.Client 的更多高级功能。有关简单示例,请参阅入门指南。此示例使用以下 Student 类。

public class Student
{
    public int Id { get; init; }
    public string FirstName { get; init; }
    public string LastName { get; init; }
    public int GradYear { get; init; }
    public double Gpa { get; init; }
}

映射

OpenSearch 使用动态映射来推断已索引文档的字段类型。但是,为了更好地控制文档的架构,您可以将显式映射传递给 OpenSearch。您可以在此映射中为文档的部分或所有字段定义数据类型。

类似地,OpenSearch.Client 使用自动映射根据类的属性类型推断字段数据类型。要使用自动映射,请使用 AutoMap 的默认构造函数创建 students 索引

var createResponse = await osClient.Indices.CreateAsync("students",
    c => c.Map(m => m.AutoMap<Student>()));

如果使用自动映射,Id 和 GradYear 将被映射为整数,Gpa 被映射为双精度浮点数,FirstName 和 LastName 被映射为带有关键字子字段的文本。如果要搜索 FirstName 和 LastName 并仅允许区分大小写的完全匹配,可以通过将这些字段仅映射为关键字来抑制分析。在 Query DSL 中,您可以使用以下查询来实现此目的

PUT students
{
  "mappings" : {
    "properties" : {
      "firstName" : {
        "type" : "keyword"
      },
      "lastName" : {
        "type" : "keyword"
      }
    }
  }
}

在 OpenSearch.Client 中,您可以使用流畅的 lambda 语法将这些字段标记为关键字。

var createResponse = await osClient.Indices.CreateAsync(index,
                c => c.Map(m => m.AutoMap<Student>()
                .Properties<Student>(p => p
                .Keyword(k => k.Name(f => f.FirstName))
                .Keyword(k => k.Name(f => f.LastName)))));

设置

除了映射之外,您还可以在创建索引时指定设置,例如主分片和副本分片的数量。以下查询将主分片数量设置为 1,副本分片数量设置为 2

PUT students
{
  "mappings" : {
    "properties" : {
      "firstName" : {
        "type" : "keyword"
      },
      "lastName" : {
        "type" : "keyword"
      }
    }
  }, 
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  }
}

在 OpenSearch.Client 中,上述查询的等效项如下

var createResponse = await osClient.Indices.CreateAsync(index,
                            c => c.Map(m => m.AutoMap<Student>()
                            .Properties<Student>(p => p
                            .Keyword(k => k.Name(f => f.FirstName))
                            .Keyword(k => k.Name(f => f.LastName))))
                            .Settings(s => s.NumberOfShards(1).NumberOfReplicas(2)));

使用 Bulk API 索引多个文档

除了使用 IndexIndexDocument 索引单个文档以及使用 IndexMany 索引多个文档之外,您还可以通过使用 BulkBulkAll 对文档索引获得更多控制权。单独索引文档效率低下,因为它为每个发送的文档创建一个 HTTP 请求。BulkAll 辅助程序使您无需处理重试、分块或退避请求功能。如果请求失败,它会自动重试;如果服务器关闭,它会退避;它还会控制一次 HTTP 请求中发送的文档数量。

在以下示例中,BulkAll 配置了索引名称、退避重试次数和退避时间。此外,最大并行度设置控制包含数据的并行 HTTP 请求的数量。最后,size 参数表示一次 HTTP 请求中发送的文档数量。

我们建议在生产环境中将大小设置为 100-1000 个文档。

BulkAll 接收数据流并返回一个 Observable,您可以使用它来观察后台操作。

var bulkAll = osClient.BulkAll(ReadData(), r => r
            .Index(index)
            .BackOffRetries(2)
            .BackOffTime("30s")
            .MaxDegreeOfParallelism(4)
            .Size(100));

使用布尔查询进行搜索

OpenSearch.Client 提供了完整的 OpenSearch 查询功能。除了使用匹配查询的简单搜索之外,您还可以创建更复杂的布尔查询来搜索 2022 年毕业的学生并按姓氏对他们进行排序。在下面的示例中,搜索限制为 10 个文档,并使用 scroll API 控制结果的分页。

var gradResponse = await osClient.SearchAsync<Student>(s => s
                        .Index(index)
                        .From(0)
                        .Size(10)
                        .Scroll("1m")
                        .Query(q => q
                        .Bool(b => b
                        .Filter(f => f
                        .Term(t => t.Field(fld => fld.GradYear).Value(2022)))))
                        .Sort(srt => srt.Ascending(f => f.LastName)));

响应包含 Documents 属性,其中包含来自 OpenSearch 的匹配文档。数据是 Student 类型的反序列化 JSON 对象形式,因此您可以以强类型方式访问它们的属性。所有序列化和反序列化均由 OpenSearch.Client 处理。

聚合

OpenSearch.Client 包含完整的 OpenSearch 查询功能,包括聚合。除了将搜索结果分组到存储桶中(例如,按 GPA 范围对学生进行分组)之外,您还可以计算总和或平均值等指标。以下查询计算索引中所有学生的平均 GPA。

将 Size 设置为 0 意味着 OpenSearch 只会返回聚合结果,而不是实际文档。

var aggResponse = await osClient.SearchAsync<Student>(s => s
                                .Index(index)
                                .Size(0)
                                .Aggregations(a => a
                                .Average("average gpa", 
                                            avg => avg.Field(fld => fld.Gpa))));

创建索引和索引数据的示例程序

以下程序创建一个索引,从逗号分隔文件中读取学生记录流,并将此数据索引到 OpenSearch 中。

using OpenSearch.Client;

namespace NetClientProgram;

internal class Program
{
    private const string index = "students";

    public static IOpenSearchClient osClient = new OpenSearchClient();

    public static async Task Main(string[] args)
    {
        // Check if the index with the name "students" exists
        var existResponse = await osClient.Indices.ExistsAsync(index);

        if (!existResponse.Exists)  // There is no index with this name
        {
            // Create an index "students"
            // Map FirstName and LastName as keyword
            var createResponse = await osClient.Indices.CreateAsync(index,
                c => c.Map(m => m.AutoMap<Student>()
                .Properties<Student>(p => p
                .Keyword(k => k.Name(f => f.FirstName)) 
                .Keyword(k => k.Name(f => f.LastName))))
                .Settings(s => s.NumberOfShards(1).NumberOfReplicas(1)));

            if (!createResponse.IsValid && !createResponse.Acknowledged)
            {
                throw new Exception("Create response is invalid.");
            }

            // Take a stream of data and send it to OpenSearch
            var bulkAll = osClient.BulkAll(ReadData(), r => r
            .Index(index)
            .BackOffRetries(2)
            .BackOffTime("20s")
            .MaxDegreeOfParallelism(4)
            .Size(10));

            // Wait until the data upload is complete.
            // FromMinutes specifies a timeout.
            // r is a response object that is returned as the data is indexed.
            bulkAll.Wait(TimeSpan.FromMinutes(10), r => 
                Console.WriteLine("Data chunk indexed"));
        }
    }

    // Reads student data in the form "Id,FirsName,LastName,GradYear,Gpa"
    public static IEnumerable<Student> ReadData()
    {
        var file = new StreamReader("C:\\search\\students.csv");

        string s;
        while ((s = file.ReadLine()) is not null)
        {
            yield return new Student(s);
        }
    }
}

以下程序按姓名和毕业日期搜索学生并计算平均 GPA。

using OpenSearch.Client;

namespace NetClientProgram;

internal class Program
{
    private const string index = "students";

    public static IOpenSearchClient osClient = new OpenSearchClient();

    public static async Task Main(string[] args)
    {
        await SearchByName();

        await SearchByGradDate();

        await CalculateAverageGpa();
    }

    private static async Task SearchByName()
    {
        Console.WriteLine("Searching for name......");

        var nameResponse = await osClient.SearchAsync<Student>(s => s
                                .Index(index)
                                .Query(q => q
                                .Match(m => m
                                .Field(fld => fld.FirstName)
                                .Query("Zhang"))));

        if (!nameResponse.IsValid)
        {
            throw new Exception("Aggregation query response is not valid.");
        }

        foreach (var s in nameResponse.Documents)
        {
            Console.WriteLine($"{s.Id} {s.LastName} " +
                $"{s.FirstName} {s.Gpa} {s.GradYear}");
        }
    }

    private static async Task SearchByGradDate()
    {
        Console.WriteLine("Searching for grad date......");

        // Search for all students who graduated in 2022
        var gradResponse = await osClient.SearchAsync<Student>(s => s
                                .Index(index)
                                .From(0)
                                .Size(2)
                                .Scroll("1m")
                                .Query(q => q
                                .Bool(b => b
                                .Filter(f => f
                                .Term(t => t.Field(fld => fld.GradYear).Value(2022)))))
                                .Sort(srt => srt.Ascending(f => f.LastName))
                                .Size(10));


        if (!gradResponse.IsValid)
        {
            throw new Exception("Grad date query response is not valid.");
        }

        while (gradResponse.Documents.Any())
        {
            foreach (var data in gradResponse.Documents)
            {
                Console.WriteLine($"{data.Id} {data.LastName} {data.FirstName} " +
                    $"{data.Gpa} {data.GradYear}");
            }
            gradResponse = osClient.Scroll<Student>("1m", gradResponse.ScrollId);
        }
    }

    public static async Task CalculateAverageGpa()
    {
        Console.WriteLine("Calculating average GPA......");

        // Search and aggregate
        // Size 0 means documents are not returned, only aggregation is returned
        var aggResponse = await osClient.SearchAsync<Student>(s => s
                                .Index(index)
                                .Size(0)
                                .Aggregations(a => a
                                .Average("average gpa", 
                                            avg => avg.Field(fld => fld.Gpa))));
       
        if (!aggResponse.IsValid) throw new Exception("Aggregation response not valid");

        var avg = aggResponse.Aggregations.Average("average gpa").Value;
        Console.WriteLine($"Average GPA is {avg}");
    }
}

剩余 350 字符

有问题?

想贡献?