1. 程式人生 > 其它 >利用Spring的ThreadPoolTaskScheduler實現輕量級任務排程

利用Spring的ThreadPoolTaskScheduler實現輕量級任務排程

ThreadPoolTaskScheduler實現輕量級任務排程。適用單體應用。

在單體應用中需要一個使用簡單效能可靠的排程功能,要求可以通過Cron表示式配置觸發時間並且任務執行時間可以修改並且立即生效,可以在執行時動態增加、停止、重啟job等。

經過研究org.springframework.scheduling.annotation.SchedulingConfigurer滿足通過Cron表示式配置觸發時間、任務執行時間可以修改但不滿足修改cron表示式後立即生效,也不支援執行時動態增加、停止、重啟job。

進一步研究發現可以使用org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler滿足全部需要。示例程式碼如下:

初始化ThreadPoolTaskScheduler:

@Configuration
public class AppConfig {
    @Bean
    @ConditionalOnBean
    public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("job-schedule-");
        //	 *Set whether to wait for scheduled tasks to complete on shutdown,
        //	  not interrupting running tasks and executing all tasks in the queue.
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        /**
         *  Set the maximum number of seconds that this executor is supposed to block
         *  on shutdown in order to wait for remaining tasks to complete their execution
         *  before the rest of the container continues to shut down. This is particularly
         *  useful if your remaining tasks are likely to need access to other resources
         *  that are also managed by the container.
         */
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}

業務物件SysJobSchedule:

public class SysJobSchedule {
    private String name;
    private String code;
    private String comment;
    private String cron;
    private String className;
    private Boolean deleteFlag = false;
    private Boolean stopFlag = false;
    .....
 }

核心SmartScheduleJob

@Service
@Slf4j
@EnableScheduling
public class SmartScheduleJob {
    private ConcurrentHashMap<String, ScheduledFuture> futureConcurrentHashMap = new ConcurrentHashMap<>();
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
      /**
     * 啟動排程
     * @param jobSchedule
     */
    public void startSchedule(SysJobSchedule jobSchedule) {
        if (jobSchedule != null) {
            log.info("啟動Job {}",jobSchedule.getClassName());
            stopSchedule(jobSchedule.getClassName());
            initSchedule(jobSchedule);
        }
    }

    /**
     * 關閉排程
     * @param className
     */
    public void stopSchedule((SysJobSchedule jobSchedule) {
        String className = jobSchedule.getgClassName();
        ScheduledFuture scheduledFuture = this.futureConcurrentHashMap.get(className);
        if (scheduledFuture != null) {
            log.info("==關閉Job:{}", className);
            futureConcurrentHashMap.remove(className);
            if (!scheduledFuture.isCancelled())
                scheduledFuture.cancel(true);
        }
    }


    private void initSchedule(SysJobSchedule jobSchedule) {
        if(jobSchedule.getStopFlag())
            return;
        ScheduledFuture future = this.threadPoolTaskScheduler.schedule(
                //1.新增任務內容(Runnable)
                () -> {
                    this.doTask(jobSchedule);
                },
                //2.設定執行週期(Trigger)
                triggerContext -> {                              
                    String cron = jobSchedule.getCron();                 
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
        //將ScheduledFuture新增到ConcurrentHashMap中
        futureConcurrentHashMap.put(jobSchedule.getClassName(), future);
    }

    private void doTask(SysJobSchedule jobSchedule) {
      doSomething。。。。
    }

}

ThreadPoolTaskScheduler為輕量級任務排程器適用於單體應用,不適合分散式叢集部署,由於節點之間沒有共享資訊,因而會出現多次排程的情況.