
Ruby client

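The OpenSearch Ruby client lets you interact with your OpenSearch clusters through Ruby methods rather than HTTP methods and raw JSON. For the client's complete API documentation and additional examples, see the opensearch-transport, opensearch-api, opensearch-dsl, and opensearch-ruby gem documentation.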

This getting started guide shows how to connect to OpenSearch, index documents, and run queries. For the client source code, see the opensearch-ruby repository.

Installing the Ruby client

To install the Ruby gem for the Ruby client, run the following command:

gem install opensearch-ruby

To use the client, import it as a module:

require 'opensearch'

Connecting to OpenSearch

To connect to the default OpenSearch host, create a client object, passing the default host address in the constructor:

client = OpenSearch::Client.new(host: 'https://localhost:9200')

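The following example creates a client object with a custom URL and the log option set to true. It sets the retry_on_failure parameter to retry a failed request five times rather than the default three times. Finally, it increases the timeout by setting the request_timeout parameter to 120 seconds. It then returns basic cluster health information: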

client = OpenSearch::Client.new(
    url: "https://localhost:9200",
    retry_on_failure: 5,
    request_timeout: 120,
    log: true
)

client.cluster.health

The output is as follows:

2022-08-25 14:24:52 -0400: GET https://localhost:9200/ [status:200, request:0.048s, query:n/a]
2022-08-25 14:24:52 -0400: < {
  "name" : "opensearch",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "Aw0F5Pt9QF6XO9vXQHIs_w",
  "version" : {
    "distribution" : "opensearch",
    "number" : "2.2.0",
    "build_type" : "tar",
    "build_hash" : "b1017fa3b9a1c781d4f34ecee411e0cdf930a515",
    "build_date" : "2022-08-09T02:27:25.256769336Z",
    "build_snapshot" : false,
    "lucene_version" : "9.3.0",
    "minimum_wire_compatibility_version" : "7.10.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "The OpenSearch Project: https://opensearch.org.cn/"
}

2022-08-25 14:24:52 -0400: GET https://localhost:9200/_cluster/health [status:200, request:0.018s, query:n/a]
2022-08-25 14:24:52 -0400: < {"cluster_name":"docker-cluster","status":"yellow","timed_out":false,"number_of_nodes":1,"number_of_data_nodes":1,"discovered_master":true,"discovered_cluster_manager":true,"active_primary_shards":10,"active_shards":10,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":8,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":55.55555555555556}
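
Individual fields of the health response can be read by key. The following sketch parses an abbreviated copy of the response above with the standard json library instead of calling a live cluster; the health_summary helper is a hypothetical name used only for illustration.

```ruby
require 'json'

# Abbreviated copy of the cluster health response shown above.
raw = '{"cluster_name":"docker-cluster","status":"yellow","number_of_nodes":1,' \
      '"active_shards":10,"unassigned_shards":8}'

health = JSON.parse(raw)

# Hypothetical helper: one-line summary of the fields most often checked.
def health_summary(health)
  "#{health['cluster_name']}: #{health['status']} " \
    "(#{health['unassigned_shards']} unassigned shards)"
end

puts health_summary(health)
# prints "docker-cluster: yellow (8 unassigned shards)"
```

A yellow status generally means that all primary shards are assigned but some replica shards are not, which is expected on a single-node cluster like the one in this output.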

Connecting to Amazon OpenSearch Service

To connect to Amazon OpenSearch Service, first install the opensearch-aws-sigv4 gem:

gem install opensearch-aws-sigv4

require 'opensearch-aws-sigv4'
require 'aws-sigv4'

signer = Aws::Sigv4::Signer.new(service: 'es',
                                region: 'us-west-2', # signing service region
                                access_key_id: 'key_id',
                                secret_access_key: 'secret')

client = OpenSearch::Aws::Sigv4Client.new({
    host: 'https://your.amz-managed-opensearch.domain',
    log: true
}, signer)

# create an index and document
index = 'prime'
client.indices.create(index: index)
client.index(index: index, id: '1', body: { name: 'Amazon Echo', 
                                            msrp: '5999', 
                                            year: 2011 })

# search for the document
client.search(body: { query: { match: { name: 'Echo' } } })

# delete the document
client.delete(index: index, id: '1')

# delete the index
client.indices.delete(index: index)

Connecting to Amazon OpenSearch Serverless

To connect to Amazon OpenSearch Serverless, first install the opensearch-aws-sigv4 gem:

gem install opensearch-aws-sigv4

require 'opensearch-aws-sigv4'
require 'aws-sigv4'

signer = Aws::Sigv4::Signer.new(service: 'aoss',
                                region: 'us-west-2', # signing service region
                                access_key_id: 'key_id',
                                secret_access_key: 'secret')

client = OpenSearch::Aws::Sigv4Client.new({
    host: 'https://your.amz-managed-opensearch.domain', # serverless endpoint for OpenSearch Serverless
    log: true
}, signer)

# create an index and document
index = 'prime'
client.indices.create(index: index)
client.index(index: index, id: '1', body: { name: 'Amazon Echo', 
                                            msrp: '5999', 
                                            year: 2011 })

# search for the document
client.search(body: { query: { match: { name: 'Echo' } } })

# delete the document
client.delete(index: index, id: '1')

# delete the index
client.indices.delete(index: index)
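
The two signer examples above differ only in the service value ('es' for Amazon OpenSearch Service, 'aoss' for Serverless) and use placeholder credentials. The following sketch shows one way to assemble the signer options from environment variables instead of hardcoding them. The signer_options helper and the fallback region are assumptions made for this illustration and are not part of either gem.

```ruby
# Hypothetical helper: builds the options for Aws::Sigv4::Signer.new
# from environment variables instead of hardcoded credentials.
# service is 'es' for Amazon OpenSearch Service or 'aoss' for Serverless.
def signer_options(service:, env: ENV)
  {
    service: service,
    region: env.fetch('AWS_REGION', 'us-west-2'), # fallback region is an assumption
    access_key_id: env.fetch('AWS_ACCESS_KEY_ID'),
    secret_access_key: env.fetch('AWS_SECRET_ACCESS_KEY')
  }
end

# Stand-in environment with the placeholder values used in the examples above.
fake_env = {
  'AWS_ACCESS_KEY_ID' => 'key_id',
  'AWS_SECRET_ACCESS_KEY' => 'secret'
}

options = signer_options(service: 'aoss', env: fake_env)
puts options[:service] # prints "aoss"
puts options[:region]  # prints "us-west-2"
```

With the gems loaded, the resulting hash could be passed along as Aws::Sigv4::Signer.new(**signer_options(service: 'es')). Because fetch is called without a default for the two credential keys, a missing variable raises a KeyError early rather than producing an unsigned request later.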

Creating an index

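You don't need to create an index explicitly in OpenSearch. When you upload a document to an index that does not exist, OpenSearch creates the index automatically. Alternatively, you can create an index explicitly to specify settings such as the number of primary and replica shards. To create an index with non-default settings, create an index body hash with those settings: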

index_body = {
    'settings': {
        'index': {
            'number_of_shards': 1,
            'number_of_replicas': 2
        }
    }
}

client.indices.create(
    index: 'students',
    body: index_body
)
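
The quoted-key syntax used in index_body ('settings': ...) produces symbol keys in Ruby, which serialize to ordinary JSON strings. The following sketch shows the JSON that such a hash corresponds to, using the standard json library for illustration; the client may use a different serializer internally.

```ruby
require 'json'

# Same settings hash as above; 'settings': creates the symbol key :settings.
index_body = {
  'settings': {
    'index': {
      'number_of_shards': 1,
      'number_of_replicas': 2
    }
  }
}

puts index_body.keys.first.inspect # prints :settings
puts JSON.generate(index_body)
# prints {"settings":{"index":{"number_of_shards":1,"number_of_replicas":2}}}
```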

Mappings

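OpenSearch uses dynamic mapping to infer the field types of indexed documents. However, to have more control over the schema of your documents, you can pass an explicit mapping to OpenSearch. In this mapping, you can define data types for some or all fields of your documents. To create a mapping for an index, use the put_mapping method: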

client.indices.put_mapping(
    index: 'students', 
    body: {
        properties: {
            first_name: { type: 'keyword' },
            last_name: { type: 'keyword' }
        }  
    }
)

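By default, string fields are mapped as text, but in the mapping above, the first_name and last_name fields are mapped as keyword. This mapping tells OpenSearch that these fields should not be analyzed and should support only exact, case-sensitive matches.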

You can verify the index's mappings using the get_mapping method:

response = client.indices.get_mapping(index: 'students')

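If you know the mapping of your documents in advance and want to avoid mapping errors (for example, a misspelled field name), you can set the dynamic parameter to strict: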

client.indices.put_mapping(
    index: 'students', 
    body: {
        dynamic: 'strict',
        properties: {
            first_name: { type: 'keyword' },
            last_name: { type: 'keyword' },
            gpa: { type: 'float'},
            grad_year: { type: 'integer'}
        }  
    }
)

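With strict mapping, you can index a document that is missing a field, but you cannot index a document that contains a new field. For example, indexing the following document with a misspelled grad_yea field fails: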

document = {
    first_name: 'Connor',
    last_name: 'James',
    gpa: 3.93,
    grad_yea: 2021
}
  
client.index(
    index: 'students',
    body: document,
    id: 100,
    refresh: true
)

OpenSearch returns a mapping error:

{"error":{"root_cause":[{"type":"strict_dynamic_mapping_exception","reason":"mapping set to strict, dynamic introduction of [grad_yea] within [_doc] is not allowed"}],"type":"strict_dynamic_mapping_exception","reason":"mapping set to strict, dynamic introduction of [grad_yea] within [_doc] is not allowed"},"status":400}
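
A misspelling like this can also be caught before the request is sent. The following sketch mirrors the strict rule described above on the client side: missing fields are allowed and undeclared fields are reported. The MAPPED_FIELDS constant and the unmapped_fields helper are hypothetical names; the check is a local illustration, not something the client does for you.

```ruby
# Fields declared in the strict mapping above.
MAPPED_FIELDS = %i[first_name last_name gpa grad_year].freeze

# Hypothetical helper: returns the document keys that the mapping does not declare.
def unmapped_fields(document, mapped = MAPPED_FIELDS)
  document.keys.map(&:to_sym) - mapped
end

missing_field_doc = { first_name: 'Connor', last_name: 'James' }
typo_doc = { first_name: 'Connor', last_name: 'James', gpa: 3.93, grad_yea: 2021 }

p unmapped_fields(missing_field_doc) # prints []
p unmapped_fields(typo_doc)          # prints [:grad_yea]
```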

Indexing one document

To index one document, use the index method:

document = {
    first_name: 'Connor',
    last_name: 'James',
    gpa: 3.93,
    grad_year: 2021
}
  
client.index(
    index: 'students',
    body: document,
    id: 100,
    refresh: true
)

Updating a document

To update a document, use the update method:

client.update(index: 'students', 
              id: 100, 
              body: { doc: { gpa: 3.25 } }, 
              refresh: true)
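
The doc key carries a partial document, so only the fields it lists change. The following is a minimal local sketch of that effect for flat fields, using Hash#merge on plain hashes rather than a cluster. It illustrates the outcome only and makes no claim about how OpenSearch applies the update internally.

```ruby
# Document as indexed earlier, and the partial document passed under doc:.
stored  = { first_name: 'Connor', last_name: 'James', gpa: 3.93, grad_year: 2021 }
partial = { gpa: 3.25 }

# Fields in the partial document overwrite stored ones; the rest are kept.
updated = stored.merge(partial)

puts updated[:gpa]       # prints 3.25
puts updated[:grad_year] # prints 2021
```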

Deleting a document

To delete a document, use the delete method:

client.delete(
    index: 'students',
    id: 100,
    refresh: true
)

Bulk operations

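You can perform several operations at the same time by using the bulk method. The operations can be of the same type or of different types.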

You can index multiple documents using the bulk method:

actions = [
    { index: { _index: 'students', _id: '200' } },
    { first_name: 'James', last_name: 'Rodriguez', gpa: 3.91, grad_year: 2019 },
    { index: { _index: 'students', _id: '300' } },
    { first_name: 'Nikki', last_name: 'Wolf', gpa: 3.87, grad_year: 2020 }
]
client.bulk(body: actions, refresh: true)
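
When the documents come from application data, the alternating header and document entries can be generated rather than written by hand. The following sketch builds the same actions array from a hash of documents keyed by ID; bulk_index_actions is a hypothetical helper name.

```ruby
# Hypothetical helper: turns { id => document } into the alternating
# header/document array that the bulk example above passes as body:.
def bulk_index_actions(index, docs_by_id)
  docs_by_id.flat_map do |id, doc|
    [{ index: { _index: index, _id: id } }, doc]
  end
end

students = {
  '200' => { first_name: 'James', last_name: 'Rodriguez', gpa: 3.91, grad_year: 2019 },
  '300' => { first_name: 'Nikki', last_name: 'Wolf', gpa: 3.87, grad_year: 2020 }
}

actions = bulk_index_actions('students', students)
puts actions.length # prints 4

# With a connected client, the array is sent exactly as in the example above:
# client.bulk(body: actions, refresh: true)
```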

You can delete multiple documents as follows:

# Deleting multiple documents.
actions = [
    { delete: { _index: 'students', _id: 200 } },
    { delete: { _index: 'students', _id: 300 } }
]
client.bulk(body: actions, refresh: true)

When using bulk, you can perform different operations as follows:

actions = [
    { index:  { _index: 'students', _id: 100, data: { first_name: 'Paulo', last_name: 'Santos', gpa: 3.29, grad_year: 2022 } } },
    { index:  { _index: 'students', _id: 200, data: { first_name: 'Shirley', last_name: 'Rodriguez', gpa: 3.92, grad_year: 2020 } } },
    { index:  { _index: 'students', _id: 300, data: { first_name: 'Akua', last_name: 'Mansa', gpa: 3.95, grad_year: 2022 } } },
    { index:  { _index: 'students', _id: 400, data: { first_name: 'John', last_name: 'Stiles', gpa: 3.72, grad_year: 2019 } } },
    { index:  { _index: 'students', _id: 500, data: { first_name: 'Li', last_name: 'Juan', gpa: 3.94, grad_year: 2022 } } },
    { index:  { _index: 'students', _id: 600, data: { first_name: 'Richard', last_name: 'Roe', gpa: 3.04, grad_year: 2020 } } },
    { update: { _index: 'students', _id: 100, data: { doc: { gpa: 3.73 } } } },
    { delete: { _index: 'students', _id: 200  } }
]
client.bulk(body: actions, refresh: true)

In the example above, you pass the data and the header together and denote the data with the data: key.
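
A bulk request can succeed as a whole while individual operations fail, so it is worth inspecting the response. The following sketch works on a hand-written hash shaped like a bulk response, with a top-level errors flag and one items entry per operation. The values are made up, and bulk_failures is a hypothetical helper name.

```ruby
# Hand-written sample shaped like a bulk response; values are illustrative.
response = {
  'errors' => true,
  'items' => [
    { 'index'  => { '_id' => '100', 'status' => 201 } },
    { 'update' => { '_id' => '999', 'status' => 404,
                    'error' => { 'type' => 'document_missing_exception' } } },
    { 'delete' => { '_id' => '200', 'status' => 200 } }
  ]
}

# Hypothetical helper: lists [operation, id, error type] for each failed item.
def bulk_failures(response)
  return [] unless response['errors']

  response['items'].filter_map do |item|
    operation, result = item.first
    [operation, result['_id'], result['error']['type']] if result['error']
  end
end

p bulk_failures(response) # prints [["update", "999", "document_missing_exception"]]
```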

Searching for a document

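To search for a document, use the search method. The following example searches for a student whose first or last name is "James". It uses a multi_match query to search two fields (first_name and last_name) and boosts the relevance of the last_name field with caret notation (last_name^2).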

q = 'James'
query = {
  'size': 5,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['first_name', 'last_name^2']
    }
  }
}

response = client.search(
  body: query,
  index: 'students'
)

If you omit the request body in the search method, your query becomes a match_all query and returns all documents in the index:

client.search(index: 'students')
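
The matching documents sit under the hits key of the response, with each document's fields under _source. The following sketch reads them from a hand-written hash shaped like a search response; the values are illustrative and no cluster is contacted.

```ruby
# Hand-written sample shaped like a search response; values are illustrative.
response = {
  'hits' => {
    'total' => { 'value' => 1, 'relation' => 'eq' },
    'hits' => [
      { '_index' => 'students', '_id' => '100', '_score' => 1.0,
        '_source' => { 'first_name' => 'Connor', 'last_name' => 'James' } }
    ]
  }
}

# Collect "first last" for every returned document.
names = response['hits']['hits'].map do |hit|
  source = hit['_source']
  "#{source['first_name']} #{source['last_name']}"
end

puts response.dig('hits', 'total', 'value') # prints 1
puts names                                  # prints Connor James
```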

Boolean query

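The Ruby client exposes the full OpenSearch query capability. In addition to simple searches that use the match query, you can create a more complex Boolean query that searches for students who graduated in 2022 and sorts them by last name. In the following example, the search is limited to 10 documents.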

query = {
    'query': {
        'bool': {
            'filter': {
                'term': {
                    'grad_year': 2022
                }
            }
        }
    },
    'sort': {
        'last_name': {
            'order': 'asc'
        }
    }
}

response = client.search(index: 'students', from: 0, size: 10, body: query)

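You can bulk several queries together and perform a multi-search using the msearch method. The following code searches for students whose GPAs are outside the 3.1–3.9 range: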

actions = [
    {},
    {query: {range: {gpa: {gt: 3.9}}}},
    {},
    {query: {range: {gpa: {lt: 3.1}}}}
]
response = client.msearch(index: 'students', body: actions)
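
In the msearch body above, each query is preceded by a header entry. The empty headers ({}) leave the target to the index passed in the call. The following sketch generates that alternating structure from a plain list of queries; msearch_body is a hypothetical helper name.

```ruby
# Hypothetical helper: places an empty header before each query.
def msearch_body(queries)
  queries.flat_map { |query| [{}, query] }
end

queries = [
  { query: { range: { gpa: { gt: 3.9 } } } },
  { query: { range: { gpa: { lt: 3.1 } } } }
]

body = msearch_body(queries)
puts body.length    # prints 4
puts body[0].empty? # prints true
```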

Scroll

You can paginate the search results using the Scroll API:

response = client.search(index: 'students', scroll: '2m', size: 2)

# Keep requesting pages until a page comes back empty
while response['hits']['hits'].size.positive?
    scroll_id = response['_scroll_id']
    puts(response['hits']['hits'].map { |doc| "#{doc['_source']['first_name']} #{doc['_source']['last_name']}" })
    response = client.scroll(scroll: '1m', body: { scroll_id: scroll_id })
end

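First, you issue a search query, specifying the scroll and size parameters. The scroll parameter tells OpenSearch how long to keep the search context. In this example, it is set to two minutes. The size parameter specifies how many documents you want returned in each request.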

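The response to the initial search query contains a _scroll_id that you can use to get the next set of documents. To do this, you use the scroll method, again specifying the scroll parameter and passing the _scroll_id in the body. You don't need to specify the query or index to the scroll method. The scroll method returns the next set of documents and the _scroll_id. It's important to use the latest _scroll_id when requesting the next batch of documents because the _scroll_id can change between requests.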
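
The following sketch runs the same loop against a small stand-in object so that the paging logic can be followed without a cluster. FakeScrollClient is a hypothetical class written for this illustration. It serves pages from an in-memory array and issues a new scroll ID on every call, and it ignores the index and scroll arguments that a real cluster would use.

```ruby
# Stand-in for the real client so the loop can run without a cluster.
class FakeScrollClient
  def initialize(docs)
    @docs = docs
    @position = 0
    @size = 0
    @calls = 0
  end

  # Starts paging; remembers the page size for later scroll calls.
  def search(index:, scroll:, size:)
    @size = size
    next_page
  end

  # Returns the next page; the body (scroll ID) is accepted but not checked.
  def scroll(scroll:, body:)
    next_page
  end

  private

  def next_page
    page = @docs[@position, @size] || []
    @position += page.size
    @calls += 1
    { '_scroll_id' => "scroll-#{@calls}", 'hits' => { 'hits' => page } }
  end
end

docs = %w[Paulo Akua John Li Richard].map { |name| { '_source' => { 'first_name' => name } } }
client = FakeScrollClient.new(docs)

names = []
response = client.search(index: 'students', scroll: '2m', size: 2)
while response['hits']['hits'].size.positive?
  names.concat(response['hits']['hits'].map { |doc| doc['_source']['first_name'] })
  # Always pass the scroll ID from the most recent response.
  response = client.scroll(scroll: '1m', body: { scroll_id: response['_scroll_id'] })
end

puts names.length # prints 5
```

With five documents and a page size of 2, the loop sees pages of 2, 2, and 1 documents, and then stops on the first empty page.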

Deleting an index

You can delete the index using the delete method:

response = client.indices.delete(index: 'students')

Sample program

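The following is a complete sample program that illustrates all of the concepts described in the preceding sections. The Ruby client's methods return responses as Ruby hashes, which are hard to read. To display JSON responses in a pretty format, the sample program uses the MultiJson.dump method.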

require 'opensearch'

client = OpenSearch::Client.new(host: 'https://localhost:9200')

# Create an index with non-default settings
index_name = 'students'
index_body = {
    'settings': {
      'index': {
        'number_of_shards': 1,
        'number_of_replicas': 2 
      }
    }
  } 

client.indices.create(
    index: index_name,
    body: index_body
)

# Create a mapping
client.indices.put_mapping(
    index: index_name, 
    body: {
        properties: {
            first_name: { type: 'keyword' },
            last_name: { type: 'keyword' }
        }  
    }
)

# Get mappings
response = client.indices.get_mapping(index: index_name)
puts 'Mappings for the students index:'
puts MultiJson.dump(response, pretty: "true")

# Add one document to the index
puts 'Adding one document:'
document = {
    first_name: 'Connor',
    last_name: 'James',
    gpa: 3.93,
    grad_year: 2021
}
id = 100
  
client.index(
    index: index_name,
    body: document,
    id: id,
    refresh: true
)
  
response = client.search(index: index_name)
puts MultiJson.dump(response, pretty: "true")
  
# Update a document
puts 'Updating a document:'
client.update(index: index_name, id: id, body: { doc: { gpa: 3.25 } }, refresh: true)
response = client.search(index: index_name)
puts MultiJson.dump(response, pretty: "true")
print 'The updated gpa is '
puts response['hits']['hits'].map { |doc| doc['_source']['gpa'] }

# Add many documents in bulk
documents = [
    { index: { _index: index_name, _id: '200' } },
    { first_name: 'James', last_name: 'Rodriguez', gpa: 3.91, grad_year: 2019 },
    { index: { _index: index_name, _id: '300' } },
    { first_name: 'Nikki', last_name: 'Wolf', gpa: 3.87, grad_year: 2020 }
]
client.bulk(body: documents, refresh: true)

# Get all documents in the index
response = client.search(index: index_name)
puts 'All documents in the index after bulk upload:'
puts MultiJson.dump(response, pretty: "true")

# Search for a document using a multi_match query
puts 'Searching for documents that match "James":'
q = 'James'
query = {
  'size': 5,
  'query': {
    'multi_match': {
      'query': q,
      'fields': ['first_name', 'last_name^2']
    }
  }
}

response = client.search(
  body: query,
  index: index_name
)
puts MultiJson.dump(response, pretty: "true")

# Delete the document
response = client.delete(
    index: index_name,
    id: id,
    refresh: true
)

response = client.search(index: index_name)
puts 'Documents in the index after one document was deleted:'
puts MultiJson.dump(response, pretty: "true")

# Delete multiple documents
actions = [
    { delete: { _index: index_name, _id: 200 } },
    { delete: { _index: index_name, _id: 300 } }
]
client.bulk(body: actions, refresh: true)

response = client.search(index: index_name)

puts 'Documents in the index after all documents were deleted:'
puts MultiJson.dump(response, pretty: "true")

# Bulk several operations together
actions = [
    { index:  { _index: index_name, _id: 100, data: { first_name: 'Paulo', last_name: 'Santos', gpa: 3.29, grad_year: 2022 } } },
    { index:  { _index: index_name, _id: 200, data: { first_name: 'Shirley', last_name: 'Rodriguez', gpa: 3.92, grad_year: 2020 } } },
    { index:  { _index: index_name, _id: 300, data: { first_name: 'Akua', last_name: 'Mansa', gpa: 3.95, grad_year: 2022 } } },
    { index:  { _index: index_name, _id: 400, data: { first_name: 'John', last_name: 'Stiles', gpa: 3.72, grad_year: 2019 } } },
    { index:  { _index: index_name, _id: 500, data: { first_name: 'Li', last_name: 'Juan', gpa: 3.94, grad_year: 2022 } } },
    { index:  { _index: index_name, _id: 600, data: { first_name: 'Richard', last_name: 'Roe', gpa: 3.04, grad_year: 2020 } } },
    { update: { _index: index_name, _id: 100, data: { doc: { gpa: 3.73 } } } },
    { delete: { _index: index_name, _id: 200  } }
]
client.bulk(body: actions, refresh: true)

puts 'All documents in the index after bulk operations with scrolling:'
response = client.search(index: index_name, scroll: '2m', size: 2)

while response['hits']['hits'].size.positive?
    scroll_id = response['_scroll_id']
    puts(response['hits']['hits'].map { |doc| [doc['_source']['first_name'] + ' ' + doc['_source']['last_name']] })
    response = client.scroll(scroll: '1m', body: { scroll_id: scroll_id })
end

# Multi search
actions = [
    {},
    {query: {range: {gpa: {gt: 3.9}}}},
    {},
    {query: {range: {gpa: {lt: 3.1}}}}
]
response = client.msearch(index: index_name, body: actions)

puts 'Multi search results:'
puts MultiJson.dump(response, pretty: "true")

# Boolean query
query = {
    'query': {
        'bool': {
            'filter': {
                'term': {
                    'grad_year': 2022
                }
            }
        }
    },
    'sort': {
        'last_name': {
            'order': 'asc'
        }
    }
}

response = client.search(index: index_name, from: 0, size: 10, body: query)

puts 'Boolean query search results:'
puts MultiJson.dump(response, pretty: "true")

# Delete the index
puts 'Deleting the index:'
response = client.indices.delete(index: index_name)

puts MultiJson.dump(response, pretty: "true")
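
If you prefer not to depend on MultiJson being loaded, the standard json library can pretty-print a hash in a similar way. The following sketch uses a hand-written hash shaped like the response to deleting an index; it is an alternative for illustration, not what the sample program above uses.

```ruby
require 'json'

# Hand-written sample shaped like the response to deleting an index.
response = { 'acknowledged' => true }

puts JSON.pretty_generate(response)
# prints:
# {
#   "acknowledged": true
# }
```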

Ruby AWS Sigv4 Client

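The opensearch-aws-sigv4 gem provides the OpenSearch::Aws::Sigv4Client class, which has all the features of OpenSearch::Client. The only difference between the two clients is that OpenSearch::Aws::Sigv4Client requires an instance of Aws::Sigv4::Signer during instantiation to authenticate with AWS: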

require 'opensearch-aws-sigv4'
require 'aws-sigv4'

signer = Aws::Sigv4::Signer.new(service: 'es',
                                region: 'us-west-2',
                                access_key_id: 'key_id',
                                secret_access_key: 'secret')

client = OpenSearch::Aws::Sigv4Client.new({ log: true }, signer)

client.cluster.health

client.transport.reload_connections!

client.search q: 'test'