JDK 原始碼解析 —— CyclicBarrier
阿新 • • 發佈:2019-01-24
一. 簡介
CyclicBarrier 是一個讓一系列執行緒集合互相等待直到一個公共屏障點(barrier point)的同步輔助工具。這個屏障被稱為迴圈屏障,是因為它可以在等待執行緒釋放後被重用。
CyclicBarrier 支援一個可選的 Runnable 命令,在最後一個執行緒到達後執行一次 Runnable 命令。
二. 簡單使用示例
三. CyclicBarrier 作用圖示 讓所有執行緒都執行到同一個點(屏障點)後,再繼續執行
四. 程式碼解析
五. 總結 CyclicBarrier 是利用了 Condition 介面,定義了一個叫做 trip 的 Condition,當所有執行緒到達後執行緒才能從 Condition 佇列中移到 AQS 的等待佇列繼續執行。關於 Condition,可以參考博主的另一篇博文:http://blog.csdn.net/wenniuwuren/article/details/51447767 六. 參考資料 JDK 7 原始碼
CyclicBarrier(3) 等到 3 個執行緒都到了,這個物件還可以重用,而 CountDownLatch 則不能重用,從 Cyclic 名字就可以看出這個類物件可以迴圈使用
public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3);//建立CyclicBarrier物件並設定3個公共屏障點 for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await();//到此如果沒有達到公共屏障點,則該執行緒處於等待狀態,如果達到公共屏障點則所有處於等待的執行緒都繼續往下執行 Thread.sleep((long)(Math.random()*10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
三. CyclicBarrier 作用圖示 讓所有執行緒都執行到同一個點(屏障點)後,再繼續執行
四. 程式碼解析
- 重要變數
// 每次對柵欄的使用可以表現為一個 generation 例項。當條件 trip 改變或者重置 generation 也會 // 隨之改變。可以有多個 generation 和使用柵欄的執行緒關聯,但是隻有一個可以獲得鎖。 private static class Generation { boolean broken = false; } /** 守護柵欄入口的鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 等待條件,直到所有執行緒到達柵欄 */ private final Condition trip = lock.newCondition(); /** 要屏障的執行緒數 */ private final int parties; /* 當執行緒都到達柵欄,執行的 Runnable */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); //還要等待多少個執行緒到達。執行緒到達屏障點就減去 1。 //每次新建 generation 的時候或者屏障 broken,count重新設定為 parties 引數值 private int count;
- await() 方法:等待到所有參與的執行緒都到達屏障點。如果當前執行緒不是最後一個到達的,當前執行緒停止執行,進入睡眠,直到以下幾種情況發生
- 最後的執行緒到達
- 其他執行緒中斷當前執行緒
- 其他執行緒中斷中斷等待執行緒中的一條
- 在等待所有執行緒到達屏障前有執行緒超時
- 其他執行緒在此屏障中呼叫 reset(將屏障設定為初始狀態)
- 設定了中斷狀態
- 在等待時中斷
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();// 加了鎖,以下操作為執行緒安全操作 try { final Generation g = generation; if (g.broken) // 如果屏障狀態 broken,則丟擲屏障 broken 異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped 說明是最後一個到達的執行緒 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 如果有 Runnable,先執行 command.run(); ranAction = true; nextGeneration();// 喚醒 Condition 佇列的所有執行緒,既然是 Cyclic 的,所以也會重置狀態以便重用屏障,這是和 CountDownLatch 的區別 return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) {// 如果不是最後一個到達的執行緒,就進入迴圈等待 try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
五. 總結 CyclicBarrier 是利用了 Condition 介面,定義了一個叫做 trip 的 Condition,當所有執行緒到達後執行緒才能從 Condition 佇列中移到 AQS 的等待佇列繼續執行。關於 Condition,可以參考博主的另一篇博文:http://blog.csdn.net/wenniuwuren/article/details/51447767 六. 參考資料 JDK 7 原始碼