Spring+quartz 實現動態管理任務
阿新 • • 發佈:2019-01-06
在實際專案應用中經常會用到定時任務,可以通過quartz和spring的簡單配置即可完成,但如果要改變任務的執行時間、頻率,廢棄任務等就需要改變配置甚至程式碼需要重啟伺服器,這裡介紹一下如何通過quartz與spring的組合實現動態的改變定時任務的狀態的一個實現。 本文章適合對quartz和spring有一定了解的讀者。 spring版本為3.2 quartz版本為2.2.1 如果使用了quartz2.2.1 則spring版本需3.1以上 1、spring配置檔案中引入註冊bean <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" /> <bean id="loadTask" class="*.quartz.LoadTask" init-method="initTask" /> 2、LoadTask.java(初始化載入資料庫任務) public class LoadTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private TaskService taskService; public void initTask() throws SchedulerException { // 可執行的任務列表 List<QuartzJobBean> taskList = taskService.getTaskList(); logger.info("初始化載入定時任務......"); for (QuartzJobBean job : taskList) { taskService.addJob(job); } } } 3、TaskService.java Service(value = "taskService") public class TaskService { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Reference(registry = "real-registry") private DataExchangeModuleService moduleService; /** * 獲取單個任務 * @param jobName * @param jobGroup * @return * @throws SchedulerException */ public QuartzJobBean getJob(String jobName,String jobGroup) throws SchedulerException{ QuartzJobBean job = null; Scheduler scheduler = schedulerFactoryBean.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (null != trigger) { job = new QuartzJobBean(); job.setJobName(jobName); job.setJobGroup(jobGroup); job.setDescription("觸發器:" + trigger.getKey()); job.setNextTime(trigger.getNextFireTime()); //下次觸發時間 job.setPreviousTime(trigger.getPreviousFireTime());//上次觸發時間 Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } } return job; } /** * 獲取所有任務 * @return * @throws SchedulerException */ public List<QuartzJobBean> getAllJobs() throws SchedulerException{ Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>(); for (JobKey jobKey : jobKeys) { List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { QuartzJobBean job = new QuartzJobBean(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); job.setNextTime(trigger.getNextFireTime()); //下次觸發時間 // trigger.getFinalFireTime();//最後一次執行時間 job.setPreviousTime(trigger.getPreviousFireTime());//上次觸發時間 // trigger.getStartTime();//開始時間 // trigger.getEndTime();//結束時間 //觸發器當前狀態 Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); // if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } } return jobList; } /** * 所有正在執行的job * * @return * @throws SchedulerException */ public List<QuartzJobBean> getRunningJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) { QuartzJobBean job = new QuartzJobBean(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } return jobList; } /** * 查詢任務列表 * @return */ public List<QuartzJobBean> getTaskList(){ List<QuartzJobBean> jobs = new ArrayList<QuartzJobBean>(); List<DataExchangeModuleBO> taskList = moduleService.findAll(); QuartzJobBean job = null; for(DataExchangeModuleBO bo:taskList){ job = getTask(bo); if(job!=null){ jobs.add(job); } } return jobs; } /** * 查詢任務列表 * @return */ public QuartzJobBean getTask(DataExchangeModuleBO bo){ QuartzJobBean job = null; if(bo!=null){ job = new QuartzJobBean(); job.setJobId(String.valueOf(bo.getId())); job.setJobName(bo.getModuleName()); job.setJobGroup(bo.getSystemName()); job.setJobStatus(bo.getStatus());//初始狀態 job.setCronExpression(bo.getCron()); job.setSpringId(bo.getSpringId()); job.setConcurrent(bo.getConcurrent()); job.setJobClass(bo.getClazzName()); job.setMethodName(bo.getMethodName()); job.setDescription(bo.getSystemName()+"->"+bo.getModuleName()+"->"+bo.getInterfaceInfo()); } return job; } /** * 新增任務 * * @param scheduleJob * @throws SchedulerException */ public boolean addJob(QuartzJobBean job) throws SchedulerException { if (job == null || !QuartzJobBean.STATUS_RUNNING.equals(job.getJobStatus())) { return false; } if(!TaskUtils.isValidExpression(job.getCronExpression())){ logger.error("時間表達式錯誤("+job.getJobName()+","+job.getJobGroup()+"),"+job.getCronExpression()); return false; }else{ Scheduler scheduler = schedulerFactoryBean.getScheduler(); // 任務名稱和任務組設定規則: // 名稱:task_1 .. // 組 :group_1 .. TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 不存在,建立一個 if (null == trigger) { //是否允許併發執行 Class<? extends Job> clazz = QuartzJobBean.CONCURRENT_IS.equals(job.isConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class; JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build(); jobDetail.getJobDataMap().put("scheduleJob", job); // 表示式排程構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的表示式構建一個新的trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } else { // trigger已存在,則更新相應的定時設定 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的cronExpression表示式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger重新設定job執行 scheduler.rescheduleJob(triggerKey, trigger); } } return true; } /** * 暫停任務 * @param scheduleJob * @return */ public boolean pauseJob(QuartzJobBean scheduleJob){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); try { scheduler.pauseJob(jobKey); return true; } catch (SchedulerException e) { } return false; } /** * 恢復任務 * @param scheduleJob * @return */ public boolean resumeJob(QuartzJobBean scheduleJob){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); try { scheduler.resumeJob(jobKey); return true; } catch (SchedulerException e) { } return false; } /** * 刪除任務 */ public boolean deleteJob(QuartzJobBean scheduleJob){ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); try{ scheduler.deleteJob(jobKey); return true; } catch (SchedulerException e) { } return false; } /** * 立即執行一個任務 * @param scheduleJob * @throws SchedulerException */ public void testJob(QuartzJobBean scheduleJob) throws SchedulerException{ Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.triggerJob(jobKey); } /** * 更新任務時間表達式 * @param scheduleJob * @throws SchedulerException */ public void updateCronExpression(QuartzJobBean scheduleJob) throws SchedulerException{ Scheduler scheduler = schedulerFactoryBean.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); //獲取trigger,即在spring配置檔案中定義的 bean id="myTrigger" CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); //表示式排程構建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()); //按新的cronExpression表示式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); //按新的trigger重新設定job執行 scheduler.rescheduleJob(triggerKey, trigger); } 4、TaskUtils.java public class TaskUtils { private final static Logger logger = LoggerFactory.getLogger(TaskUtils.class); /** * 通過反射呼叫scheduleJob中定義的方法 * * @param scheduleJob */ public static void invokMethod(QuartzJobBean scheduleJob) { Object object = null; Class<?> clazz = null; //springId不為空先按springId查詢bean if (StringUtils.isNotBlank(scheduleJob.getSpringId())) { object = SpringUtils.getBean(scheduleJob.getSpringId()); } else if (StringUtils.isNotBlank(scheduleJob.getJobClass())) {//按jobClass查詢 try { clazz = Class.forName(scheduleJob.getJobClass()); object = clazz.newInstance(); } catch (Exception e) { e.printStackTrace(); } } if (object == null) { logger.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,請檢查執行類是否配置正確!!!"); return; } clazz = object.getClass(); Method method = null; try { method = clazz.getDeclaredMethod(scheduleJob.getMethodName()); } catch (NoSuchMethodException e) { logger.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,請檢查執行類的方法名是否設定錯誤!!!"); } catch (SecurityException e) { e.printStackTrace(); } if (method != null) { try { method.invoke(object); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 判斷cron時間表達式正確性 * @param cronExpression * @return */ public static boolean isValidExpression(final String cronExpression){ // CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); CronTriggerImpl trigger = new CronTriggerImpl(); try { trigger.setCronExpression(cronExpression); Date date = trigger.computeFirstFireTime(null); return date != null && date.after(new Date()); } catch (ParseException e) { } return false; } public static void main(String[] args){ System.out.println(isValidExpression("0 0/1 * * * ?")); } /* * 任務執行狀態 */ public enum TASK_STATE{ NONE("NONE","未知"), NORMAL("NORMAL", "正常執行"), PAUSED("PAUSED", "暫停狀態"), COMPLETE("COMPLETE",""), ERROR("ERROR","錯誤狀態"), BLOCKED("BLOCKED","鎖定狀態"); private TASK_STATE(String index,String name) { this.name = name; this.index = index; } private String index; private String name; public String getName() { return name; } public String getIndex() { return index; } } } 5、QuartzJobFactory.java /** * Job實現類 無狀態 * 若此方法上一次還未執行完,而下一次執行時間輪到時則該方法也可併發執行 * @author root */ public class QuartzJobFactory implements Job { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { QuartzJobBean scheduleJob = (QuartzJobBean)context.getMergedJobDataMap().get("scheduleJob"); logger.info("執行任務名稱 = [" + scheduleJob.getJobName() + "]"); TaskUtils.invokMethod(scheduleJob); } } 6、QuartzJobFactoryDisallowConcurrentExecution.java /** * Job有狀態實現類,不允許併發執行 * 若一個方法一次執行不完下次輪轉時則等待該方法執行完後才執行下一次操作 * 主要是通過註解:@DisallowConcurrentExecution * @author root * */ @DisallowConcurrentExecution public class QuartzJobFactoryDisallowConcurrentExecution implements Job { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { QuartzJobBean scheduleJob = (QuartzJobBean)context.getMergedJobDataMap().get("scheduleJob"); logger.info("執行任務名稱 = [" + scheduleJob.getJobName() + "]"); TaskUtils.invokMethod(scheduleJob); } } 7、QuartzJobBean.java public static final String STATUS_RUNNING = "1"; public static final String STATUS_NOT_RUNNING = "0"; public static final String CONCURRENT_IS = "1"; public static final String CONCURRENT_NOT = "0"; /** 任務id */ private String jobId; /** 任務名稱 */ private String jobName; /** 任務分組,任務名稱+組名稱應該是唯一的 */ private String jobGroup; /** 任務初始狀態 0禁用 1啟用 2刪除*/ private String jobStatus; /** 任務是否有狀態(併發) */ private String isConcurrent = "1"; /** 任務執行時間表達式 */ private String cronExpression; /** 任務描述 */ private String description; /** 任務呼叫類在spring中註冊的bean id,如果spingId不為空,則按springId查詢 */ private String springId; /** 任務呼叫類名,包名+類名,通過類反射呼叫 ,如果spingId為空,則按jobClass查詢 */ private String jobClass; /** 任務呼叫的方法名 */ private String methodName; /** 啟動時間 */ private Date startTime; /** 前一次執行時間 */ private Date previousTime; /** 下次執行時間 */ private Date nextTime; 8、DECMService.java 此類就是所有具體要呼叫的任務方法都寫在此服務類中。
轉自:http://windows9834.blog.163.com