1. 程式人生 > 其它 >Quartz 分散式解決方案

Quartz 分散式解決方案

本文要點

  • 1.Quartz相關重要概念
  • 2.如何實現分散式排程
  • 3.核心原始碼的實現

1.Quartz相關重要概念

1.1 scheduler:: 任務排程器

如圖所示:


  image.png

Scheduler 有三種:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。

  • RemoteMBeanScheduler:
  • RemoteScheduler:
  • StdScheduler:
1.2 trigger: 觸發器,用於定義任務排程時間規則
  image.png

Trigger 主要有三種:SimpleTrigger, CronTrigger,CalendarIntervalTrigger
SimpleTrigger:支援簡單的頻率呼叫的Trigger
CronTrigger:支援cron表示式的Trigger
CalendarIntervalTrigger:支援基於日曆週期的Trigger

1.3 job:: 任務,即被排程的任務

主要有兩種型別的 job:無狀態的(stateless)和有狀態的(stateful)。
對於同一個 trigger 來說,有狀態(新增@DisallowConcurrentExecution註解)的 job 不能被並行執行,只有上一次觸發的任務被執行完之後,才能觸發下一次執行。
一個 job 可以被多個 trigger 關聯,但是一個 trigger 只能關聯一個 job。

1.4 misfire::錯過的,指本來應該被執行但實際沒有被執行的任務排程
  • 系統因為某些原因被重啟。在系統關閉到重新啟動之間的一段時間裡,可能有些任務會被 misfire;

  • Trigger 被暫停(suspend)的一段時間裡,有些任務可能會被 misfire;

  • 執行緒池中所有執行緒都被佔用,導致任務無法被觸發執行,造成 misfire;

  • 有狀態任務在下次觸發時間到達時,上次執行還沒有結束;

2.Quartz如何實現分散式排程

具體配置參考官網連結
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/configuration/ConfigJDBCJobStoreClustering.html

3.核心原始碼的實現

Quartz是基於DB鎖來實現的分散式排程。
首先要明確Trigger有幾個狀態: WAITING(等待)、ACQUIRED(獲取)、EXECUTING(執行)、COMPLETE(完成)、BLOCKED(阻塞)、ERROR(報錯)、PAUSED(暫停)、PAUSED_BLOCKED(暫停阻塞)、DELETED(刪除)

具體看下org.quartz.core.QuartzSchedulerThread核心程式碼的具體實現

 public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers = null;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                         
                        /**
                         * 預設沒加鎖
                         * 
                         * 1.查詢 TRIGGERS 狀態是WAITING 的trigger
                         * 
                         * 2.更新 TRIGGERS WAITING---->ACQUIRED
                         * 
                         * 3.插入FIRED_TRIGGERS表
                         */
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled()) 
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime()