1. 程式人生 > 其它 >基於Scheduler自定義任務排程

基於Scheduler自定義任務排程

技術標籤:Java、Spring

基於ThreadPoolTaskScheduler編寫了一個支援啟動、停止、重啟的簡單自定義任務排程器原型。

  1. 定義ThreadPoolTaskScheduler配置
package com.cloud.test.schedule;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/** * 排程器的配置 * * @author zhe.xiao * @date 2020-12-04 * @description */ @Configuration public class MySchedulerConfig { @Bean(name = "myScheduler") public ThreadPoolTaskScheduler myScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); //池大小
taskScheduler.setPoolSize(20); //名字 taskScheduler.setThreadNamePrefix("zhexiao-schedule"); //是否等待執行任務執行完畢後關閉 taskScheduler.setWaitForTasksToCompleteOnShutdown(true); //在關閉的時候等待其他task完成的最長等待時間 taskScheduler.setAwaitTerminationSeconds(60); return
taskScheduler; } }
  2. 定義任務實體entity
package com.cloud.test.schedule;

import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Describes one schedulable task: the key that identifies it and the cron expression
 * that drives it.
 * <p>
 * Accessors are generated by Lombok; {@code chain = true} makes the setters chainable.
 *
 * @author zhe.xiao
 * @date 2020-12-07
 * @description
 */
@Data
@Accessors(chain = true)
public class ScheduleEntity {
    // Key identifying the task.
    private String taskKey;

    // Cron expression of the task.
    private String taskCron;
}
  3. 定義任務的Runnable,方便後面擴充套件
package com.cloud.test.schedule;

/**
 * Custom {@link Runnable} type for scheduled tasks.
 * <p>
 * Declares no members of its own; it exists as a dedicated type for task bodies.
 *
 * @author zhe.xiao
 * @date 2020-12-07
 * @description
 */
public interface ScheduledTaskRunnable extends Runnable {
}
  4. 編寫核心的service
package com.cloud.test.schedule;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Starts, stops and restarts cron-driven tasks on the {@code myScheduler} thread pool.
 * <p>
 * Task definitions are held in {@code tasks}; the handle of every task that is currently
 * scheduled is held in {@code taskMap}. Every operation that touches {@code taskMap} runs
 * under {@code lock}, so a task key never has more than one live schedule.
 *
 * @author zhe.xiao
 * @date 2020-12-07
 */
@Service
public class ScheduledTaskService {
    private static final Logger log = LoggerFactory.getLogger(ScheduledTaskService.class);

    @Autowired
    @Qualifier("myScheduler")
    ThreadPoolTaskScheduler myScheduler;

    // Definitions of the tasks that can be scheduled, keyed by task key.
    // Filled once by the static initializer below and only read afterwards.
    private static final Map<String, ScheduleEntity> tasks = new HashMap<>();

    // Handles of the tasks that are currently scheduled, keyed by task key.
    private static final ConcurrentHashMap<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();

    // Guards every check-then-act sequence on taskMap so the same task is never started twice.
    // Static because the state it protects is static.
    private static final ReentrantLock lock = new ReentrantLock();

    static {
        ScheduleEntity task1 = new ScheduleEntity();
        task1.setTaskKey("task1").setTaskCron("0/3 * * * * ?");

        ScheduleEntity task2 = new ScheduleEntity();
        task2.setTaskKey("task2").setTaskCron("0/10 * * * * ?");

        tasks.put(task1.getTaskKey(), task1);
        tasks.put(task2.getTaskKey(), task2);
    }

