1. 程式人生 > 實用技巧 >基於quartz完成動態定時任務

基於quartz完成動態定時任務

本文的主要需求動態配置定時任務,支援對於定時任務的增加、刪除

1. 引入maven依賴

<dependency>
	<groupId>org.quartz-scheduler</groupId>
	<artifactId>quartz</artifactId>
	<exclusions>
		<exclusion>
			<groupId>com.mchange</groupId>
			<artifactId>c3p0</artifactId>
		</exclusion>
	</exclusions>
	<version>2.3.0</version>
</dependency>

2.任務配置類

package com.datareach.kafka.schedule;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@EnableScheduling
//完全內部使用
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    //保證ConcurrentTaskScheduler不使用預設單執行緒的ScheduledExecutor,而是corePoolSize=5的執行緒池
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

3.動態定時任務

package com.datareach.kafka.schedule;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.CollectionUtils;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author: chaishuai
 * https://www.cnblogs.com/hujunzheng/p/10353390.html#autoid-3-0-0
 */
@Configuration
public class DynamicTask implements SchedulingConfigurer {
    private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);

    private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10),
            new DynamicTaskConsumeThreadFactory());


    private volatile ScheduledTaskRegistrar registrar;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    private volatile List<TaskConstant> taskConstants = Lists.newArrayList();

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        this.registrar = registrar;
        this.registrar.addTriggerTask(() -> {
                    if (!CollectionUtils.isEmpty(taskConstants)) {
                        LOGGER.info("檢測動態定時任務列表...");
                        List<TimingTask> tts = new ArrayList<>();
                        taskConstants
                                .forEach(taskConstant -> {
                                    TimingTask tt = new TimingTask();
                                    tt.setExpression(taskConstant.getCron());
                                    tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                                    tts.add(tt);
                                });
                        this.refreshTasks(tts);
                    }
                }
                , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    }
    public List<TaskConstant> getTaskConstants() {
        return taskConstants;
    }
    private void refreshTasks(List<TimingTask> tasks) {
        //取消已經刪除的策略任務
        Set<String> taskIds = scheduledFutures.keySet();
        for (String taskId : taskIds) {
            if (!exists(tasks, taskId)) {
                scheduledFutures.get(taskId).cancel(false);
            }
        }
        for (TimingTask tt : tasks) {
            String expression = tt.getExpression();
            if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                LOGGER.error("定時任務DynamicTask cron表示式不合法: " + expression);
                continue;
            }
            //如果配置一致,則不需要重新建立定時任務
            if (scheduledFutures.containsKey(tt.getTaskId())
                    && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            //如果策略執行時間發生了變化,則取消當前策略的任務
            if (scheduledFutures.containsKey(tt.getTaskId())) {
                scheduledFutures.remove(tt.getTaskId()).cancel(false);
                cronTasks.remove(tt.getTaskId());
            }
            CronTask task = new CronTask(tt, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(tt.getTaskId(), task);
            scheduledFutures.put(tt.getTaskId(), future);
        }
    }
    private boolean exists(List<TimingTask> tasks, String taskId) {
        for (TimingTask task : tasks) {
            if (task.getTaskId().equals(taskId)) {
                return true;
            }
        }
        return false;
    }
    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }
    public static class TaskConstant {
        private String cron;
        private String taskId;
        public String getCron() {
            return cron;
        }
        public void setCron(String cron) {
            this.cron = cron;
        }
        public String getTaskId() {
            return taskId;
        }
        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    }
    private class TimingTask implements Runnable {
        private String expression;
        private String taskId;
        public String getTaskId() {
            return taskId;
        }
        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
        @Override
        public void run() {
            //設定佇列大小10
            LOGGER.error("當前CronTask: " + this);
            DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
            es.submit(() -> {
                while (!queue.isDone() || !queue.isEmpty()) {
                    try {
                        String content = queue.poll(500, TimeUnit.MILLISECONDS);
                        if (StringUtils.isBlank(content)) {
                            return;
                        }
                        //呼叫例項方法
                        //JobInvokeUtil.invokeMethod("producerService.producePost()");
                        LOGGER.info("DynamicBlockingQueue 消費:" + content);
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        LOGGER.error("", e);
                    }
                }
            });

            //佇列放入資料
            for (int i = 0; i < 1; ++i) {
                try {
                    queue.put(String.valueOf(i));
                    LOGGER.info("DynamicBlockingQueue 生產:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.setDone(true);
        }
        public String getExpression() {
            return expression;
        }
        public void setExpression(String expression) {
            this.expression = expression;
        }
        @Override
        public String toString() {
            return ReflectionToStringBuilder.toString(this
                    , ToStringStyle.SIMPLE_STYLE
                    , false
                    , false
                    , TimingTask.class);
        }
    }
    /**
     * 佇列消費執行緒工廠類
     */
    private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DynamicTaskConsumeThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-dynamic-task-";
        }
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
    private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
        DynamicBlockingQueue(int capacity) {
            super(capacity);
        }
        private volatile boolean done = false;

        public boolean isDone() {
            return done;
        }
        public void setDone(boolean done) {
            this.done = done;
        }
    }
}

4. 測試類

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaConsumerApplication.class)
public class DynamicTaskTest {
    @Autowired
    private DynamicTask dynamicTask;
    @Test
    public void test() throws InterruptedException {
        List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
        /*DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
        taskConstant.setCron("0/5 * * * * ?");
        taskConstant.setTaskId("test1");
        taskConstans.add(taskConstant);
        
        DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
        taskConstant1.setCron("0/5 * * * * ?");
        taskConstant1.setTaskId("test2");
        taskConstans.add(taskConstant1);*/
        DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
        taskConstant2.setCron("0/10 * * * * ?");
        taskConstant2.setTaskId("test3");
        taskConstans.add(taskConstant2);
        TimeUnit.SECONDS.sleep(40);
        //移除並新增新的配置
       /* taskConstans.remove(taskConstans.size() - 1);
        DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
        taskConstant3.setCron("0/5 * * * * ?");
        taskConstant3.setTaskId("test4");
        taskConstans.add(taskConstant3);*/
//
        TimeUnit.MINUTES.sleep(50);
    }
}

文章參考自:https://www.cnblogs.com/hujunzheng/p/10353390.html#autoid-3-0-0