ScheduledThreadPoolExecutor中定時週期任務的實現原始碼分析
ScheduledThreadPoolExecutor是一個定時任務執行緒池,相比於ThreadPoolExecutor最大的不同在於其阻塞佇列的實現
首先看一下其構造方法:
1 public ScheduledThreadPoolExecutor(int corePoolSize, 2 ThreadFactory threadFactory, 3 RejectedExecutionHandler handler) { 4 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 5 new DelayedWorkQueue(), threadFactory, handler); 6 }
ScheduledThreadPoolExecutor是繼承自ThreadPoolExecutor的,可以看到這裡實際上呼叫了ThreadPoolExecutor的構造方法,而最大的不同在於這裡使用了預設的DelayedWorkQueue“阻塞佇列”,這是後續能夠實現定時任務的關鍵
在ScheduledThreadPoolExecutor中使用scheduleWithFixedDelay或者scheduleAtFixedRate方法來完成定時週期任務
以scheduleWithFixedDelay為例
scheduleWithFixedDelay方法:
1 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 2 long initialDelay, 3 long delay, 4 TimeUnit unit) { 5 if (command == null || unit == null) 6 throw new NullPointerException(); 7 if (delay <= 0) 8 throw new IllegalArgumentException(); 9 ScheduledFutureTask<Void> sft = 10 new ScheduledFutureTask<Void>(command, 11 null, 12 triggerTime(initialDelay, unit), 13 unit.toNanos(-delay)); 14 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 15 sft.outerTask = t; 16 delayedExecute(t); 17 return t; 18 }
這裡首先會將我們的任務包裝成ScheduledFutureTask
(這裡的delay在傳入ScheduledFutureTask的構造方法時變為了負的,這是和scheduleAtFixedRate方法唯一不一樣的地方)
ScheduledFutureTask方法:
1 private void delayedExecute(RunnableScheduledFuture<?> task) { 2 if (isShutdown()) 3 reject(task); 4 else { 5 super.getQueue().add(task); 6 if (isShutdown() && 7 !canRunInCurrentRunState(task.isPeriodic()) && 8 remove(task)) 9 task.cancel(false); 10 else 11 ensurePrestart(); 12 } 13 }
這裡不同於ThreadPoolExecutor中的處理,並沒有考慮coreSize和maxSize和任務之間的關係,而是直接將任務提交到阻塞佇列DelayedWorkQueue中
DelayedWorkQueue的add方法:
1 public boolean add(Runnable e) { 2 return offer(e); 3 } 4 5 public boolean offer(Runnable x) { 6 if (x == null) 7 throw new NullPointerException(); 8 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 9 final ReentrantLock lock = this.lock; 10 lock.lock(); 11 try { 12 int i = size; 13 if (i >= queue.length) 14 grow(); 15 size = i + 1; 16 if (i == 0) { 17 queue[0] = e; 18 setIndex(e, 0); 19 } else { 20 siftUp(i, e); 21 } 22 if (queue[0] == e) { 23 leader = null; 24 available.signal(); 25 } 26 } finally { 27 lock.unlock(); 28 } 29 return true; 30 }
實際上呼叫了offer方法,從這裡就可以看出這個“阻塞佇列”的不同之處
DelayedWorkQueue中有這些成員:
1 private static final int INITIAL_CAPACITY = 16; 2 private RunnableScheduledFuture<?>[] queue = 3 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 4 private int size = 0; 5 private Thread leader = null;
在DelayedWorkQueue內部維護的是queue這個初始大小16的陣列,其實就是一個小根堆
回到offer方法
由於是在多執行緒環境,這裡的操作使用了重入鎖保證原子性
若是在size大於陣列的長度情況下,就需要呼叫grow方法來擴容
grow方法:
1 private void grow() { 2 int oldCapacity = queue.length; 3 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 4 if (newCapacity < 0) // overflow 5 newCapacity = Integer.MAX_VALUE; 6 queue = Arrays.copyOf(queue, newCapacity); 7 }
可以看到這是一個非常簡單的擴容機制,申請一個1.5倍大小的新陣列,再將原來的資料copy上去
回到offer方法,在調整完容量後,就需要進行資料的插入,使其形成一個小根堆
可以看到,在if-else判斷中,首先檢查是不是第一個元素,若是第一個,則直接放入陣列,同時呼叫
setIndex方法,和任務關聯
setIndex方法:
1 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 2 if (f instanceof ScheduledFutureTask) 3 ((ScheduledFutureTask)f).heapIndex = idx; 4 }
這個方法很簡單,將下標關聯到之前包裝好的任務ScheduledFutureTask中
若不是第一個元素,則需要呼叫siftUp,進行小根堆的調整
siftUp方法:
1 private void siftUp(int k, RunnableScheduledFuture<?> key) { 2 while (k > 0) { 3 int parent = (k - 1) >>> 1; 4 RunnableScheduledFuture<?> e = queue[parent]; 5 if (key.compareTo(e) >= 0) 6 break; 7 queue[k] = e; 8 setIndex(e, k); 9 k = parent; 10 } 11 queue[k] = key; 12 setIndex(key, k); 13 }
因為小根堆實際上就是一個二叉樹,利用二叉樹的性質根據當前要插入節點的下標,得到其父節點的下標parent ,再和父節點的RunnableScheduledFuture物件進行compareTo的比較(RunnableScheduledFuture繼承了Comparable介面)
compareTo的實現:
1 public int compareTo(Delayed other) { 2 if (other == this) // compare zero if same object 3 return 0; 4 if (other instanceof ScheduledFutureTask) { 5 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 6 long diff = time - x.time; 7 if (diff < 0) 8 return -1; 9 else if (diff > 0) 10 return 1; 11 else if (sequenceNumber < x.sequenceNumber) 12 return -1; 13 else 14 return 1; 15 } 16 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 17 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 18 }
這裡的邏輯比較簡單,只需要看第二個if
在前面ScheduledFutureTask包裝我們的任務的時候,其構造方法如下:
1 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 2 super(r, result); 3 this.time = ns; 4 this.period = period; 5 this.sequenceNumber = sequencer.getAndIncrement(); 6 }
這裡的time 也就是initialDelay,period 就是-delay,sequenceNumber 是一個全域性自增的序列號
那麼在上面的compareTo方法中,就首先根據子節點的initialDelay和父節點的initialDelay比較
若是子節點小於父節點,返回-1,子節點大於父節點返回1
若是相等,則根據序列號比較,序列號小的返回-1
回到siftUp方法,通過compareTo方法,若是大於等於0,就說明子節點大於父節點,不需要做調整,結束迴圈
若是小於0,說明子節點小於父節點,那麼就需要將父節點先交換到當前位置,再將k變成parent,在下一次迴圈時,就會找parent的parent,重複上述操作,直至構成小根堆
最後將要插入的節點放入queue中合適的位置
那麼在後續的任務新增中,就會根據任務的initialDelay,以及建立時間,構建一個小根堆
回到offer方法,在小根堆中插入完節點後,若是第一次插入, 將leader(Thread物件)置為null,利用available(Condition物件)喚醒Lock 的AQS上的阻塞
DelayedWorkQueue的add到此結束,回到delayedExecute方法中,在完成向阻塞佇列新增任務後,發現執行緒池中並沒有一個worker在工作,接下來的工作就由ThreadPoolExecutor的ensurePrestart方法實現:
1 void ensurePrestart() { 2 int wc = workerCountOf(ctl.get()); 3 if (wc < corePoolSize) 4 addWorker(null, true); 5 else if (wc == 0) 6 addWorker(null, false); 7 }
可以看到這裡根據ctl的取值,與corePoolSize比較,呼叫了執行緒池的addWorker方法,那麼實際上也就是通過這裡開啟了執行緒池的worker來進行工作
來看看在worker的輪詢中發生了什麼:
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); // allow interrupts 6 boolean completedAbruptly = true; 7 try { 8 while (task != null || (task = getTask()) != null) { 9 w.lock(); 10 // If pool is stopping, ensure thread is interrupted; 11 // if not, ensure thread is not interrupted. This 12 // requires a recheck in second case to deal with 13 // shutdownNow race while clearing interrupt 14 if ((runStateAtLeast(ctl.get(), STOP) || 15 (Thread.interrupted() && 16 runStateAtLeast(ctl.get(), STOP))) && 17 !wt.isInterrupted()) 18 wt.interrupt(); 19 try { 20 beforeExecute(wt, task); 21 Throwable thrown = null; 22 try { 23 task.run(); 24 } catch (RuntimeException x) { 25 thrown = x; throw x; 26 } catch (Error x) { 27 thrown = x; throw x; 28 } catch (Throwable x) { 29 thrown = x; throw new Error(x); 30 } finally { 31 afterExecute(task, thrown); 32 } 33 } finally { 34 task = null; 35 w.completedTasks++; 36 w.unlock(); 37 } 38 } 39 completedAbruptly = false; 40 } finally { 41 processWorkerExit(w, completedAbruptly); 42 } 43 }
可以看到在ThreadPoolExecutor的worker輪詢執行緒中,會通過getTask方法,不斷地從阻塞佇列中獲取任務
getTask方法:
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 18 19 if ((wc > maximumPoolSize || (timed && timedOut)) 20 && (wc > 1 || workQueue.isEmpty())) { 21 if (compareAndDecrementWorkerCount(c)) 22 return null; 23 continue; 24 } 25 26 try { 27 Runnable r = timed ? 28 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 29 workQueue.take(); 30 if (r != null) 31 return r; 32 timedOut = true; 33 } catch (InterruptedException retry) { 34 timedOut = false; 35 } 36 } 37 }
可以看到在這個方法中,在一系列的引數檢查並設定完畢後,會通過workQueue的poll或者take方法來獲取所需的任務
其中poll方法是在設定了超時時間的情況下進行獲取,take則不帶有超時時間
以take為例
DelayedWorkQueue的take方法:
1 public RunnableScheduledFuture<?> take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 for (;;) { 6 RunnableScheduledFuture<?> first = queue[0]; 7 if (first == null) 8 available.await(); 9 else { 10 long delay = first.getDelay(NANOSECONDS); 11 if (delay <= 0) 12 return finishPoll(first); 13 first = null; // don't retain ref while waiting 14 if (leader != null) 15 available.await(); 16 else { 17 Thread thisThread = Thread.currentThread(); 18 leader = thisThread; 19 try { 20 available.awaitNanos(delay); 21 } finally { 22 if (leader == thisThread) 23 leader = null; 24 } 25 } 26 } 27 } 28 } finally { 29 if (leader == null && queue[0] != null) 30 available.signal(); 31 lock.unlock(); 32 } 33 }
在for迴圈中首先取出陣列中的第一個元素,也就是生成的小根堆中最小的那一個
得到first後,若是first為null,則說明當前沒有可執行的任務,則使用available這個Condition物件,將AQS阻塞起來,等待下次任務建立時再通過前面提到的available喚醒阻塞
若是first存在,則通過getDelay方法獲取時間間隔
getDelay方法:
1 public long getDelay(TimeUnit unit) { 2 return unit.convert(time - now(), NANOSECONDS); 3 }
這個方法就是用time減去當前時間now,得到的一個納秒級時間差值
而time是在ScheduledFutureTask執行構造方法時,通過triggerTime方法,使用initialDelay進行計算出來的
triggerTime方法:
1 private long triggerTime(long delay, TimeUnit unit) { 2 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 3 } 4 5 long triggerTime(long delay) { 6 return now() + 7 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 8 } 9 10 private long overflowFree(long delay) { 11 Delayed head = (Delayed) super.getQueue().peek(); 12 if (head != null) { 13 long headDelay = head.getDelay(NANOSECONDS); 14 if (headDelay < 0 && (delay - headDelay < 0)) 15 delay = Long.MAX_VALUE + headDelay; 16 } 17 return delay; 18 }
可以看到time在這裡實際上就是通過initialDelay加上當時設定的納秒級時間組成的
其中overflowFree是為了防止Long型別的溢位做了一次計算,後邊再說
所以take方法中,通過getDelay方法得到的是一個時間差,若是時間差小於等於0,則說明任務到了該執行的時候了,此時呼叫finishPoll
finishPoll方法:
1 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 2 int s = --size; 3 RunnableScheduledFuture<?> x = queue[s]; 4 queue[s] = null; 5 if (s != 0) 6 siftDown(0, x); 7 setIndex(f, -1); 8 return f; 9 }
這個方法的邏輯還是比較簡單的,就是一個簡單的小根堆重新調整的操作,由於f需要被取出,此時利用最後一個元素,完成一次自上向下的調整(生成時是自下向上)
siftDown方法和siftUp類似:
1 private void siftDown(int k, RunnableScheduledFuture<?> key) { 2 int half = size >>> 1; 3 while (k < half) { 4 int child = (k << 1) + 1; 5 RunnableScheduledFuture<?> c = queue[child]; 6 int right = child + 1; 7 if (right < size && c.compareTo(queue[right]) > 0) 8 c = queue[child = right]; 9 if (key.compareTo(c) <= 0) 10 break; 11 queue[k] = c; 12 setIndex(c, k); 13 k = child; 14 } 15 queue[k] = key; 16 setIndex(key, k); 17 }
由二叉樹性質half 保證只操作到倒數第二層
在迴圈中,首先根據k(當前也就是根節點),得到其左右孩子的下標
若是右孩子存在,那麼就用左孩子和右孩子比較,選出最下的哪一個作為child
若是右孩子不存在,則直接使用左孩子作為child
當選出child後,再和待插入的元素key比較
若是key小,則結束迴圈,直接將key插入k所在位置
若不是,則將當前child所在元素放在k所在位置,然後從child位置繼續開始向下尋找,直到找到一個大於key或者遍歷完畢
這樣自上向下的將當前堆又調整成了小根堆,以後的定時週期任務都是以這種方式來呼叫的
看到這ScheduledThreadPoolExecutor的定時週期任務已經基本理解了,只不過還存在一個問題,當執行週期任務,會從小根堆取出,那麼該任務下一次的執行時間何時更新到小根堆?
回到ThreadPoolExecutor的worker的runWorker方法中,在呼叫完getTask方法後,在進行完一系列完全檢查後,會直接呼叫task的run方法,而此時的task是經過之前ScheduledFutureTask包裝的
ScheduledFutureTask的run方法:
1 public void run() { 2 boolean periodic = isPeriodic(); 3 if (!canRunInCurrentRunState(periodic)) 4 cancel(false); 5 else if (!periodic) 6 ScheduledFutureTask.super.run(); 7 else if (ScheduledFutureTask.super.runAndReset()) { 8 setNextRunTime(); 9 reExecutePeriodic(outerTask); 10 } 11 }
若是設定了週期任務(period不為0),那麼isPeriodic方法為true
邏輯上就會執行runAndReset方法,這個方法內部就會呼叫我們傳入的Runnable的run方法,從而真正地執行我們的任務
在執行完畢後,可以看到呼叫了setNextRunTime方法
setNextRunTime方法:
1 private void setNextRunTime() { 2 long p = period; 3 if (p > 0) 4 time += p; 5 else 6 time = triggerTime(-p); 7 }
這裡就很簡單,利用當前time和period計算出下一次的time
由於scheduleWithFixedDelay和scheduleAtFixedRate之前所說的不一樣之處,在這裡就得到了體現
因為scheduleAtFixedRate的period是大於0的,所以scheduleAtFixedRate計算出來的時間間隔就是initialDelay + n*period的這種形式,那麼其執行就會有固定的時間點,不過這還是要取決於任務的執行時間,若是任務的執行時間大於時間間隔,那麼當上一次任務執行完畢,就會立刻執行,而不是等到時間點到了,若是任務的執行時間小於時間間隔,那麼毫無疑問就需要等到時間點到了才執行下一次的任務
由於scheduleWithFixedDelay的period是小於0的,所以需要執行triggerTime
triggerTime方法:
1 long triggerTime(long delay) { 2 return now() + 3 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 4 }
可以看到若是不存在Long型別的溢位問題,那麼下一次的時間就等於當前時間加時間間隔,所以說scheduleWithFixedDelay的不同之處在於其算上了任務的實際執行時間
若是存在Long型別的溢位問題時
在overflowFree中:
1 private long overflowFree(long delay) { 2 Delayed head = (Delayed) super.getQueue().peek(); 3 if (head != null) { 4 long headDelay = head.getDelay(NANOSECONDS); 5 if (headDelay < 0 && (delay - headDelay < 0)) 6 delay = Long.MAX_VALUE + headDelay; 7 } 8 return delay; 9 }
首先通過peek得到佇列中的第一個元素,若是不存在,則直接返回delay
若是存在,通過getDelay得到headDelay
這裡就會存在兩情況
任務還沒達到執行時間,則headDelay 大於零
任務達到執行時間,但卻由於之前的任務還沒執行完畢,遭到了延時,headDelay 小於0
所以這次的計算就是將headDelay這部分超時時間減去,以防止後續影響compareTo的比較,從而引起offer順序的錯誤
(只不過這種情況正常不會遇見。。。)
在計算完成下一次的執行時間後
呼叫reExecutePeriodic方法:
1 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 2 if (canRunInCurrentRunState(true)) { 3 super.getQueue().add(task); 4 if (!canRunInCurrentRunState(true) && remove(task)) 5 task.cancel(false); 6 else 7 ensurePrestart(); 8 } 9 }
其中傳入的這個task(outerTask)其實就是當前執行完畢的這個任務,
可以看到這裡canRunInCurrentRunState成立的情況下,就會通過
getQueue得到阻塞佇列,再次通過DelayedWorkQueue的add方法將其加入到小根堆中,只不過這時的time發生了變化
若是情況正常,則繼續通過ThreadPoolExecutor的ensurePrestart方法,排程worker的工作
這樣定時週期任務就能正常執行
ScheduledThreadPoolExecutor分析到此