Quartz 分散式解決方案
本文要點
- 1.Quartz相關重要概念
- 2.如何實現分散式排程
- 3.核心原始碼的實現
1.Quartz相關重要概念
1.1 scheduler:: 任務排程器
如圖所示:
image.png
Scheduler 有三種:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。
- RemoteMBeanScheduler:
- RemoteScheduler:
- StdScheduler:
1.2 trigger: 觸發器,用於定義任務排程時間規則
image.pngTrigger 主要有三種:SimpleTrigger, CronTrigger,CalendarIntervalTrigger
SimpleTrigger:支援簡單的頻率呼叫的Trigger
CronTrigger:支援cron表示式的Trigger
CalendarIntervalTrigger:支援基於日曆週期的Trigger
1.3 job:: 任務,即被排程的任務
主要有兩種型別的 job:無狀態的(stateless)和有狀態的(stateful)。
對於同一個 trigger 來說,有狀態(新增@DisallowConcurrentExecution註解
)的 job 不能被並行執行,只有上一次觸發的任務被執行完之後,才能觸發下一次執行。一個 job 可以被多個 trigger 關聯,但是一個 trigger 只能關聯一個 job。
1.4 misfire::錯過的,指本來應該被執行但實際沒有被執行的任務排程
-
系統因為某些原因被重啟。在系統關閉到重新啟動之間的一段時間裡,可能有些任務會被 misfire;
-
Trigger 被暫停(suspend)的一段時間裡,有些任務可能會被 misfire;
-
執行緒池中所有執行緒都被佔用,導致任務無法被觸發執行,造成 misfire;
-
有狀態任務在下次觸發時間到達時,上次執行還沒有結束;
2.Quartz如何實現分散式排程
3.核心原始碼的實現
Quartz是基於DB鎖來實現的分散式排程。
首先要明確Trigger有幾個狀態: WAITING(等待)、ACQUIRED(獲取)、EXECUTING(執行)、COMPLETE(完成)、BLOCKED(阻塞)、ERROR(報錯)、PAUSED(暫停)、PAUSED_BLOCKED(暫停阻塞)、DELETED(刪除)
具體看下org.quartz.core.QuartzSchedulerThread
核心程式碼的具體實現
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
/**
* 預設沒加鎖
*
* 1.查詢 TRIGGERS 狀態是WAITING 的trigger
*
* 2.更新 TRIGGERS WAITING---->ACQUIRED
*
* 3.插入FIRED_TRIGGERS表
*/
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime()