基于Quartz编写一个可复用的分布式调度任务管理WebUI组件

编程语言 2020-12-17 03:51:05
前提 创业小团队,无论选择任何方案,都优先考虑节省成本。关于分布式定时调度框架,成熟的候选方案有XXL-JOB、Easy Scheduler、Light Task Scheduler和Elastic
  • Uikit:选用的前端的一个轻量级的UI框架,主要是考虑到轻量、文档和组件相对齐全。
  • JQuery:选用js框架,原因只有一个:简单。
  • Freemarker:模板引擎,主观上比JspThymeleaf好用。
  • Quartz:工业级调度器。
<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP-java7</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/css/uikit.min.css"/>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit-icons.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
CREATE TABLE `schedule_task`
(
    `id`               BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主键',`creator`          VARCHAR(16)     NOT NULL DEFAULT 'admin' COMMENT '创建人',`editor`           VARCHAR(16)     NOT NULL DEFAULT 'admin' COMMENT '修改人',`create_time`      DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`edit_time`        DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`version`          BIGINT          NOT NULL DEFAULT 1 COMMENT '版本号',`deleted`          TINYINT         NOT NULL DEFAULT 0 COMMENT '软删除标识',`task_id`          VARCHAR(64)     NOT NULL COMMENT '任务标识',`task_class`       VARCHAR(256)    NOT NULL COMMENT '任务类',`task_type`        VARCHAR(16)     NOT NULL COMMENT '任务类型,CRON,SIMPLE',`task_group`       VARCHAR(32)     NOT NULL DEFAULT 'DEFAULT' COMMENT '任务分组',`task_expression`  VARCHAR(256)    NOT NULL COMMENT '任务表达式',`task_description` VARCHAR(256) COMMENT '任务描述',`task_status`      TINYINT         NOT NULL DEFAULT 0 COMMENT '任务状态',UNIQUE uniq_task_class_task_group (`task_class`,`task_group`),UNIQUE uniq_task_id (`task_id`)
) COMMENT '调度任务';

CREATE TABLE `schedule_task_parameter`
(
    `id`              BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主键',`task_id`         VARCHAR(64)     NOT NULL COMMENT '任务标识',`parameter_value` VARCHAR(1024)   NOT NULL COMMENT '参数值',UNIQUE uniq_task_id (`task_id`)
) COMMENT '调度任务参数';
TriggerKey -> [name,group]
JobKey -> [name,group]
JobKey,TriggerKey -> [jobClassName,${spring.application.name} || applicationName]
public interface Scheduler {
    ......省略无关的代码......
    // 添加调度任务 - 包括任务内容和触发器
    void scheduleJob(JobDetail jobDetail,Set<? extends Trigger> triggersForJob,boolean replace) throws SchedulerException;

    // 移除触发器
    boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException;
    
    // 移除任务内容
    boolean deleteJob(JobKey jobKey) throws SchedulerException;
    ......省略无关的代码......
}
templates
  - common/script.ftl 公共脚本
  - task-add.ftl  添加新任务页面
  - task-edit.ftl 编辑任务页面
  - task-list.ftl 任务列表

@Autowired
private Scheduler scheduler;

public void refreshScheduleTask(ScheduleTask task,Trigger oldTrigger,TriggerKey triggerKey,Trigger newTrigger) throws Exception {
    JobDataMap jobDataMap = prepareJobDataMap(task);
    JobDetail jobDetail =
            JobBuilder.newJob((Class<? extends Job>) Class.forName(task.getTaskClass()))
                    .withIdentity(task.getTaskClass(),task.getTaskGroup())
                    .usingJobData(jobDataMap)
                    .build();
    // 总是覆盖
    if (ScheduleTaskStatus.ONLINE == ScheduleTaskStatus.fromType(task.getTaskStatus())) {
        scheduler.scheduleJob(jobDetail,Collections.singleton(newTrigger),Boolean.TRUE);
    } else {
        if (null != oldTrigger) {
            scheduler.unscheduleJob(triggerKey);
        }
    }
}

