Quartz詳解
目錄:
一、Quartz 基本介紹
1.1 Quartz 概述
1.2 Quartz特點
1.3 Quartz 叢集配置
二、Quartz 原理及流程
2.1 quartz基本原理
2.2 quartz啟動流程
三、Spring + Quartz 實現企業級排程的實現示例
3.1 環境資訊
3.2 相關程式碼及配置
四、問題及解決方案
五、相關知識
六、參考資料
總結
一、Quartz 基本介紹
1.1 Quartz 概述
Quartz 是 OpenSymphony 開源組織在任務排程領域的一個開源專案,完全基於 Java 實現。該專案於 2009 年被 Terracotta 收購,目前是 Terracotta 旗下的一個專案。讀者可以到 http://www.quartz-scheduler.org/站點下載 Quartz 的釋出版本及其原始碼。
1.2 Quartz特點
作為一個優秀的開源排程框架,Quartz 具有以下特點:
- 強大的排程功能,例如支援豐富多樣的排程方法,可以滿足各種常規及特殊需求;
- 靈活的應用方式,例如支援任務和排程的多種組合方式,支援排程資料的多種儲存方式;
- 分散式和叢集能力,Terracotta 收購後在原來功能基礎上作了進一步提升。
另外,作為 Spring 預設的排程框架,Quartz 很容易與 Spring 整合實現靈活可配置的排程功能。
quartz排程核心元素:
- Scheduler:任務排程器,是實際執行任務排程的控制器。在spring中通過SchedulerFactoryBean封裝起來。
- Trigger:觸發器,用於定義任務排程的時間規則,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比較多,本文主要介紹這種方式。CronTrigger在spring中封裝在CronTriggerFactoryBean中。
- Calendar:它是一些日曆特定時間點的集合。一個trigger可以包含多個Calendar,以便排除或包含某些時間點。
- JobDetail:用來描述Job實現類及其它相關的靜態資訊,如Job名字、關聯監聽器等資訊。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean兩種實現,如果任務排程只需要執行某個類的某個方法,就可以通過MethodInvokingJobDetailFactoryBean來呼叫。
- Job:是一個介面,只有一個方法void execute(JobExecutionContext context),開發者實現該介面定義執行任務,JobExecutionContext類提供了排程上下文的各種資訊。Job執行時的資訊儲存在JobDataMap例項中。實現Job介面的任務,預設是無狀態的,若要將Job設定成有狀態的,在quartz中是給實現的Job新增@DisallowConcurrentExecution註解(以前是實現StatefulJob介面,現在已被Deprecated),在與spring結合中可以在spring配置檔案的job detail中配置concurrent引數。
1.3 Quartz 叢集配置
quartz叢集是通過資料庫表來感知其他的應用的,各個節點之間並沒有直接的通訊。只有使用持久的JobStore才能完成Quartz叢集。
資料庫表:以前有12張表,現在只有11張表,現在沒有儲存listener相關的表,多了QRTZ_SIMPROP_TRIGGERS表:
Table name | Description |
---|---|
QRTZ_CALENDARS | 儲存Quartz的Calendar資訊 |
QRTZ_CRON_TRIGGERS | 儲存CronTrigger,包括Cron表示式和時區資訊 |
QRTZ_FIRED_TRIGGERS | 儲存與已觸發的Trigger相關的狀態資訊,以及相聯Job的執行資訊 |
QRTZ_PAUSED_TRIGGER_GRPS | 儲存已暫停的Trigger組的資訊 |
QRTZ_SCHEDULER_STATE | 儲存少量的有關Scheduler的狀態資訊,和別的Scheduler例項 |
QRTZ_LOCKS | 儲存程式的悲觀鎖的資訊 |
QRTZ_JOB_DETAILS | 儲存每一個已配置的Job的詳細資訊 |
QRTZ_SIMPLE_TRIGGERS | 儲存簡單的Trigger,包括重複次數、間隔、以及已觸的次數 |
QRTZ_BLOG_TRIGGERS | Trigger作為Blob型別儲存 |
QRTZ_TRIGGERS | 儲存已配置的Trigger的資訊 |
QRTZ_SIMPROP_TRIGGERS |
QRTZ_LOCKS就是Quartz叢集實現同步機制的行鎖表,包括以下幾個鎖:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。
二、Quartz 原理及流程
2.1 quartz基本原理
核心元素
Quartz 任務排程的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任務排程的元資料, scheduler 是實際執行排程的控制器。
在 Quartz 中,trigger 是用於定義排程時間的元素,即按照什麼時間規則去執行任務。Quartz 中主要提供了四種類型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。這四種 trigger 可以滿足企業應用中的絕大部分需求。我們將在企業應用一節中進一步討論四種 trigger 的功能。
在 Quartz 中,job 用於表示被排程的任務。主要有兩種型別的 job:無狀態的(stateless)和有狀態的(stateful)。對於同一個 trigger 來說,有狀態的 job 不能被並行執行,只有上一次觸發的任務被執行完之後,才能觸發下一次執行。Job 主要有兩種屬性:volatility 和 durability,其中 volatility 表示任務是否被持久化到資料庫儲存,而 durability 表示在沒有 trigger 關聯的時候任務是否被保留。兩者都是在值為 true 的時候任務被持久化或保留。一個 job 可以被多個 trigger 關聯,但是一個 trigger 只能關聯一個 job。
在 Quartz 中, scheduler 由 scheduler 工廠建立:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二種工廠 StdSchedulerFactory 使用較多,因為 DirectSchedulerFactory 使用起來不夠方便,需要作許多詳細的手工編碼設定。 Scheduler 主要有三種:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 為例講解。這也是筆者在專案中所使用的 scheduler 類。
Quartz 核心元素之間的關係如下圖所示:
圖 1. Quartz 核心元素關係圖
執行緒檢視
在 Quartz 中,有兩類執行緒,Scheduler 排程執行緒和任務執行執行緒,其中任務執行執行緒通常使用一個執行緒池維護一組執行緒。
圖 2. Quartz 執行緒檢視
Scheduler 排程執行緒主要有兩個: 執行常規排程的執行緒,和執行 misfired trigger 的執行緒。常規排程執行緒輪詢儲存的所有 trigger,如果有需要觸發的 trigger,即到達了下一次觸發的時間,則從任務執行執行緒池獲取一個空閒執行緒,執行與該 trigger 關聯的任務。Misfire 執行緒是掃描所有的 trigger,檢視是否有 misfired trigger,如果有的話根據 misfire 的策略分別處理。下圖描述了這兩個執行緒的基本流程:
圖 3. Quartz 排程執行緒流程圖
關於 misfired trigger,我們在企業應用一節中將進一步描述。
資料儲存
Quartz 中的 trigger 和 job 需要儲存下來才能被使用。Quartz 中有兩種儲存方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是將 trigger 和 job 儲存在記憶體中,而 JobStoreSupport 是基於 jdbc 將 trigger 和 job 儲存到資料庫中。RAMJobStore 的存取速度非常快,但是由於其在系統被停止後所有的資料都會丟失,所以在通常應用中,都是使用 JobStoreSupport。
在 Quartz 中,JobStoreSupport 使用一個驅動代理來操作 trigger 和 job 的資料儲存:StdJDBCDelegate。StdJDBCDelegate 實現了大部分基於標準 JDBC 的功能介面,但是對於各種資料庫來說,需要根據其具體實現的特點做某些特殊處理,因此各種資料庫需要擴充套件 StdJDBCDelegate 以實現這些特殊處理。Quartz 已經自帶了一些資料庫的擴充套件實現,可以直接使用,如下圖所示:
圖 4. Quartz 資料庫驅動代理
作為嵌入式資料庫的代表,Derby 近來非常流行。如果使用 Derby 資料庫,可以使用上圖中的 CloudscapeDelegate 作為 trigger 和 job 資料儲存的代理類。
2.2 quartz啟動流程
若quartz是配置在spring中,當伺服器啟動時,就會裝載相關的bean。SchedulerFactoryBean實現了InitializingBean介面,因此在初始化bean的時候,會執行afterPropertiesSet方法,該方法將會呼叫SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)建立Scheduler。SchedulerFactory在建立quartzScheduler的過程中,將會讀取配置引數,初始化各個元件,關鍵元件如下:
-
ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool建立了一定數量的WorkerThread例項來使得Job能夠線上程中進行處理。WorkerThread是定義在SimpleThreadPool類中的內部類,它實質上就是一個執行緒。在SimpleThreadPool中有三個list:workers-存放池中所有的執行緒引用,availWorkers-存放所有空閒的執行緒,busyWorkers-存放所有工作中的執行緒;
執行緒池的配置引數如下所示:1 2 3
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5
-
JobStore:分為儲存在記憶體的RAMJobStore和儲存在資料庫的JobStoreSupport(包括JobStoreTX和JobStoreCMT兩種實現,JobStoreCMT是依賴於容器來進行事務的管理,而JobStoreTX是自己管理事務),若要使用叢集要使用JobStoreSupport的方式;
- QuartzSchedulerThread:用來進行任務排程的執行緒,在初始化的時候paused=true,halted=false,雖然執行緒開始運行了,但是paused=true,執行緒會一直等待,直到start方法將paused置為false;
另外,SchedulerFactoryBean還實現了SmartLifeCycle介面,因此初始化完成後,會執行start()方法,該方法將主要會執行以下的幾個動作:
- 建立ClusterManager執行緒並啟動執行緒:該執行緒用來進行叢集故障檢測和處理,將在下文詳細討論;
- 建立MisfireHandler執行緒並啟動執行緒:該執行緒用來進行misfire任務的處理,將在下文詳細討論;
- 置QuartzSchedulerThread的paused=false,排程執行緒才真正開始排程;
三、Spring + Quartz 實現企業級排程的實現示例
3.1 環境資訊
此示例中的環境: Spring 4.1.6.RELEASE + quartz 2.2.1 + Mysql 5.6
3.2 相關程式碼及配置
3.2.1 Maven 引入
3.2.2 資料庫指令碼準備
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for task_schedule_job
-- ----------------------------
DROP TABLE IF EXISTS `task_schedule_job`;
CREATE TABLE `task_schedule_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`job_name` varchar(255) DEFAULT NULL,
`job_group` varchar(255) DEFAULT NULL,
`job_status` varchar(255) DEFAULT NULL,
`cron_expression` varchar(255) NOT NULL,
`description` varchar(255) DEFAULT NULL,
`bean_class` varchar(255) DEFAULT NULL,
`is_concurrent` varchar(255) DEFAULT NULL COMMENT '1',
`spring_id` varchar(255) DEFAULT NULL,
`method_name` varchar(255) NOT NULL
PRIMARY KEY (`job_id`),
UNIQUE KEY `name_group` (`job_name`,`job_group`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
在Quartz包下docs/dbTables,選擇對應的資料庫指令碼,建立相應的資料庫表即可,我用的是mysql5.6,這裡有一個需要注意的地方,mysql5.5之前用的表儲存引擎是MyISAM,使用的是表級鎖,鎖發生衝突的概率比較高,併發度低;5.6之後預設的儲存引擎為InnoDB,InnoDB採用的鎖機制是行級鎖,併發度也較高。而quartz叢集使用資料庫鎖的
機制來來實現同一個任務在同一個時刻只被例項執行,所以為了防止衝突,我們建表的時候要選取InnoDB作為表的存
儲引擎。如下:
3.2.3 關鍵程式碼及配置
<1>spring-quartz.xml 配置 在application.xml 檔案中引入
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <!-- 註冊本地排程任務 <bean id="localQuartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"></bean>--> <!-- 註冊叢集排程任務 --> <bean id="schedulerFactoryBean" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy"> <!--可選,QuartzScheduler 啟動時更新己存在的Job,這樣就不用每次修改targetObject後刪除qrtz_job_details表對應記錄了 --> <property name="overwriteExistingJobs" value="true" /> <!--必須的,QuartzScheduler 延時啟動,應用啟動完後 QuartzScheduler 再啟動 --> <property name="startupDelay" value="3" /> <!-- 設定自動啟動 --> <property name="autoStartup" value="true" /> <property name="applicationContextSchedulerContextKey" value="applicationContext" /> <property name="configLocation" value="classpath:quartz.properties" /> </bean> </beans>
<2>quartz.properties 檔案配置
#============================================================== #Configure Main Scheduler Properties #============================================================== org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler org.quartz.scheduler.instanceId = AUTO #============================================================== #Configure JobStore #============================================================== org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.jobStore.dataSource = myDS org.quartz.jobStore.maxMisfiresToHandleAtATime = 1 org.quartz.jobStore.misfireThreshold = 120000 org.quartz.jobStore.txIsolationLevelSerializable = false #============================================================== #Configure DataSource #============================================================== org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.myDS.URL = 你的資料鏈接 org.quartz.dataSource.myDS.user = 使用者名稱 org.quartz.dataSource.myDS.password = 密碼 org.quartz.dataSource.myDS.maxConnections = 30 org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE #============================================================== #Configure ThreadPool #============================================================== org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount= 10 org.quartz.threadPool.threadPriority= 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true #============================================================== #Skip Check Update #update:true #not update:false #============================================================== org.quartz.scheduler.skipUpdateCheck = true #============================================================================ # Configure Plugins #============================================================================ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin org.quartz.plugin.shutdownhook.cleanShutdown = true
<3>關鍵程式碼
package com.netease.ad.omp.service.sys; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Set; import javax.annotation.PostConstruct; import javax.annotation.Resource; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper; import com.netease.ad.omp.entity.sys.ScheduleJob; import com.netease.ad.omp.quartz.job.JobUtils; import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean; import com.netease.ad.omp.quartz.job.QuartzJobFactory; import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution; import org.apache.log4j.Logger; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; /** * 計劃任務管理 */ @Service public class JobTaskService { public final Logger log = Logger.getLogger(this.getClass()); @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired private ScheduleJobMapper scheduleJobMapper; /** * 從資料庫中取 區別於getAllJob * * @return */ public List<ScheduleJob> getAllTask() { return scheduleJobMapper.select(null); } /** * 新增到資料庫中 區別於addJob */ public void addTask(ScheduleJob job) { job.setCreateTime(new Date()); scheduleJobMapper.insertSelective(job); } /** * 從資料庫中查詢job */ public ScheduleJob getTaskById(Long jobId) { return scheduleJobMapper.selectByPrimaryKey(jobId); } /** * 更改任務狀態 * * @throws SchedulerException */ public void changeStatus(Long jobId, String cmd) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } if ("stop".equals(cmd)) { deleteJob(job); job.setJobStatus(JobUtils.STATUS_NOT_RUNNING); } else if ("start".equals(cmd)) { job.setJobStatus(JobUtils.STATUS_RUNNING); addJob(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 更改任務 cron表示式 * * @throws SchedulerException */ public void updateCron(Long jobId, String cron) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } job.setCronExpression(cron); if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { updateJobCron(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 新增任務 * * @throws SchedulerException */ public void addJob(ScheduleJob job) throws SchedulerException { if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { return; } Scheduler scheduler = schedulerFactoryBean.getScheduler(); log.debug(scheduler + ".......................................................................................add"); TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 不存在,建立一個 if (null == trigger) { Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class; JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build(); jobDetail.getJobDataMap().put("scheduleJob", job); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } else { // Trigger已存在,那麼更新相應的定時設定 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的cronExpression表示式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger重新設定job執行 scheduler.rescheduleJob(triggerKey, trigger); } } @PostConstruct public void init() throws Exception { // 這裡獲取任務資訊資料 List<ScheduleJob> jobList = scheduleJobMapper.select(null); for (ScheduleJob job : jobList) { addJob(job); } } /** * 獲取所有計劃中的任務列表 * * @return * @throws SchedulerException */ public List<ScheduleJob> getAllJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(); for (JobKey jobKey : jobKeys) { List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { ScheduleJob job = new ScheduleJob(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } } return jobList; } /** * 所有正在執行的job * * @return * @throws SchedulerException */ public List<ScheduleJob> getRunningJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) { ScheduleJob job = new ScheduleJob(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } return jobList; } /** * 暫停一個job * * @param scheduleJob * @throws SchedulerException */ public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.pauseJob(jobKey); } /** * 恢復一個job * * @param scheduleJob * @throws SchedulerException */ public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.resumeJob(jobKey); } /** * 刪除一個job * * @param scheduleJob * @throws SchedulerException */ public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.deleteJob(jobKey); } /** * 立即執行job * * @param scheduleJob * @throws SchedulerException */ public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.triggerJob(jobKey); } /** * 更新job時間表達式 * * @param scheduleJob * @throws SchedulerException */ public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); scheduler.rescheduleJob(triggerKey, trigger); } public static void main(String[] args) { CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("xxxxx"); } }
package com.netease.ad.omp.quartz.job; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.quartz.JobExecutionContext; import org.springframework.context.ApplicationContext; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:58 * Email: [email protected] * Class Description: * 定時任務工具類 */ public class JobUtils { public final static Logger log = Logger.getLogger(JobUtils.class); public static final String STATUS_RUNNING = "1"; //啟動狀態 public static final String STATUS_NOT_RUNNING = "0"; //未啟動狀態 public static final String CONCURRENT_IS = "1"; public static final String CONCURRENT_NOT = "0"; private ApplicationContext ctx; /** * 通過反射呼叫scheduleJob中定義的方法 * * @param scheduleJob */ public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) { Object object = null; Class clazz = null; if (StringUtils.isNotBlank(scheduleJob.getSpringId())) { object = SpringUtils.getBean(scheduleJob.getSpringId()); } else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) { try { clazz = Class.forName(scheduleJob.getBeanClass()); object = clazz.newInstance(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (object == null) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,請檢查是否配置正確!!!"); return; } clazz = object.getClass(); Method method = null; try { method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class}); } catch (NoSuchMethodException e) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,方法名設定錯誤!!!"); } catch (SecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (method != null) { try { method.invoke(object, new Object[] {context}); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } log.info("任務名稱 = [" + scheduleJob.getJobName() + "]----------啟動成功"); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 計劃任務執行處 無狀態 * Spring排程任務 (重寫 quartz 的 QuartzJobBean 類原因是在使用 quartz+spring 把 quartz 的 task 例項化進入資料庫時,會產生: serializable 的錯誤) */ public class QuartzJobFactory implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 若一個方法一次執行不完下次輪轉時則等待該方法執行完後才執行下一次操作 * Spring排程任務 (重寫 quartz 的 QuartzJobBean 類原因是在使用 quartz+spring 把 quartz 的 task 例項化進入資料庫時,會產生: serializable 的錯誤) */ @DisallowConcurrentExecution public class QuartzJobFactoryDisallowConcurrentExecution implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.entity.sys; import javax.persistence.Id; import javax.persistence.Table; import java.io.Serializable; import java.util.Date; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:48 * Email: [email protected] * Class Description: * 計劃任務資訊 */ @Table(name = "task_schedule_job") public class ScheduleJob implements Serializable { @Id private Long jobId; private Date createTime; private Date updateTime; /** * 任務名稱 */ private String jobName; /** * 任務分組 */ private String jobGroup; /** * 任務狀態 是否啟動任務 */ private String jobStatus; /** * cron表示式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任務執行時呼叫哪個類的方法 包名+類名 */ private String beanClass; /** * 任務是否有狀態 */ private String isConcurrent; /** * spring bean */ private String springId; /** * 任務呼叫的方法名 */ private String methodName; public Long getJobId() { return jobId; } public void setJobId(Long jobId) { this.jobId = jobId; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobGroup() { return jobGroup; } public void setJobGroup(String jobGroup) { this.jobGroup = jobGroup; } public String getJobStatus() { return jobStatus; } public void setJobStatus(String jobStatus) { this.jobStatus = jobStatus; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getBeanClass() { return beanClass; } public void setBeanClass(String beanClass) { this.beanClass = beanClass; } public String getIsConcurrent() { return isConcurrent; } public void setIsConcurrent(String isConcurrent) { this.isConcurrent = isConcurrent; } public String getSpringId() { return springId; } public void setSpringId(String springId) { this.springId = springId; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } }
package com.netease.ad.omp.common.utils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; public final class SpringUtils implements BeanFactoryPostProcessor { private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境 @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } /** * 獲取物件 * * @param name * @return Object 一個以所給名字註冊的bean的例項 * @throws BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 獲取型別為requiredType的物件 * * @param clz * @return * @throws BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { @SuppressWarnings("unchecked") T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判斷以給定名字註冊的bean定義是一個singleton還是一個prototype。 * 如果與給定名字相應的bean定義沒有被找到,將會丟擲一個異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 註冊物件的型別 * @throws NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } }
至於前端自己畫個簡單的介面即可使用了。
四、問題及解決方案
4.1quartz mysql 死鎖問題
quartz文件提到,如果在叢集環境下,最好將配置項org.quartz.jobStore.txIsolationLevelSerializable設定為true
問題:
這個選項在mysql下會非常容易出現死鎖問題。
2014-12-29 09:55:28.006 [QuartzScheduler_clusterQuartzSchedular-BJ-YQ-64.2491419487774923_ClusterManager] ERROR o.q.impl.jdbcjobstore.JobStoreTX [U][] - ClusterManager: Error managing cluster: Failure updating scheduler state when checking-in: Deadlock found when trying to get lock; try restarting transaction
這個選項存在意義:
quartz需要提升隔離級別來保障自己的運作,不過,由於各資料庫實現的隔離級別定義都不一樣,所以quartz提供一個設定序列化這樣的隔離級別存在,因為例如oracle中是沒有未提交讀和可重複讀這樣的隔離級別存在。但是由於mysql預設的是可重複讀,比提交讀高了一個級別,所以已經可以滿足quartz叢集的正常執行。
五、相關知識
5.1、QuartzSchedulerThread執行緒
執行緒的主要邏輯程式碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
while (!halted.get()) { int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow()); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } List<TriggerFiredResult> bndle = qsRsrcs.getJobStore().triggersFired(triggers); for(int i = 0;i < res.size();i++){ JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); qsRsrcs.getThreadPool().runInThread(shell); } } |
- 先獲取執行緒池中的可用執行緒數量(若沒有可用的會阻塞,直到有可用的);
- 獲取30m內要執行的trigger(即acquireNextTriggers):
獲取trigger的鎖,通過select …for update方式實現;獲取30m內(可配置)要執行的triggers(需要保證叢集節點的時間一致),若@ConcurrentExectionDisallowed且列表存在該條trigger則跳過,否則更新trigger狀態為ACQUIRED(剛開始為WAITING);插入firedTrigger表,狀態為ACQUIRED;(注意:在RAMJobStore中,有個timeTriggers,排序方式是按觸發時間nextFireTime排的;JobStoreSupport從資料庫取出triggers時是按照nextFireTime排序); - 等待直到獲取的trigger中最先執行的trigger在2ms內;
- triggersFired:
1)更新firedTrigger的status=EXECUTING;
2)更新trigger下一次觸發的時間;
3)更新trigger的狀態:無狀態的trigger->WAITING,有狀態的trigger->BLOCKED,若nextFireTime==null ->COMPLETE;
4) commit connection,釋放鎖; - 針對每個要執行的trigger,建立JobRunShell,並放入執行緒池執行:
1)execute:執行job
2)獲取TRIGGER_ACCESS鎖
3)若是有狀態的job:更新trigger狀態:BLOCKED->WAITING,PAUSED_BLOCKED->BLOCKED
4)若@PersistJobDataAfterExecution,則updateJobData
5)刪除firedTrigger
6)commit connection,釋放鎖
執行緒執行流程如下圖所示:
QuartzSchedulerThread時序圖
任務排程執行過程中,trigger的狀態變化如下圖所示:
該圖來自參考文獻5
5.2.misfireHandler執行緒
下面這些原因可能造成 misfired job:
- 系統因為某些原因被重啟。在系統關閉到重新啟動之間的一段時間裡,可能有些任務會被 misfire;
- Trigger 被暫停(suspend)的一段時間裡,有些任務可能會被 misfire;
- 執行緒池中所有執行緒都被佔用,導致任務無法被觸發執行,造成 misfire;
- 有狀態任務在下次觸發時間到達時,上次執行還沒有結束;為了處理 misfired job,Quartz 中為 trigger 定義了處理策略,主要有下面兩種:MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:針對 misfired job 馬上執行一次;MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次觸發;預設是MISFIRE_INSTRUCTION_SMART_POLICY,該策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW執行緒預設1分鐘執行一次;在一個事務中,預設一次最多recovery 20個;
執行流程:
- 若配置(預設為true,可配置)成獲取鎖前先檢查是否有需要recovery的trigger,先獲取misfireCount;
- 獲取TRIGGER_ACCESS鎖;
- hasMisfiredTriggersInState:獲取misfired的trigger,預設一個事務裡只能最大20個misfired trigger(可配置),misfired判斷依據:status=waiting,next_fire_time < current_time-misfirethreshold(可配置,預設1min)
- notifyTriggerListenersMisfired
- updateAfterMisfire:獲取misfire策略(預設是MISFIRE_INSTRUCTION_SMART_POLICY,該策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根據策略更新nextFireTime;
- 將nextFireTime等更新到trigger表;
- commit connection,釋放鎖8.如果還有更多的misfired,sleep短暫時間(為了叢集負載均衡),否則sleep misfirethreshold時間,後繼續輪詢;
misfireHandler執行緒執行流程如下圖所示:
misfireHandler執行緒時序圖
5.3.clusterManager執行緒
初始化:
failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤兒)
執行緒執行:
每個伺服器會定時(org.quartz.jobStore.clusterCheckinInterval這個時間)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若這個欄位遠遠超出了該更新的時間,則認為該伺服器例項掛了;
注意:每個伺服器例項有唯一的id,若配置為AUTO,則為hostname+current_time
執行緒執行的具體流程:
- 檢查是否有超時的例項failedInstances;
- 更新該伺服器例項的LAST_CHECKIN_TIME;
若有超時的例項: - 獲取STATE_ACCESS鎖;
- 獲取超時的例項failedInstances;
- 獲取TRIGGER_ACCESS鎖;
- clusterRecover:
- 針對每個failedInstances,通過instanceId獲取每個例項的firedTriggers;
- 針對每個firedTrigger:
1) 更新trigger狀態:
BLOCKED->WAITING
PAUSED_BLOCKED->PAUSED
ACQUIRED->WAITING
2) 若firedTrigger不是ACQUIRED狀態(在執行狀態),且jobRequestRecovery=true:
建立一個SimpleTrigger,儲存到trigger表,status=waiting,MISFIRE_INSTR=MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY.
3) 刪除firedTrigger
5.4原始碼分析鎖
目前程式碼中行鎖只用到了STATE_ACCESS 和TRIGGER_ACCESS 這兩種。
1、TRIGGER_ACCESS
先了解一篇文章,通過原始碼來分析quartz是如何通過加鎖來實現叢集環境,觸發器狀態的一致性。
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
可以看到觸發器的操作主要用主執行緒StdScheduleThread來完成,不管是獲取需要觸發的30S內的觸發器,還是觸發過程。select和update觸發器表時
都會先加鎖,後解鎖。如果資料庫資源競爭比較大的話,鎖會影響整個效能。可以考慮將任務資訊放在分散式記憶體,如redis上進行處理。資料庫只是定時從redis上load資料下來做統計。
實現都在JobStoreSupport類
加鎖型別 | 加鎖方法 | 底層資料庫操作 | 備註 |
executeInNonManagedTXLock | acquireNextTrigger | selectTriggerToAcquire selectTrigger selectJobDetail insertFiredTrigger |
查詢需要點火的trigger 選擇需要執行的trigger加入到fired_trigger表 |
for執行 triggerFired | selectJobDetail selectCalendar updateFiredTrigger triggerExists updateTrigger |
點火trigger 修改trigger狀態為可執行狀態。 |
|
recoverJobs | updateTriggerStatesFromOtherStates hasMisfiredTriggersInState doUpdateOfMisfiredTrigger selectTriggersForRecoveringJobs selectTriggersInState deleteFiredTriggers |
非叢集環境下重新執行 failed與misfired的trigger |
|
retryExecuteInNonManagedTXLock | releaseAcquiredTrigger | updateTriggerStateFromOtherState deleteFiredTrigger |
異常情況下重新釋放trigger到初使狀態。 |
triggeredJobComplete | selectTriggerStatus removeTrigger updateTriggerState deleteFiredTrigger |
觸發JOB任務完成後的處理。 | |
obtainLock | recoverMisfiredJobs | hasMisfiredTriggersInState doUpdateOfMisfiredTrigger | 重新執行misfired的trigger 可以在啟動時執行,也可以由misfired執行緒定期執行。 |
clusterRecover | selectInstancesFiredTriggerRecords updateTriggerStatesForJobFromOtherState storeTrigger deleteFiredTriggers selectFiredTriggerRecords removeTrigger deleteSchedulerState |
叢集有結點faied,讓JOB能重新執行。 | |
executeInLock 資料庫叢集裡等同於 executeInNonManagedTXLock |
storeJobAndTrigger | updateJobDetail insertJobDetail triggerExists selectJobDetail updateTrigger insertTrigger |
儲存JOB和TRIGGER配置 |
storeJob | 儲存JOB | ||
removeJob | 刪除JOB | ||
removeJobs | 批量刪除JOB | ||
removeTriggers | 批量刪除triggers | ||
storeJobsAndTriggers | 儲存JOB和多個trigger配置 | ||
removeTrigger | 刪除trigger | ||
replaceTrigger | 替換trigger | ||
storeCalendar | 儲存定時日期 | ||
removeCalendar | 刪除定時日期 | ||
clearAllSchedulingData | 清除所有定時資料 | ||
pauseTrigger | 停止觸發器 | ||
pauseJob | 停止任務 | ||
pauseJobs | 批量停止任務 | ||
resumeTrigger | 恢復觸發器 | ||
resumeJob | 恢復任務 | ||
resumeJobs | 批量恢復任務 | ||
pauseTriggers | 批量停止觸發器 | ||
resumeTriggers | 批量恢復觸發器 | ||
pauseAll | 停止所有 | ||
resumeAll | 恢復所 |
2、STATE_TRIGGER
實現都在JobStoreSupport類
加鎖型別 | 加鎖方法 | 底層資料庫操作 | 備註 |
obtainLock | doCheckin | clusterCheckIn | 判斷叢集狀態 先用LOCK_STATE_ACCESS鎖叢集狀態 再用LOCK_TRIGGER_ACCESS恢復叢集執行 |
---
六、參考資料
- Quartz Documentation http://quartz-scheduler.org/documentation
- spring javadoc-api http://docs.spring.io/spring/docs/4.3.0.BUILD-SNAPSHOT/javadoc-api/
- 基於Quartz開發企業級任務排程應用 https://www.ibm.com/developerworks/cn/opensource/os-cn-quartz/
- quartz應用與叢集原理分析 http://tech.meituan.com/mt-crm-quartz.html
- quartz詳解2:quartz由淺入深 http://ecmcug.itpub.net/11627468/viewspace-1763498/
- quartz詳解4:quartz執行緒管理 http://blog.itpub.net/11627468/viewspace-1766967/
- quartz學習筆記 http://www.cnblogs.com/yunxuange/archive/2012/08/28/2660141.html
- quartz叢集排程機制調研及原始碼分析 http://demo.netfoucs.com/gklifg/article/details/27090179