利用Spring的ThreadPoolTaskScheduler實現輕量級任務排程
阿新 • • 發佈:2022-03-04
ThreadPoolTaskScheduler實現輕量級任務排程。適用單體應用。
在單體應用中需要一個使用簡單效能可靠的排程功能,要求可以通過Cron表示式配置觸發時間並且任務執行時間可以修改並且立即生效,可以在執行時動態增加、停止、重啟job等。
經過研究org.springframework.scheduling.annotation.SchedulingConfigurer
滿足通過Cron表示式配置觸發時間、任務執行時間可以修改但不滿足修改cron表示式後立即生效,也不支援執行時動態增加、停止、重啟job。
進一步研究發現可以使用org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
滿足全部需要。示例程式碼如下:
初始化ThreadPoolTaskScheduler
:
@Configuration public class AppConfig { @Bean @ConditionalOnBean public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){ ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.setThreadNamePrefix("job-schedule-"); // *Set whether to wait for scheduled tasks to complete on shutdown, // not interrupting running tasks and executing all tasks in the queue. taskScheduler.setWaitForTasksToCompleteOnShutdown(true); /** * Set the maximum number of seconds that this executor is supposed to block * on shutdown in order to wait for remaining tasks to complete their execution * before the rest of the container continues to shut down. This is particularly * useful if your remaining tasks are likely to need access to other resources * that are also managed by the container. */ taskScheduler.setAwaitTerminationSeconds(60); return taskScheduler; } }
業務物件SysJobSchedule
:
public class SysJobSchedule {
private String name;
private String code;
private String comment;
private String cron;
private String className;
private Boolean deleteFlag = false;
private Boolean stopFlag = false;
.....
}
核心SmartScheduleJob
@Service @Slf4j @EnableScheduling public class SmartScheduleJob { private ConcurrentHashMap<String, ScheduledFuture> futureConcurrentHashMap = new ConcurrentHashMap<>(); @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; /** * 啟動排程 * @param jobSchedule */ public void startSchedule(SysJobSchedule jobSchedule) { if (jobSchedule != null) { log.info("啟動Job {}",jobSchedule.getClassName()); stopSchedule(jobSchedule.getClassName()); initSchedule(jobSchedule); } } /** * 關閉排程 * @param className */ public void stopSchedule((SysJobSchedule jobSchedule) { String className = jobSchedule.getgClassName(); ScheduledFuture scheduledFuture = this.futureConcurrentHashMap.get(className); if (scheduledFuture != null) { log.info("==關閉Job:{}", className); futureConcurrentHashMap.remove(className); if (!scheduledFuture.isCancelled()) scheduledFuture.cancel(true); } } private void initSchedule(SysJobSchedule jobSchedule) { if(jobSchedule.getStopFlag()) return; ScheduledFuture future = this.threadPoolTaskScheduler.schedule( //1.新增任務內容(Runnable) () -> { this.doTask(jobSchedule); }, //2.設定執行週期(Trigger) triggerContext -> { String cron = jobSchedule.getCron(); return new CronTrigger(cron).nextExecutionTime(triggerContext); } ); //將ScheduledFuture新增到ConcurrentHashMap中 futureConcurrentHashMap.put(jobSchedule.getClassName(), future); } private void doTask(SysJobSchedule jobSchedule) { doSomething。。。。 } }
ThreadPoolTaskScheduler
為輕量級任務排程器適用於單體應用,不適合分散式叢集部署,由於節點之間沒有共享資訊,因而會出現多次排程的情況.