private JobDataMap prepareJobDataMap(ScheduleTask task) {
    JobDataMap jobDataMap = new JobDataMap();
    jobDataMap.put("scheduleTask",JsonUtils.X.format(task));
    ScheduleTaskParameter taskParameter = scheduleTaskParameterDao.selectByTaskId(task.getTaskId());
    if (null != taskParameter) {
        Map<String,Object> parameterMap = JsonUtils.X.parse(taskParameter.getParameterValue(),new TypeReference<Map<String,Object>>() {
                });
        jobDataMap.putAll(parameterMap);
    }
    return jobDataMap;
}
@DisallowConcurrentExecution
public abstract class AbstractScheduleTask implements Job {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired(required = false)
    private List<ScheduleTaskExecutionPostProcessor> processors;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        String scheduleTask = context.getMergedJobDataMap().getString("scheduleTask");
        ScheduleTask task = JsonUtils.X.parse(scheduleTask,ScheduleTask.class);
        ScheduleTaskInfo info = ScheduleTaskInfo.builder()
                .taskId(task.getTaskId())
                .taskClass(task.getTaskClass())
                .taskDescription(task.getTaskDescription())
                .taskExpression(task.getTaskExpression())
                .taskGroup(task.getTaskGroup())
                .taskType(task.getTaskType())
                .build();
        long start = System.currentTimeMillis();
        info.setStart(start);
        // 在MDC中添加traceId便于追踪调用链
        MappedDiagnosticContextAssistant.X.processInMappedDiagnosticContext(() -> {
            try {
                if (enableLogging()) {
                    logger.info("任务[{}]-[{}]-[{}]开始执行......",task.getTaskId(),task.getTaskClass(),task.getTaskDescription());
                }
                // 执行前的处理器回调
                processBeforeTaskExecution(info);
                // 子类实现的任务执行逻辑
                executeInternal(context);
                // 执行成功的处理器回调
                processAfterTaskExecution(info,ScheduleTaskExecutionStatus.SUCCESS);
            } catch (Exception e) {
                info.setThrowable(e);
                if (enableLogging()) {
                    logger.info("任务[{}]-[{}]-[{}]执行异常",task.getTaskDescription(),e);
                }
                // 执行异常的处理器回调
                processAfterTaskExecution(info,ScheduleTaskExecutionStatus.FAIL);
            } finally {
                long end = System.currentTimeMillis();
                long cost = end - start;
                info.setEnd(end);
                info.setCost(cost);
                if (enableLogging() && null == info.getThrowable()) {
                    logger.info("任务[{}]-[{}]-[{}]执行完毕,耗时:{} ms......",cost);
                }
                // 执行结束的处理器回调
                processAfterTaskCompletion(info);
            }
        });
    }

    protected boolean enableLogging() {
        return true;
    }

    /**
     * 内部执行方法 - 子类实现
     *
     * @param context context
     */
    protected abstract void executeInternal(JobExecutionContext context);

    /**
     * 拷贝任务信息
     */
    private ScheduleTaskInfo copyScheduleTaskInfo(ScheduleTaskInfo info) {
        return ScheduleTaskInfo.builder()
                .cost(info.getCost())
                .start(info.getStart())
                .end(info.getEnd())
                .throwable(info.getThrowable())
                .taskId(info.getTaskId())
                .taskClass(info.getTaskClass())
                .taskDescription(info.getTaskDescription())
                .taskExpression(info.getTaskExpression())
                .taskGroup(info.getTaskGroup())
                .taskType(info.getTaskType())
                .build();
    }
    
    // 任务执行之前回调
    void processBeforeTaskExecution(ScheduleTaskInfo info) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.beforeTaskExecution(copyScheduleTaskInfo(info));
            }
        }
    }
    
    // 任务执行完毕时回调
    void processAfterTaskExecution(ScheduleTaskInfo info,ScheduleTaskExecutionStatus status) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.afterTaskExecution(copyScheduleTaskInfo(info),status);
            }
        }
    }
    
    // 任务完结时回调
    void processAfterTaskCompletion(ScheduleTaskInfo info) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.afterTaskCompletion(copyScheduleTaskInfo(info));
            }
        }
    }
}
public interface ScheduleTaskExecutionPostProcessor {
    
    default void beforeTaskExecution(ScheduleTaskInfo info) {

    }

    default void afterTaskExecution(ScheduleTaskInfo info,ScheduleTaskExecutionStatus status) {

    }

    default void afterTaskCompletion(ScheduleTaskInfo info) {

    }
}
public interface AlarmStrategy {

