1. 程式人生 > >springboot 整合quartz詳解

springboot 整合quartz詳解

本文主要是針對springboot 整合quartz 來說明,作為一個小白我總喜歡上網查詢資料,然後希望找的東西能80%符合自己的需求,能直接拿來用的,但是很多時候網上的案例都是一個copy一個的,看的東西千篇一律,而且有些內容解釋的也是寥寥數語,所以有時候也需要自己寫點東西分享給大眾,今天也是自己剛申請csdn ,取的筆名為:隨筆發燒友,勵志自己後面能多寫文章,樂在分享。好了,進入正題吧!

 

Quartz

quartz是一個java編寫的開源任務排程框架其主要排程元素有:

  • Trigger(觸發器):觸發任務任務執行的時間或規則。在任務排程Quartz中,Trigger主要的觸發器有:SimpleTrigger,CalendarIntervelTrigger,DailyTimeIntervalTrigger,CronTrigger
  • Scheduler(任務排程器):Scheduler就是任務排程控制器,需要把JobDetail和Trigger註冊到schedule中,才可以執行 ;Scheduler有兩個重要元件:ThreadPool和JobStore。
  • Job(任務):是一個介面,其中只有一個execute方法。開發者只要實現介面中的execute方法即可。
  • JobDetail(任務細節):Quartz執行Job時,需要新建Job例項,但不能直接操作Job類,所以通過JobDetail獲得Job的名稱,描述資訊。

先了解quartz四大要素,然後我們的目的就是組合四大要素,實現任務的建立,並啟用/暫停/移除/更新操作

1、編寫一個自己要實現的任務類,實現Job介面中的執行方法 execute

/**
 * 定時任務實現類
 */
@Configuration
@Component
@EnableScheduling
public class ScheduleTask1 implements Job {
 
    private static Logger logger = LoggerFactory.getLogger(ScheduleTask1.class);
 
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            System.out.print("任務執行1 :");
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
 
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

2、構建任務細節

@Service
public class TaskService {

    @SuppressWarnings("unused")
    private final Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Autowired     
    private SchedulerFactoryBean schedulerFactoryBean;   

    @Autowired
    private QuartzJobRepository repository;

    /**    
     * 獲取單個任務    
     * @param jobName    
     * @param jobGroup    
     * @return    
     * @throws SchedulerException    
     */    
    public QuartzJobBean getJob(String jobName,String jobGroup) throws SchedulerException {
        QuartzJobBean job = null;    
        Scheduler scheduler = getScheduler();
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);    
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        if (null != trigger) {
            job = createJob(jobName, jobGroup, scheduler, trigger);
        }

        return job;
    }

    private Scheduler getScheduler() {
        return schedulerFactoryBean.getScheduler();
    }

    private QuartzJobBean createJob(String jobName, String jobGroup, Scheduler scheduler, Trigger trigger)
            throws SchedulerException {
        QuartzJobBean job;
        job = new QuartzJobBean();
        job.setJobName(jobName);    
        job.setJobGroup(jobGroup);    
        job.setDescription("觸發器:" + trigger.getKey()); 
        job.setNextTime(trigger.getNextFireTime());
        job.setPreviousTime(trigger.getPreviousFireTime());

        Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
        job.setJobStatus(triggerState.name());

        if(trigger instanceof CronTrigger) {
            CronTrigger cronTrigger = (CronTrigger)trigger;
            String cronExpression = cronTrigger.getCronExpression();
            job.setCronExpression(cronExpression);
        }
        return job;
    }

    /**    
     * 獲取所有任務    
     * @return    
     * @throws SchedulerException    
     */    
    public List<QuartzJobBean> getAllJobs() throws SchedulerException{   
        Scheduler scheduler = getScheduler();
        GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
        Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
        List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>();
        List<? extends Trigger> triggers;
        QuartzJobBean job;
        for (JobKey jobKey : jobKeys) {
            triggers = scheduler.getTriggersOfJob(jobKey);
            for (Trigger trigger : triggers) {
                job = createJob(jobKey.getName(), jobKey.getGroup(), scheduler, trigger);
                jobList.add(job);
            }
        }

        return jobList;
    }

