使用 OpenSearch 索引优化查询性能
2.11 版新增
使用外部数据源时,查询性能可能因为网络延迟、数据转换和数据量等原因而变慢。您可以使用 OpenSearch 索引(例如跳过索引或覆盖索引)来优化查询性能。
- “跳过索引”使用跳过加速方法(例如分区、最小值和最大值以及值集)来摄入并创建紧凑的聚合数据结构。这使其成为直接查询场景的经济实惠选项。有关更多信息,请参阅跳过索引。
- “覆盖索引”将源中的全部或部分数据摄入 OpenSearch,并使其能够使用所有 OpenSearch Dashboards 和插件功能。有关更多信息,请参阅覆盖索引。
- “物化视图”通过存储来自源数据的预计算和聚合数据来提高查询性能。有关更多信息,请参阅物化视图。
有关每个索引过程的全面指导,请参阅Flint 索引参考手册。
数据源用例:加速性能
要开始加速查询性能,请执行以下步骤
- 转到 OpenSearch Dashboards > Query Workbench,然后从 Data sources 下拉菜单中选择您的数据源。
- 从导航菜单中选择一个数据库。
- 在表中查看结果并确认您拥有正确的数据。
- 请按照以下步骤创建 OpenSearch 索引
- 选择 Accelerate data。此时会弹出一个窗口。
- 在 Select data fields 下输入您的数据库和表详细信息。
- 对于 Acceleration type,根据您的用例选择加速类型。然后,输入您的加速类型信息。有关更多信息,请参阅以下部分
跳过索引
“跳过索引”使用跳过加速方法(例如分区、最小值/最大值和值集)通过紧凑的聚合数据结构摄入数据。这使其成为直接查询场景的经济实惠选项。
使用跳过索引,您只能索引存储在 Amazon S3 中的数据的元数据。当您使用跳过索引查询表时,查询规划器会引用该索引并重写查询以高效定位数据,而不是扫描所有分区和文件。这使得跳过索引能够快速缩小存储数据的具体位置。
定义跳过索引设置
- 在 Skipping index definition 下,选择 Generate 自动生成跳过索引。或者,要手动选择要添加的字段,请选择 Add fields。从以下类型中选择
Partition
:使用数据分区详细信息定位数据。此类型最适合基于分区的列,例如年、月、日、小时。MinMax
:使用索引列的下限和上限定位数据。此类型最适合数值列。ValueSet
:使用唯一值集定位数据。此类型最适合基数低到中等且需要精确匹配的列。BloomFilter
:使用布隆过滤器算法定位数据。此类型最适合基数高且不需要精确匹配的列。
- 选择 Create acceleration 应用您的跳过索引设置。
- 查看跳过索引查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。
或者,您可以使用 Query Workbench 手动创建表的跳过索引。从下拉列表中选择您的数据源,然后运行如下查询
CREATE SKIPPING INDEX
ON datasourcename.gluedatabasename.vpclogstable(
`srcaddr` BLOOM_FILTER,
`dstaddr` BLOOM_FILTER,
`day` PARTITION,
`account_id`BLOOM_FILTER
) WITH (
index_settings = '{"number_of_shards":5,"number_of_replicas":1}',
auto_refresh = true,
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)
覆盖索引
“覆盖索引”将源中的全部或部分数据摄入 OpenSearch,并使其能够使用所有 OpenSearch Dashboards 和插件功能。
使用覆盖索引,您可以从表中指定列中摄入数据。这是三种索引类型中性能最好的一种。因为 OpenSearch 会摄入您所需列中的所有数据,所以您可以获得更好的性能并执行高级分析。
OpenSearch 从覆盖索引数据创建新索引。您可以使用此新索引来创建可视化,或用于异常检测和地理空间功能。您可以使用索引状态管理来管理覆盖视图索引。有关更多信息,请参阅索引状态管理。
定义覆盖索引设置
- 对于 Index name,输入有效的索引名称。请注意,每个表可以有多个覆盖索引。
- 选择 Refresh type。默认情况下,OpenSearch 会自动刷新索引。否则,您必须使用 REFRESH 语句手动触发刷新。
- 输入 Checkpoint location,这是刷新作业检查点的路径。该位置必须是与 Hadoop 分布式文件系统 (HDFS) 兼容的文件系统中的路径。有关更多信息,请参阅启动流查询。
- 通过在 Covering index definition 下选择 (add fields here) 来定义覆盖索引字段。
- 选择 Create acceleration 应用您的覆盖索引设置。
- 查看覆盖索引查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。
或者,您可以使用 Query Workbench 手动在表上创建覆盖索引。从下拉列表中选择您的数据源,然后运行如下查询
CREATE INDEX vpc_covering_index
ON datasourcename.gluedatabasename.vpclogstable (version, account_id, interface_id,
srcaddr, dstaddr, srcport, dstport, protocol, packets,
bytes, start, action, log_status STRING,
`aws-account-id`, `aws-service`, `aws-region`, year,
month, day, hour )
WITH (
auto_refresh = true,
refresh_interval = '15 minute',
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)
物化视图
通过“物化视图”,您可以使用复杂查询(例如聚合)来支持 Dashboards 可视化。物化视图根据查询将少量数据摄入 OpenSearch。然后,OpenSearch 会根据摄入的数据形成一个索引,您可以将其用于可视化。您可以使用索引状态管理来管理物化视图索引。有关更多信息,请参阅索引状态管理。
定义物化视图设置
- 对于 Index name,输入有效的索引名称。请注意,每个表可以有多个覆盖索引。
- 选择 Refresh type。默认情况下,OpenSearch 会自动刷新索引。否则,您必须使用
REFRESH
语句手动触发刷新。 - 输入 Checkpoint location,这是刷新作业检查点的路径。该位置必须是 HDFS 兼容文件系统中的路径。
- 输入 Watermark delay,它定义了数据可以延迟多长时间仍能被处理,例如 1 分钟或 10 秒。
- 在 Materialized view definition 下定义覆盖索引字段。
- 选择 Create acceleration 应用您的物化视图索引设置。
- 查看物化视图查询详细信息,然后单击 Run。OpenSearch 会将您的索引添加到左侧导航窗格。
或者,您可以使用 Query Workbench 手动在表上创建物化视图索引。从下拉列表中选择您的数据源,然后运行如下查询
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
cloud.account_uid AS `aws.vpc.cloud_account_uid`,
cloud.region AS `aws.vpc.cloud_region`,
cloud.zone AS `aws.vpc.cloud_zone`,
cloud.provider AS `aws.vpc.cloud_provider`,
CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,
CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
CASE
WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
THEN 'ingress'
ELSE 'egress'
END AS `aws.vpc.flow-direction`,
CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,
CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
status_code AS `aws.vpc.status_code`,
severity AS `aws.vpc.severity`,
class_name AS `aws.vpc.class_name`,
category_name AS `aws.vpc.category_name`,
activity_name AS `aws.vpc.activity_name`,
disposition AS `aws.vpc.disposition`,
type_name AS `aws.vpc.type_name`,
region AS `aws.vpc.region`,
accountid AS `aws.vpc.account-id`
FROM
datasourcename.gluedatabasename.vpclogstable
WITH (
auto_refresh = true,
refresh_interval = '15 Minute',
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint',
watermark_delay = '1 Minute',
)
限制
此功能仍在开发中,因此存在一些限制。有关实时更新,请参阅 GitHub 上的开发者文档。