    void process(ScheduleTaskInfo scheduleTaskInfo);
}

// 默认启用的实现是无预警策略
public class NoneAlarmStrategy implements AlarmStrategy {

    @Override
    public void process(ScheduleTaskInfo scheduleTaskInfo) {

    }
}
@Slf4j
@Component
public class LoggingAlarmStrategy implements AlarmStrategy {

    @Override
    public void process(ScheduleTaskInfo scheduleTaskInfo) {
        if (null != scheduleTaskInfo.getThrowable()) {
            log.error("任务执行异常,任务内容:{}",JsonUtils.X.format(scheduleTaskInfo),scheduleTaskInfo.getThrowable());
        }
    }
}
quartz-web-ui-kit
  - quartz-web-ui-kit-core 核心包
  - h2-example H2数据库的演示例子
  - mysql-5.x-example MySQL5.x版本的演示例子
  - mysql-8.x-example MySQL8.x版本的演示例子
git clone https://github.com/zjcscut/quartz-web-ui-kit
cd quartz-web-ui-kit
mvn clean compile install
<dependency>
    <groupId>club.throwable</groupId>
    <artifactId>quartz-web-ui-kit-core</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<!-- 这个是必须,MySQL的驱动包 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>
@Configuration
public class QuartzWebUiKitConfiguration implements EnvironmentAware {

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    @Bean
    public QuartzWebUiKitPropertiesProvider quartzWebUiKitPropertiesProvider() {
        return () -> {
            QuartzWebUiKitProperties properties = new QuartzWebUiKitProperties();
            properties.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name"));
            properties.setUrl(environment.getProperty("spring.datasource.url"));
            properties.setUsername(environment.getProperty("spring.datasource.username"));
            properties.setPassword(environment.getProperty("spring.datasource.password"));
            return properties;
        };
    }
}
scripts
  - quartz-h2.sql
  - quartz-web-ui-kit-h2-ddl.sql
  - quartz-mysql-innodb.sql
  - quartz-web-ui-kit-mysql-ddl.sql
spring.application.name=mysql-5.x-example
server.port=8082
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# 这个local是本地提前建好的数据库
spring.datasource.url=jdbc:mysql://localhost:3306/local?characterEncoding=utf8&useUnicode=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# freemarker配置
spring.freemarker.template-loader-path=classpath:/templates/
spring.freemarker.cache=false
spring.freemarker.charset=UTF-8
spring.freemarker.check-template-location=true
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=true
spring.freemarker.expose-session-attributes=true
spring.freemarker.request-context-attribute=request
spring.freemarker.suffix=.ftl
@Slf4j
public class CronTask extends AbstractScheduleTask {

    @Override
    protected void executeInternal(JobExecutionContext context) {
        logger.info("CronTask触发,TriggerKey:{}",context.getTrigger().getKey().toString());
    }
}
  • CRON表达式:格式是cron=你的CRON表达式,如cron=*/20 * * * * ?
  • 简单的周期性执行表达式:格式是intervalInMilliseconds=毫秒值,如intervalInMilliseconds=10000,表示10000毫秒执行一次。
  • repeatCount:表示简单的周期性执行任务的重复次数,默认为Integer.MAX_VALUE
  • startAt:任务首次执行的时间戳。
cron=*/20 * * * * ?

intervalInMilliseconds=10000

intervalInMilliseconds=10000,repeatCount=10
{"key":"value"}
@Slf4j
public class SimpleTask extends AbstractScheduleTask {

    @Override
    protected void executeInternal(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getMergedJobDataMap();
        String value = jobDataMap.getString("key");
    }
}
  1. AbstractScheduleTask使用了@DisallowConcurrentExecution注解,任务会禁用并发执行,也就是多节点的情况下,只会有一个服务节点在同一轮触发时间下进行任务调度。
  2. CRON类型的任务被禁用了Misfire策略,也就是CRON类型的任务如果错失了触发时机不会有任何操作(这一点可以了解一下QuartzMisfire策略)。
原创声明
本站部分文章基于互联网的整理,我们会把真正“有用/优质”的文章整理提供给各位开发者。本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
本文链接:https://www.coolc.net/news/show_20930.html

本站采用系统自动发货方式,付款后即出现下载入口,如有疑问请咨询在线客服!

售后时间:早10点 - 晚11:30点

咨询售后客服