Link Search Menu Expand Document Documentation Menu

使用 OpenSearch 索引优化查询性能

2.11 版新增

使用外部数据源时,查询性能可能因为网络延迟、数据转换和数据量等原因而变慢。您可以使用 OpenSearch 索引(例如跳过索引或覆盖索引)来优化查询性能。

  • “跳过索引”使用跳过加速方法(例如分区、最小值和最大值以及值集)来摄入并创建紧凑的聚合数据结构。这使其成为直接查询场景的经济实惠选项。有关更多信息,请参阅跳过索引
  • “覆盖索引”将源中的全部或部分数据摄入 OpenSearch,并使其能够使用所有 OpenSearch Dashboards 和插件功能。有关更多信息,请参阅覆盖索引
  • “物化视图”通过存储来自源数据的预计算和聚合数据来提高查询性能。有关更多信息,请参阅物化视图

有关每个索引过程的全面指导,请参阅Flint 索引参考手册

数据源用例:加速性能

要开始加速查询性能,请执行以下步骤

  1. 转到 OpenSearch Dashboards > Query Workbench,然后从 Data sources 下拉菜单中选择您的数据源。
  2. 从导航菜单中选择一个数据库。
  3. 在表中查看结果并确认您拥有正确的数据。
  4. 请按照以下步骤创建 OpenSearch 索引
    1. 选择 Accelerate data。此时会弹出一个窗口。
    2. Select data fields 下输入您的数据库和表详细信息。
  5. 对于 Acceleration type,根据您的用例选择加速类型。然后,输入您的加速类型信息。有关更多信息,请参阅以下部分

跳过索引

“跳过索引”使用跳过加速方法(例如分区、最小值/最大值和值集)通过紧凑的聚合数据结构摄入数据。这使其成为直接查询场景的经济实惠选项。

使用跳过索引,您只能索引存储在 Amazon S3 中的数据的元数据。当您使用跳过索引查询表时,查询规划器会引用该索引并重写查询以高效定位数据,而不是扫描所有分区和文件。这使得跳过索引能够快速缩小存储数据的具体位置。

定义跳过索引设置

  1. Skipping index definition 下,选择 Generate 自动生成跳过索引。或者,要手动选择要添加的字段,请选择 Add fields。从以下类型中选择
    • Partition:使用数据分区详细信息定位数据。此类型最适合基于分区的列,例如年、月、日、小时。
    • MinMax:使用索引列的下限和上限定位数据。此类型最适合数值列。
    • ValueSet:使用唯一值集定位数据。此类型最适合基数低到中等且需要精确匹配的列。
    • BloomFilter:使用布隆过滤器算法定位数据。此类型最适合基数高且不需要精确匹配的列。
  2. 选择 Create acceleration 应用您的跳过索引设置。
  3. 查看跳过索引查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。

或者,您可以使用 Query Workbench 手动创建表的跳过索引。从下拉列表中选择您的数据源,然后运行如下查询

CREATE SKIPPING INDEX
ON datasourcename.gluedatabasename.vpclogstable(
 `srcaddr` BLOOM_FILTER, 
 `dstaddr` BLOOM_FILTER, 
 `day` PARTITION, 
 `account_id`BLOOM_FILTER
 ) WITH (
index_settings = '{"number_of_shards":5,"number_of_replicas":1}',
auto_refresh = true,
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)

覆盖索引

“覆盖索引”将源中的全部或部分数据摄入 OpenSearch,并使其能够使用所有 OpenSearch Dashboards 和插件功能。

使用覆盖索引,您可以从表中指定列中摄入数据。这是三种索引类型中性能最好的一种。因为 OpenSearch 会摄入您所需列中的所有数据,所以您可以获得更好的性能并执行高级分析。

OpenSearch 从覆盖索引数据创建新索引。您可以使用此新索引来创建可视化,或用于异常检测和地理空间功能。您可以使用索引状态管理来管理覆盖视图索引。有关更多信息,请参阅索引状态管理

定义覆盖索引设置

  1. 对于 Index name,输入有效的索引名称。请注意,每个表可以有多个覆盖索引。
  2. 选择 Refresh type。默认情况下,OpenSearch 会自动刷新索引。否则,您必须使用 REFRESH 语句手动触发刷新。
  3. 输入 Checkpoint location,这是刷新作业检查点的路径。该位置必须是与 Hadoop 分布式文件系统 (HDFS) 兼容的文件系统中的路径。有关更多信息,请参阅启动流查询
  4. 通过在 Covering index definition 下选择 (add fields here) 来定义覆盖索引字段。
  5. 选择 Create acceleration 应用您的覆盖索引设置。
  6. 查看覆盖索引查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。

或者,您可以使用 Query Workbench 手动在表上创建覆盖索引。从下拉列表中选择您的数据源,然后运行如下查询

CREATE INDEX vpc_covering_index
ON datasourcename.gluedatabasename.vpclogstable (version, account_id, interface_id, 
srcaddr, dstaddr, srcport, dstport, protocol, packets, 
bytes, start, action, log_status STRING, 
`aws-account-id`, `aws-service`, `aws-region`, year, 
month, day, hour )
WITH (
  auto_refresh = true,
  refresh_interval = '15 minute',
  checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)

物化视图

通过“物化视图”,您可以使用复杂查询(例如聚合)来支持 Dashboards 可视化。物化视图根据查询将少量数据摄入 OpenSearch。然后,OpenSearch 会根据摄入的数据形成一个索引,您可以将其用于可视化。您可以使用索引状态管理来管理物化视图索引。有关更多信息,请参阅索引状态管理

定义物化视图设置

  1. 对于 Index name,输入有效的索引名称。请注意,每个表可以有多个覆盖索引。
  2. 选择 Refresh type。默认情况下,OpenSearch 会自动刷新索引。否则,您必须使用 REFRESH 语句手动触发刷新。
  3. 输入 Checkpoint location,这是刷新作业检查点的路径。该位置必须是 HDFS 兼容文件系统中的路径。
  4. 输入 Watermark delay,它定义了数据可以延迟多长时间仍能被处理,例如 1 分钟或 10 秒。
  5. Materialized view definition 下定义覆盖索引字段。
  6. 选择 Create acceleration 应用您的物化视图索引设置。
  7. 查看物化视图查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。

或者,您可以使用 Query Workbench 手动在表上创建物化视图索引。从下拉列表中选择您的数据源,然后运行如下查询

CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
  SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,

    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING)  AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING)  AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING)  AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING)  AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING)  AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING)  AS `aws.vpc.src-subnet_uid`,

    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING)  AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING)  AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING)  AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING)  AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING)  AS `aws.vpc.dst-subnet_uid`,
    CASE
      WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
        THEN 'ingress'
      ELSE 'egress'
      END AS `aws.vpc.flow-direction`,

    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING)  AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING)  AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING)  AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING)  AS `aws.vpc.connection.direction`,

    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,

    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,

    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
  FROM
  datasourcename.gluedatabasename.vpclogstable 
WITH (
  auto_refresh = true,
  refresh_interval = '15 Minute',
  checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint',
  watermark_delay = '1 Minute',
)

限制

此功能仍在开发中,因此存在一些限制。有关实时更新,请参阅 GitHub 上的开发者文档

剩余 350 字符

有问题?

想要贡献?