基於Scheduler自定義任務排程
阿新 • • 發佈:2020-12-10
自定義基於ThreadPoolTaskScheduler編寫了一個支援啟動、停止、重啟的簡單任務排程器原型。
- 定義ThreadPoolTaskScheduler配置
package com.cloud.test.schedule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 排程器的配置
*
* @author zhe.xiao
* @date 2020-12-04
* @description
*/
@Configuration
public class MySchedulerConfig {
@Bean(name = "myScheduler")
public ThreadPoolTaskScheduler myScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
//池大小
taskScheduler.setPoolSize(20);
//名字
taskScheduler.setThreadNamePrefix("zhexiao-schedule");
//是否等待執行任務執行完畢後關閉
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//在關閉的時候等待其他task完成的最長等待時間
taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
}
- 定義任務實體entity
package com.cloud.test.schedule;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* 任務排程的物件
*
* @author zhe.xiao
* @date 2020-12-07
* @description
*/
@Data
@Accessors(chain = true)
public class ScheduleEntity {
private String taskKey;
private String taskCron;
}
- 定義任務的Runnable,方便後面擴充套件
package com.cloud.test.schedule;
/**
*
* 自定義執行緒的 Runnable
*
* @author zhe.xiao
* @date 2020-12-07
* @description
*/
public interface ScheduledTaskRunnable extends Runnable {
}
- 編寫核心的service
package com.cloud.test.schedule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author zhe.xiao
* @date 2020-12-07
* @description
*/
@Service
public class ScheduledTaskService {
private static final Logger log = LoggerFactory.getLogger(ScheduledTaskService.class);
@Autowired
@Qualifier("myScheduler")
ThreadPoolTaskScheduler myScheduler;
//儲存需要被執行的task
private static HashMap<String, ScheduleEntity> tasks = new HashMap<>();
//記錄正在執行的task
private static ConcurrentHashMap<String, ScheduledFuture> taskMap = new ConcurrentHashMap<>();
//加鎖防止啟動多次相同任務
private ReentrantLock lock = new ReentrantLock();
static {
ScheduleEntity task1 = new ScheduleEntity();
task1.setTaskKey("task1").setTaskCron("0/3 * * * * ?");
ScheduleEntity task2 = new ScheduleEntity();
task2.setTaskKey("task2").setTaskCron("0/10 * * * * ?");
tasks.put(task1.getTaskKey(), task1);
tasks.put(task2.getTaskKey(), task2);
}
/**
* 啟動任務
* <p>
* 加鎖防止多執行緒的時候啟動多次
*/
public void start() {
synchronized (ScheduledTaskService.class) {
for (ScheduleEntity task : tasks.values()) {
this.startTask(task);
}
}
}
/**
* 停止任務
*
* @param taskKey
*/
public void stop(String taskKey) {
ScheduledFuture future = taskMap.get(taskKey);
log.info("future={}, taskKey={}", future, taskKey);
if (null != future) {
future.cancel(true);
taskMap.remove(taskKey);
log.info("任務停止 taskKey={}, taskMap={}", taskKey, taskMap);
}
}
/**
* 重啟服務
*
* @param taskKey
*/
public void restart(String taskKey) {
this.stop(taskKey);
ScheduleEntity entity = tasks.get(taskKey);
if (null != entity) {
this.startTask(entity);
log.info("任務重啟 taskKey={}, taskMap={}", taskKey, taskMap);
}
}
/**
* 通過cron的方式 啟動單個的任務
*
* @param scheduleEntity
*/
private void startTask(ScheduleEntity scheduleEntity) {
ScheduledFuture<?> future = myScheduler.schedule(new ScheduledTaskRunnable() {
@Override
public void run() {
log.info("{} run at={}:執行緒={}", scheduleEntity.getTaskKey(), LocalDateTime.now(), Thread.currentThread().getName());
log.info("===========================================================================");
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
CronTrigger cronTrigger = new CronTrigger(scheduleEntity.getTaskCron());
return cronTrigger.nextExecutionTime(triggerContext);
}
});
taskMap.put(scheduleEntity.getTaskKey(), future);
log.info("任務啟動 taskKey={}, taskMap={}", scheduleEntity.getTaskKey(), taskMap);
}
}
- 編寫controller控制
package com.cloud.test.controller;
import com.cloud.test.schedule.ScheduledTaskService;
import com.cloud.test.stream.MyChannelSend;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author zhe.xiao
* @date 2020-12-01
* @description
*/
@RestController
@RequestMapping(path = "/schedule")
public class ScheduleController {
@Autowired
ScheduledTaskService scheduledTaskService;
@GetMapping(path = "/start")
public void start() {
scheduledTaskService.start();
}
@GetMapping(path = "/stop")
public void stop(@RequestParam(name = "taskKey") String taskKey) {
scheduledTaskService.stop(taskKey);
}
@GetMapping(path = "/restart")
public void restart(@RequestParam(name = "taskKey") String taskKey) {
scheduledTaskService.restart(taskKey);
}
}
測試:
啟動:http://localhost:9901/schedule/start
停止:http://localhost:9901/schedule/stop?taskKey=task1
重啟:http://localhost:9901/schedule/restart?taskKey=task1
- 實現專案啟動時自啟動
SpringBoot給我們提供了兩個介面來幫助我們實現這種需求。這兩個介面分別為CommandLineRunner和ApplicationRunner。他們的執行時機為容器啟動完成的時候。
package com.cloud.test.schedule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
*
* order:Lower values have higher priority
*
* @author zhe.xiao
* @date 2020-12-08
* @description
*/
@Component
@Order(value = 1)
public class MySchedulerAutoStart implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(MySchedulerAutoStart.class);
@Override
public void run(ApplicationArguments args) throws Exception {
//這裡放入需要初始化就啟動的任務
log.info("auto start...............................");
}
}