Link Search Menu Expand Document Documentation Menu

任务调度器

OpenSearch 作业调度器插件提供了一个框架,可用于构建集群上常见任务的调度。您可以使用作业调度器的服务提供商接口 (SPI) 来定义集群管理任务的调度,例如创建快照、管理数据生命周期和运行周期性作业。作业调度器有一个用于监听 OpenSearch 集群上更新事件的清扫器 (sweeper) 和一个管理作业何时运行的调度器。

您可以通过遵循标准 OpenSearch 插件安装过程来安装作业调度器插件。作业调度器 GitHub 仓库中提供的 sample-extension-plugin 示例提供了一个在构建插件时利用作业调度器的完整示例。要定义调度,您需要构建一个实现作业调度器库中提供的接口的插件。您可以通过指定间隔来调度作业,或者可以使用 Unix cron 表达式(例如 0 12 * * ?,表示每天中午运行)来定义更灵活的调度。

构建作业调度器插件

OpenSearch 插件开发者可以扩展作业调度器插件来调度在集群上执行的作业。您可以调度作业,例如对原始数据运行聚合查询,每小时将聚合数据保存到新索引中,或者通过调用 OpenSearch API 然后将输出发布到 webhook 来继续监控分片分配。

有关构建使用作业调度器插件的示例,请参阅作业调度器 README

定义端点

您可以通过参考 示例 SampleExtensionRestHandler.java 文件来配置插件的 API 端点。使用 WATCH_INDEX_URI 设置插件将公开的端点 URL。

public class SampleExtensionRestHandler extends BaseRestHandler {
    public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch";

您可以通过扩展 ScheduledJobParameter 来定义作业配置。您还可以定义插件使用的字段,例如 indexToWatch,如 示例 SampleJobParameter 文件所示。此作业配置将作为文档保存在您定义的索引中,如此示例所示。

配置参数

您可以通过参考 示例 SampleJobParameter.java 文件并根据您的需求进行修改来配置插件的参数。

/**
 * A sample job parameter.
 * <p>
 * It adds an additional "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index
 * the job runner will watch.
 */
public class SampleJobParameter implements ScheduledJobParameter {
    public static final String NAME_FIELD = "name";
    public static final String ENABLED_FILED = "enabled";
    public static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
    public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field";
    public static final String SCHEDULE_FIELD = "schedule";
    public static final String ENABLED_TIME_FILED = "enabled_time";
    public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field";
    public static final String INDEX_NAME_FIELD = "index_name_to_watch";
    public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds";
    public static final String JITTER = "jitter";

    private String jobName;
    private Instant lastUpdateTime;
    private Instant enabledTime;
    private boolean isEnabled;
    private Schedule schedule;
    private String indexToWatch;
    private Long lockDurationSeconds;
    private Double jitter;

接下来,配置您希望插件与作业调度器一起使用的请求参数。这些参数将基于您在配置插件时声明的变量。以下示例显示了您在构建插件时设置的请求参数。

public SampleJobParameter(String id, String name, String indexToWatch, Schedule schedule, Long lockDurationSeconds, Double jitter) {
        this.jobName = name;
        this.indexToWatch = indexToWatch;
        this.schedule = schedule;

        Instant now = Instant.now();
        this.isEnabled = true;
        this.enabledTime = now;
        this.lastUpdateTime = now;
        this.lockDurationSeconds = lockDurationSeconds;
        this.jitter = jitter;
    }

    @Override
    public String getName() {
        return this.jobName;
    }

    @Override
    public Instant getLastUpdateTime() {
        return this.lastUpdateTime;
    }

    @Override
    public Instant getEnabledTime() {
        return this.enabledTime;
    }

    @Override
    public Schedule getSchedule() {
        return this.schedule;
    }

    @Override
    public boolean isEnabled() {
        return this.isEnabled;
    }

    @Override
    public Long getLockDurationSeconds() {
        return this.lockDurationSeconds;
    }

    @Override public Double getJitter() {
        return jitter;
    }

下表描述了上一个示例中配置的请求参数。显示的所有请求参数都是必需的。

字段 数据类型 描述
getName 字符串 返回作业的名称。
getLastUpdateTime 时间单位 返回作业上次运行的时间。
getEnabledTime 时间单位 返回作业启用时间。
getSchedule Unix cron 以 Unix cron 语法返回作业调度。
isEnabled 布尔型 指示作业是否启用。
getLockDurationSeconds 整数 返回作业被锁定的持续时间。
getJitter 整数 返回定义的抖动值。

您的作业使用的逻辑应由 SampleJobParameter.java 示例文件中扩展自 ScheduledJobRunner 的类定义,例如 SampleJobRunner。当作业正在运行时,有一个锁定机制可以用来防止其他节点运行相同的作业。首先,获取锁。然后确保在作业完成之前释放锁。

有关更多信息,请参阅 作业调度器 GitHub 仓库中的作业调度器 示例扩展目录。

作业调度器集群设置

作业调度器插件支持以下集群设置。所有设置都是动态的。要了解有关静态和动态设置的更多信息,请参阅配置 OpenSearch

设置 数据类型 描述
plugins.jobscheduler.jitter_limit Double 定义作业执行时间的最大延迟乘数。太多作业同时启动可能导致高资源消耗。为了平衡负载,您可以向启动时间添加随机抖动延迟。例如,如果时间间隔是 10 分钟,抖动是 0.6,则下一个作业运行将随机延迟 0 到 6 分钟之间的时间段。
plugins.jobscheduler.request_timeout 时间单位 后台清扫搜索超时。后台清扫指注册作业的自动调度和执行。它以一定间隔发生,并遍历每个扩展插件的注册作业索引,搜索要执行的作业。
plugins.jobscheduler.retry_count 整数 用于定义指数退避策略的重试次数。退避策略决定了批量处理器在重试批量操作之前将等待多长时间。当批量索引请求因请求时的资源限制而受到影响或被拒绝时,将使用它。对于作业调度器插件,这会影响搜索注册作业索引。
plugins.jobscheduler.sweeper.backoff_millis 时间单位 用于定义指数退避策略的初始等待时间(以毫秒为单位)。退避策略决定了批量处理器在重试批量操作之前将等待多长时间。当批量索引请求因请求时的资源限制而受到影响或被拒绝时,将使用它。对于作业调度器插件,这会影响搜索注册作业索引。
plugins.jobscheduler.sweeper.page_size 整数 配置用于在注册作业索引中查找作业文档的搜索请求。定义要返回的搜索命中数。
plugins.jobscheduler.sweeper.period 时间单位 定义后台清扫执行前的初始延迟时间。
剩余 350 字符

有问题?

想做贡献?