任务调度器
OpenSearch 作业调度器插件提供了一个框架,可用于构建集群上常见任务的调度。您可以使用作业调度器的服务提供商接口 (SPI) 来定义集群管理任务的调度,例如创建快照、管理数据生命周期和运行周期性作业。作业调度器有一个用于监听 OpenSearch 集群上更新事件的清扫器 (sweeper) 和一个管理作业何时运行的调度器。
您可以通过遵循标准 OpenSearch 插件安装过程来安装作业调度器插件。作业调度器 GitHub 仓库中提供的 sample-extension-plugin 示例提供了一个在构建插件时利用作业调度器的完整示例。要定义调度,您需要构建一个实现作业调度器库中提供的接口的插件。您可以通过指定间隔来调度作业,或者可以使用 Unix cron 表达式(例如 0 12 * * ?
,表示每天中午运行)来定义更灵活的调度。
构建作业调度器插件
OpenSearch 插件开发者可以扩展作业调度器插件来调度在集群上执行的作业。您可以调度作业,例如对原始数据运行聚合查询,每小时将聚合数据保存到新索引中,或者通过调用 OpenSearch API 然后将输出发布到 webhook 来继续监控分片分配。
有关构建使用作业调度器插件的示例,请参阅作业调度器 README。
定义端点
您可以通过参考 示例 SampleExtensionRestHandler.java
文件来配置插件的 API 端点。使用 WATCH_INDEX_URI
设置插件将公开的端点 URL。
public class SampleExtensionRestHandler extends BaseRestHandler {
public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch";
您可以通过扩展 ScheduledJobParameter
来定义作业配置。您还可以定义插件使用的字段,例如 indexToWatch
,如 示例 SampleJobParameter
文件所示。此作业配置将作为文档保存在您定义的索引中,如此示例所示。
配置参数
您可以通过参考 示例 SampleJobParameter.java
文件并根据您的需求进行修改来配置插件的参数。
/**
* A sample job parameter.
* <p>
* It adds an additional "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index
* the job runner will watch.
*/
public class SampleJobParameter implements ScheduledJobParameter {
public static final String NAME_FIELD = "name";
public static final String ENABLED_FILED = "enabled";
public static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field";
public static final String SCHEDULE_FIELD = "schedule";
public static final String ENABLED_TIME_FILED = "enabled_time";
public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field";
public static final String INDEX_NAME_FIELD = "index_name_to_watch";
public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds";
public static final String JITTER = "jitter";
private String jobName;
private Instant lastUpdateTime;
private Instant enabledTime;
private boolean isEnabled;
private Schedule schedule;
private String indexToWatch;
private Long lockDurationSeconds;
private Double jitter;
接下来,配置您希望插件与作业调度器一起使用的请求参数。这些参数将基于您在配置插件时声明的变量。以下示例显示了您在构建插件时设置的请求参数。
public SampleJobParameter(String id, String name, String indexToWatch, Schedule schedule, Long lockDurationSeconds, Double jitter) {
this.jobName = name;
this.indexToWatch = indexToWatch;
this.schedule = schedule;
Instant now = Instant.now();
this.isEnabled = true;
this.enabledTime = now;
this.lastUpdateTime = now;
this.lockDurationSeconds = lockDurationSeconds;
this.jitter = jitter;
}
@Override
public String getName() {
return this.jobName;
}
@Override
public Instant getLastUpdateTime() {
return this.lastUpdateTime;
}
@Override
public Instant getEnabledTime() {
return this.enabledTime;
}
@Override
public Schedule getSchedule() {
return this.schedule;
}
@Override
public boolean isEnabled() {
return this.isEnabled;
}
@Override
public Long getLockDurationSeconds() {
return this.lockDurationSeconds;
}
@Override public Double getJitter() {
return jitter;
}
下表描述了上一个示例中配置的请求参数。显示的所有请求参数都是必需的。
字段 | 数据类型 | 描述 |
---|---|---|
getName | 字符串 | 返回作业的名称。 |
getLastUpdateTime | 时间单位 | 返回作业上次运行的时间。 |
getEnabledTime | 时间单位 | 返回作业启用时间。 |
getSchedule | Unix cron | 以 Unix cron 语法返回作业调度。 |
isEnabled | 布尔型 | 指示作业是否启用。 |
getLockDurationSeconds | 整数 | 返回作业被锁定的持续时间。 |
getJitter | 整数 | 返回定义的抖动值。 |
您的作业使用的逻辑应由 SampleJobParameter.java
示例文件中扩展自 ScheduledJobRunner
的类定义,例如 SampleJobRunner
。当作业正在运行时,有一个锁定机制可以用来防止其他节点运行相同的作业。首先,获取锁。然后确保在作业完成之前释放锁。
有关更多信息,请参阅 作业调度器 GitHub 仓库中的作业调度器 示例扩展目录。
作业调度器集群设置
作业调度器插件支持以下集群设置。所有设置都是动态的。要了解有关静态和动态设置的更多信息,请参阅配置 OpenSearch。
设置 | 数据类型 | 描述 |
---|---|---|
plugins.jobscheduler.jitter_limit | Double | 定义作业执行时间的最大延迟乘数。太多作业同时启动可能导致高资源消耗。为了平衡负载,您可以向启动时间添加随机抖动延迟。例如,如果时间间隔是 10 分钟,抖动是 0.6,则下一个作业运行将随机延迟 0 到 6 分钟之间的时间段。 |
plugins.jobscheduler.request_timeout | 时间单位 | 后台清扫搜索超时。后台清扫指注册作业的自动调度和执行。它以一定间隔发生,并遍历每个扩展插件的注册作业索引,搜索要执行的作业。 |
plugins.jobscheduler.retry_count | 整数 | 用于定义指数退避策略的重试次数。退避策略决定了批量处理器在重试批量操作之前将等待多长时间。当批量索引请求因请求时的资源限制而受到影响或被拒绝时,将使用它。对于作业调度器插件,这会影响搜索注册作业索引。 |
plugins.jobscheduler.sweeper.backoff_millis | 时间单位 | 用于定义指数退避策略的初始等待时间(以毫秒为单位)。退避策略决定了批量处理器在重试批量操作之前将等待多长时间。当批量索引请求因请求时的资源限制而受到影响或被拒绝时,将使用它。对于作业调度器插件,这会影响搜索注册作业索引。 |
plugins.jobscheduler.sweeper.page_size | 整数 | 配置用于在注册作业索引中查找作业文档的搜索请求。定义要返回的搜索命中数。 |
plugins.jobscheduler.sweeper.period | 时间单位 | 定义后台清扫执行前的初始延迟时间。 |