    /**    
     * 所有正在執行的job    
     *     
     * @return    
     * @throws SchedulerException    
     */    
    public List<QuartzJobBean> getRunningJob() throws SchedulerException {
        Scheduler scheduler = getScheduler();
        List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
        List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>(executingJobs.size());
        QuartzJobBean job;
        JobDetail jobDetail;
        JobKey jobKey;

        for (JobExecutionContext executingJob : executingJobs) {
            jobDetail = executingJob.getJobDetail();
            jobKey = jobDetail.getKey();

            job = createJob(jobKey.getName(), jobKey.getGroup(), scheduler, executingJob.getTrigger());
            jobList.add(job);
        }

        return jobList;
    }

    /**    
     * 新增任務    
     *     
     * @param scheduleJob    
     * @throws SchedulerException    
     */    
    public boolean addJob(QuartzJobBean job) throws SchedulerException { 
        if(job == null || !QuartzJobBean.STATUS_RUNNING.equals(job.getJobStatus())) {
            return false;
        }

        String jobName = job.getJobName();
        String jobGroup = job.getJobGroup();
        if(!TaskUtils.isValidExpression(job.getCronExpression())) {
            logger.error("時間表達式錯誤("+jobName+","+jobGroup+"), "+job.getCronExpression());    
            return false;
        } else {
            Scheduler scheduler = getScheduler();
            // 任務名稱和任務組設定規則:    // 名稱:task_1 ..    // 組 :group_1 ..
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName,  jobGroup);
            Trigger trigger = scheduler.getTrigger(triggerKey);
            // 不存在,建立一個       
            if (null == trigger) { 
                // 表示式排程構建器
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
                // 按新的表示式構建一個新的trigger
                trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
                                                     .startAt(job.getStartTime()==null ? (new Date()) : job.getStartTime()) // 設定job不早於這個時間進行執行,和呼叫trigger的setStartTime方法效果一致
                                                     .withSchedule(scheduleBuilder).build();

                //是否允許併發執行
                JobDetail jobDetail = getJobDetail(job);
                // 將 job 資訊存入資料庫
                job.setStartTime(trigger.getStartTime());
                job.setNextTime(trigger.getNextFireTime());
                job.setPreviousTime(trigger.getPreviousFireTime());
                job = repository.save(job);
                jobDetail.getJobDataMap().put(getJobIdentity(job), job);

                scheduler.scheduleJob(jobDetail, trigger);

            } else { // trigger已存在,則更新相應的定時設定  
                // 更新 job 資訊到資料庫
                job.setStartTime(trigger.getStartTime());
                job.setNextTime(trigger.getNextFireTime());
                job.setPreviousTime(trigger.getPreviousFireTime());
                job = repository.save(job);
                getJobDetail(job).getJobDataMap().put(getJobIdentity(job), job);

                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
                // 按新的表示式構建一個新的trigger
                trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
                                                     .startAt(job.getStartTime()==null ? (new Date()) : job.getStartTime()) // 設定job不早於這個時間進行執行,和呼叫trigger的setStartTime方法效果一致
                                                     .withSchedule(scheduleBuilder).build();
                scheduler.rescheduleJob(triggerKey, trigger);
            }
        }
        return true;
    }

    private String getJobIdentity(QuartzJobBean job) {
        return "scheduleJob"+(job.getJobGroup() +"_"+job.getJobName());
    }

    private JobDetail getJobDetail(QuartzJobBean job) {
        Class<? extends Job> clazz = QuartzJobBean.CONCURRENT_IS.equals(job.isConcurrent()) 
                        ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;       
        JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();
        return jobDetail;
    }

    /**    
     * 暫停任務    
     * @param job    
     * @return    
     */ 
    @Transactional
    public boolean pauseJob(QuartzJobBean job){    
        Scheduler scheduler = getScheduler();
        JobKey jobKey = JobKey.jobKey(job.getJobName(), job.getJobGroup());
        boolean result;
        try {
            scheduler.pauseJob(jobKey);

            // 更新任務狀態到資料庫
            job.setJobStatus(QuartzJobBean.STATUS_NOT_RUNNING);
            repository.modifyByStatus(job.getJobStatus(), job.getJobId());

            result = true;
        } catch (SchedulerException e) {
            result = false;
            e.printStackTrace();
        }
        return result;
    }

    /**    
     * 恢復任務    
     * @param job    
     * @return    
     */    
    @Transactional
    public boolean resumeJob(QuartzJobBean job){
        Scheduler scheduler = getScheduler();
        JobKey jobKey = JobKey.jobKey(job.getJobName(), job.getJobGroup());
        boolean result;
        try {
            logger.info("resume job : " + (job.getJobGroup() + "_" + job.getJobName()));
            TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
            // 表示式排程構建器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
                                            .startAt(job.getStartTime()==null ? (new Date()) : job.getStartTime()) // 設定job不早於這個時間進行執行,和呼叫trigger的setStartTime方法效果一致
                                            .withSchedule(scheduleBuilder).build();
            scheduler.rescheduleJob(triggerKey, trigger);
            scheduler.resumeJob(jobKey);

            // 更新任務狀態到資料庫
            job.setJobStatus(QuartzJobBean.STATUS_RUNNING);
            repository.modifyByStatus(job.getJobStatus(), job.getJobId());

            result = true;
        } catch (SchedulerException e) {
            result = false;
            e.printStackTrace();
        }
        return result;
    }

    /**    
     * 刪除任務    
     */    
    @Transactional
    public boolean deleteJob(QuartzJobBean job){ 
        Scheduler scheduler = getScheduler();    
        JobKey jobKey = JobKey.jobKey(job.getJobName(), job.getJobGroup());    
        boolean result;
        try{    
            scheduler.deleteJob(jobKey);

            // 更新任務狀態到資料庫
            job.setJobStatus(QuartzJobBean.STATUS_DELETED);
            repository.modifyByStatus(job.getJobStatus(), job.getJobId());

            result = true;    
        } catch (SchedulerException e) {    
            result = false;
            e.printStackTrace();
        }    
        return result;    
    } 

    /**    
     * 立即執行一個任務    
     * @param scheduleJob    
     * @throws SchedulerException    
     */    
    public void startJob(QuartzJobBean scheduleJob) throws SchedulerException{
        Scheduler scheduler = getScheduler();  
        JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
        scheduler.triggerJob(jobKey);
    }

    /**    
     * 更新任務時間表達式    
     * @param job    
     * @throws SchedulerException    
     */    
    @Transactional
    public void updateCronExpression(QuartzJobBean job) throws SchedulerException {
        Scheduler scheduler = getScheduler();
        TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
        //獲取trigger,即在spring配置檔案中定義的 bean id="myTrigger"
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        //表示式排程構建器    
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        //按新的cronExpression表示式重新構建trigger
        trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
        //按新的trigger重新設定job執行
        scheduler.rescheduleJob(triggerKey, trigger);

        // 更新 job 資訊到資料庫
        job.setStartTime(trigger.getStartTime());
        job.setNextTime(trigger.getNextFireTime());
        job.setPreviousTime(trigger.getPreviousFireTime());
        job = repository.save(job);
        getJobDetail(job).getJobDataMap().put(getJobIdentity(job), job);
    }

    /**
     * 設定job的開始schedule時間
     * @param job
     * @throws SchedulerException
     */
    @Transactional
    public void updateStartTime(QuartzJobBean job) throws SchedulerException {
        Scheduler scheduler = getScheduler();
        TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
        //獲取trigger,即在spring配置檔案中定義的 bean id="myTrigger"
        CronTriggerImpl trigger = (CronTriggerImpl) scheduler.getTrigger(triggerKey);
        trigger.setStartTime(job.getStartTime());
        //按新的trigger重新設定job執行
        scheduler.rescheduleJob(triggerKey, trigger);

        // 更新 job 資訊到資料庫
        job.setStartTime(trigger.getStartTime());
        job.setNextTime(trigger.getNextFireTime());
        job.setPreviousTime(trigger.getPreviousFireTime());
        job = repository.save(job);
        getJobDetail(job).getJobDataMap().put(getJobIdentity(job), job);
    }

}

 服務啟動時,讀取資料庫 job 資訊,並進行 schedule

@Component
public class MyRunner implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired     
    private TaskService taskService;

    @Autowired
    private QuartzJobService jobService;

    @Override
    public void run(String... args) throws Exception {
        // 可執行的任務列表        
        List<QuartzJobBean> taskList = jobService.findByJobStatus(QuartzJobBean.STATUS_RUNNING);     
        logger.info("初始化載入定時任務......");     
        for (QuartzJobBean job : taskList) {     
            try {
                taskService.addJob(job);     
            } catch (Exception e) {
                logger.error("add job error: " + job.getJobName() + " " + job.getJobGroup(), e);
            }
        }  
    }

}