基於quartz完成動態定時任務
阿新 • • 發佈:2020-10-22
本文的主要需求動態配置定時任務,支援對於定時任務的增加、刪除
1. 引入maven依賴
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <exclusions> <exclusion> <groupId>com.mchange</groupId> <artifactId>c3p0</artifactId> </exclusion> </exclusions> <version>2.3.0</version> </dependency>
2.任務配置類
package com.datareach.kafka.schedule; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Role; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @Configuration @EnableScheduling //完全內部使用 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class TaskConfiguration { //保證ConcurrentTaskScheduler不使用預設單執行緒的ScheduledExecutor,而是corePoolSize=5的執行緒池 @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledExecutorService scheduledAnnotationProcessor() { return Executors.newScheduledThreadPool(5, new DefaultThreadFactory()); } private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-schedule-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } } }
3.動態定時任務
package com.datareach.kafka.schedule; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.assertj.core.util.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronSequenceGenerator; import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.util.CollectionUtils; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author: chaishuai * https://www.cnblogs.com/hujunzheng/p/10353390.html#autoid-3-0-0 */ @Configuration public class DynamicTask implements SchedulingConfigurer { private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class); private static final ExecutorService es = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), new DynamicTaskConsumeThreadFactory()); private volatile ScheduledTaskRegistrar registrar; private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>(); private volatile List<TaskConstant> taskConstants = Lists.newArrayList(); @Override public void configureTasks(ScheduledTaskRegistrar registrar) { this.registrar = registrar; this.registrar.addTriggerTask(() -> { if (!CollectionUtils.isEmpty(taskConstants)) { LOGGER.info("檢測動態定時任務列表..."); List<TimingTask> tts = new ArrayList<>(); taskConstants .forEach(taskConstant -> { TimingTask tt = new TimingTask(); tt.setExpression(taskConstant.getCron()); tt.setTaskId("dynamic-task-" + taskConstant.getTaskId()); tts.add(tt); }); this.refreshTasks(tts); } } , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext)); } public List<TaskConstant> getTaskConstants() { return taskConstants; } private void refreshTasks(List<TimingTask> tasks) { //取消已經刪除的策略任務 Set<String> taskIds = scheduledFutures.keySet(); for (String taskId : taskIds) { if (!exists(tasks, taskId)) { scheduledFutures.get(taskId).cancel(false); } } for (TimingTask tt : tasks) { String expression = tt.getExpression(); if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) { LOGGER.error("定時任務DynamicTask cron表示式不合法: " + expression); continue; } //如果配置一致,則不需要重新建立定時任務 if (scheduledFutures.containsKey(tt.getTaskId()) && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) { continue; } //如果策略執行時間發生了變化,則取消當前策略的任務 if (scheduledFutures.containsKey(tt.getTaskId())) { scheduledFutures.remove(tt.getTaskId()).cancel(false); cronTasks.remove(tt.getTaskId()); } CronTask task = new CronTask(tt, expression); ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger()); cronTasks.put(tt.getTaskId(), task); scheduledFutures.put(tt.getTaskId(), future); } } private boolean exists(List<TimingTask> tasks, String taskId) { for (TimingTask task : tasks) { if (task.getTaskId().equals(taskId)) { return true; } } return false; } @PreDestroy public void destroy() { this.registrar.destroy(); } public static class TaskConstant { private String cron; private String taskId; public String getCron() { return cron; } public void setCron(String cron) { this.cron = cron; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } } private class TimingTask implements Runnable { private String expression; private String taskId; public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public void run() { //設定佇列大小10 LOGGER.error("當前CronTask: " + this); DynamicBlockingQueue queue = new DynamicBlockingQueue(3); es.submit(() -> { while (!queue.isDone() || !queue.isEmpty()) { try { String content = queue.poll(500, TimeUnit.MILLISECONDS); if (StringUtils.isBlank(content)) { return; } //呼叫例項方法 //JobInvokeUtil.invokeMethod("producerService.producePost()"); LOGGER.info("DynamicBlockingQueue 消費:" + content); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { LOGGER.error("", e); } } }); //佇列放入資料 for (int i = 0; i < 1; ++i) { try { queue.put(String.valueOf(i)); LOGGER.info("DynamicBlockingQueue 生產:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } queue.setDone(true); } public String getExpression() { return expression; } public void setExpression(String expression) { this.expression = expression; } @Override public String toString() { return ReflectionToStringBuilder.toString(this , ToStringStyle.SIMPLE_STYLE , false , false , TimingTask.class); } } /** * 佇列消費執行緒工廠類 */ private static class DynamicTaskConsumeThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DynamicTaskConsumeThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-dynamic-task-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } } private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> { DynamicBlockingQueue(int capacity) { super(capacity); } private volatile boolean done = false; public boolean isDone() { return done; } public void setDone(boolean done) { this.done = done; } } }
4. 測試類
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaConsumerApplication.class)
public class DynamicTaskTest {
@Autowired
private DynamicTask dynamicTask;
@Test
public void test() throws InterruptedException {
List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
/*DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
taskConstant.setCron("0/5 * * * * ?");
taskConstant.setTaskId("test1");
taskConstans.add(taskConstant);
DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
taskConstant1.setCron("0/5 * * * * ?");
taskConstant1.setTaskId("test2");
taskConstans.add(taskConstant1);*/
DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
taskConstant2.setCron("0/10 * * * * ?");
taskConstant2.setTaskId("test3");
taskConstans.add(taskConstant2);
TimeUnit.SECONDS.sleep(40);
//移除並新增新的配置
/* taskConstans.remove(taskConstans.size() - 1);
DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
taskConstant3.setCron("0/5 * * * * ?");
taskConstant3.setTaskId("test4");
taskConstans.add(taskConstant3);*/
//
TimeUnit.MINUTES.sleep(50);
}
}
文章參考自:https://www.cnblogs.com/hujunzheng/p/10353390.html#autoid-3-0-0