    /**
     * Starts every defined task that is not already running.
     * <p>
     * Safe to call repeatedly or concurrently: tasks that already have a live schedule are
     * skipped instead of being scheduled a second time.
     */
    public void start() {
        lock.lock();
        try {
            for (ScheduleEntity task : tasks.values()) {
                this.startTask(task);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops the running task registered under the given key, if there is one.
     *
     * @param taskKey key of the task to stop; {@code null} or unknown keys are ignored
     */
    public void stop(String taskKey) {
        if (null == taskKey) {
            // ConcurrentHashMap rejects null keys; there is nothing to stop anyway.
            log.warn("taskKey為空,忽略停止請求");
            return;
        }

        lock.lock();
        try {
            // Remove first so the handle is no longer visible once it has been cancelled.
            ScheduledFuture<?> future = taskMap.remove(taskKey);
            log.info("future={}, taskKey={}", future, taskKey);

            if (null != future) {
                future.cancel(true);
                log.info("任務停止 taskKey={}, taskMap={}", taskKey, taskMap);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Restarts a task: stops its current schedule (if any) and schedules it again.
     * <p>
     * The stop and the start happen under one lock acquisition, so no other caller can
     * start the same task in between.
     *
     * @param taskKey key of the task to restart; keys without a task definition only get stopped
     */
    public void restart(String taskKey) {
        lock.lock();
        try {
            this.stop(taskKey);

            ScheduleEntity entity = tasks.get(taskKey);
            if (null != entity) {
                this.startTask(entity);
                log.info("任務重啟 taskKey={}, taskMap={}", taskKey, taskMap);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Schedules a single task according to its cron expression.
     * <p>
     * Callers must hold {@code lock}. If the task already has a schedule that is neither
     * cancelled nor completed, nothing happens.
     *
     * @param scheduleEntity definition of the task to schedule
     */
    private void startTask(ScheduleEntity scheduleEntity) {
        String taskKey = scheduleEntity.getTaskKey();

        // Overwriting a live handle would leave the old schedule running with no way to stop it.
        ScheduledFuture<?> running = taskMap.get(taskKey);
        if (null != running && !running.isDone()) {
            log.info("任務已在執行,略過重複啟動 taskKey={}", taskKey);
            return;
        }

        ScheduledFuture<?> future = myScheduler.schedule(new ScheduledTaskRunnable() {
            @Override
            public void run() {
                log.info("{} run at={}:執行緒={}", scheduleEntity.getTaskKey(), LocalDateTime.now(), Thread.currentThread().getName());
                log.info("===========================================================================");
            }
        }, new Trigger() {
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                // The cron expression is read from the entity on every computation.
                CronTrigger cronTrigger = new CronTrigger(scheduleEntity.getTaskCron());
                return cronTrigger.nextExecutionTime(triggerContext);
            }
        });

        if (null == future) {
            // schedule() yields null when the trigger never fires; ConcurrentHashMap cannot store null.
            taskMap.remove(taskKey);
            log.warn("任務未被排程 taskKey={}", taskKey);
            return;
        }

        taskMap.put(taskKey, future);
        log.info("任務啟動 taskKey={}, taskMap={}", taskKey, taskMap);
    }
}
  5. 編寫controller控制
package com.cloud.test.controller;

import com.cloud.test.schedule.ScheduledTaskService;
import com.cloud.test.stream.MyChannelSend;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoints under {@code /schedule} that forward to {@link ScheduledTaskService}.
 *
 * @author zhe.xiao
 * @date 2020-12-01
 */
@RestController
@RequestMapping("/schedule")
public class ScheduleController {
    @Autowired
    ScheduledTaskService scheduledTaskService;

    /**
     * {@code GET /schedule/start} - delegates to {@link ScheduledTaskService#start()}.
     */
    @GetMapping("/start")
    public void start() {
        scheduledTaskService.start();
    }

    /**
     * {@code GET /schedule/stop?taskKey=...} - delegates to {@link ScheduledTaskService#stop(String)}.
     *
     * @param taskKey value of the {@code taskKey} request parameter
     */
    @GetMapping("/stop")
    public void stop(@RequestParam("taskKey") String taskKey) {
        scheduledTaskService.stop(taskKey);
    }

    /**
     * {@code GET /schedule/restart?taskKey=...} - delegates to {@link ScheduledTaskService#restart(String)}.
     *
     * @param taskKey value of the {@code taskKey} request parameter
     */
    @GetMapping("/restart")
    public void restart(@RequestParam("taskKey") String taskKey) {
        scheduledTaskService.restart(taskKey);
    }
}

測試:

啟動:http://localhost:9901/schedule/start

image-20201208113940051

停止:http://localhost:9901/schedule/stop?taskKey=task1

image-20201208114005238

重啟:http://localhost:9901/schedule/restart?taskKey=task1

image-20201208114022078

  6. 實現專案啟動時自啟動

SpringBoot給我們提供了兩個介面來幫助我們實現這種需求。這兩個介面分別為CommandLineRunner和ApplicationRunner。它們的執行時機為容器啟動完成的時候。

package com.cloud.test.schedule;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Startup hook implementing {@link ApplicationRunner}.
 * <p>
 * Registered with order {@code 1}; lower order values have higher priority.
 * At present {@link #run(ApplicationArguments)} only writes a marker line to the log.
 *
 * @author zhe.xiao
 * @date 2020-12-08
 */
@Order(1)
@Component
public class MySchedulerAutoStart implements ApplicationRunner {
    private static final Logger log = LoggerFactory.getLogger(MySchedulerAutoStart.class);

    /**
     * Runner callback; the place for tasks that must be started on initialization.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by the {@link ApplicationRunner} contract
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Put the tasks that need to be started on initialization here.
        log.info("auto start...............................");
    }
}