1. 程式人生 > 其它 >Java AQS原理和AQS的同步元件總結

Java AQS原理和AQS的同步元件總結

AQS 簡單介紹

AQS 的全稱為 AbstractQueuedSynchronizer ,翻譯過來的意思就是抽象佇列同步器。這個類在 java.util.concurrent.locks 包下面。

 

AQS 就是一個抽象類,主要用來構建鎖和同步器。

1 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
2 }

 

AQS 為構建鎖和同步器提供了一些通用功能的是實現,因此,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的 ReentrantLock,Semaphore,其他的諸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基於 AQS 的。

 

 

AQS 原理

AQS 原理概覽

AQS 核心思想是,如果被請求的共享資源空閒,則將當前請求資源的執行緒設定為有效的工作執行緒,並且將共享資源設定為鎖定狀態。如果被請求的共享資源被佔用,那麼就需要一套執行緒阻塞等待以及被喚醒時鎖分配的機制,這個機制 AQS 是用 CLH 佇列鎖實現的,即將暫時獲取不到鎖的執行緒加入到佇列中。

CLH(Craig,Landin,and Hagersten)佇列是一個虛擬的雙向佇列(虛擬的雙向佇列即不存在佇列例項,僅存在結點之間的關聯關係)。AQS 是將每條請求共享資源的執行緒封裝成一個 CLH 鎖佇列的一個結點(Node)來實現鎖的分配。

看個 AQS(AbstractQueuedSynchronizer

)原理圖:

 

AQS 使用一個 int 成員變數來表示同步狀態,通過內建的 FIFO 佇列來完成獲取資源執行緒的排隊工作。AQS 使用 CAS 對該同步狀態進行原子操作實現對其值的修改。

1 private volatile int state;//共享變數,使用volatile修飾保證執行緒可見性

 

狀態資訊通過 protected 型別的getState()setState()compareAndSetState() 進行操作

 1 /返回同步狀態的當前值
 2 protected final int getState() {
 3         return state;
 4
} 5 // 設定同步狀態的值 6 protected final void setState(int newState) { 7 state = newState; 8 } 9 //原子地(CAS操作)將同步狀態值設定為給定值update如果當前同步狀態的值等於expect(期望值) 10 protected final boolean compareAndSetState(int expect, int update) { 11 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 12 }

 

AQS 對資源的共享方式

AQS 定義兩種資源共享方式

1) Exclusive(獨佔)

只有一個執行緒能執行,如 ReentrantLock。又可分為公平鎖和非公平鎖,ReentrantLock 同時支援兩種鎖,下面以 ReentrantLock 對這兩種鎖的定義做介紹:

  • 公平鎖 :按照執行緒在佇列中的排隊順序,先到者先拿到鎖
  • 非公平鎖 :當執行緒要獲取鎖時,先通過兩次 CAS 操作去搶鎖,如果沒搶到,當前執行緒再加入到佇列中等待喚醒。

 

下面來看 ReentrantLock 中相關的原始碼:

ReentrantLock 預設採用非公平鎖,因為考慮獲得更好的效能,通過 boolean 來決定是否用公平鎖(傳入 true 用公平鎖)

1 /** Synchronizer providing all implementation mechanics */
2 private final Sync sync;
3 public ReentrantLock() {
4     // 預設非公平鎖
5     sync = new NonfairSync();
6 }
7 public ReentrantLock(boolean fair) {
8     sync = fair ? new FairSync() : new NonfairSync();
9 }

 

ReentrantLock 中公平鎖的 lock 方法

 1 static final class FairSync extends Sync {
 2     final void lock() {
 3         acquire(1);
 4     }
 5     // AbstractQueuedSynchronizer.acquire(int arg)
 6     public final void acquire(int arg) {
 7         if (!tryAcquire(arg) &&
 8             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 9             selfInterrupt();
10     }
11     protected final boolean tryAcquire(int acquires) {
12         final Thread current = Thread.currentThread();
13         int c = getState();
14         if (c == 0) {
15             // 1. 和非公平鎖相比,這裡多了一個判斷:是否有執行緒在等待
16             if (!hasQueuedPredecessors() &&
17                 compareAndSetState(0, acquires)) {
18                 setExclusiveOwnerThread(current);
19                 return true;
20             }
21         }
22         else if (current == getExclusiveOwnerThread()) {
23             int nextc = c + acquires;
24             if (nextc < 0)
25                 throw new Error("Maximum lock count exceeded");
26             setState(nextc);
27             return true;
28         }
29         return false;
30     }
31 }

 

非公平鎖的 lock 方法:

 1 static final class NonfairSync extends Sync {
 2     final void lock() {
 3         // 2. 和公平鎖相比,這裡會直接先進行一次CAS,成功就返回了
 4         if (compareAndSetState(0, 1))
 5             setExclusiveOwnerThread(Thread.currentThread());
 6         else
 7             acquire(1);
 8     }
 9     // AbstractQueuedSynchronizer.acquire(int arg)
10     public final void acquire(int arg) {
11         if (!tryAcquire(arg) &&
12             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
13             selfInterrupt();
14     }
15     protected final boolean tryAcquire(int acquires) {
16         return nonfairTryAcquire(acquires);
17     }
18 }
19 /**
20  * Performs non-fair tryLock.  tryAcquire is implemented in
21  * subclasses, but both need nonfair try for trylock method.
22  */
23 final boolean nonfairTryAcquire(int acquires) {
24     final Thread current = Thread.currentThread();
25     int c = getState();
26     if (c == 0) {
27         // 這裡沒有對阻塞佇列進行判斷
28         if (compareAndSetState(0, acquires)) {
29             setExclusiveOwnerThread(current);
30             return true;
31         }
32     }
33     else if (current == getExclusiveOwnerThread()) {
34         int nextc = c + acquires;
35         if (nextc < 0) // overflow
36             throw new Error("Maximum lock count exceeded");
37         setState(nextc);
38         return true;
39     }
40     return false;
41 }

 

總結:

公平鎖和非公平鎖只有兩處不同:

  1. 非公平鎖在呼叫 lock 後,首先就會呼叫 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被佔用,那麼直接就獲取到鎖返回了。
  2. 非公平鎖在 CAS 失敗後,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷等待佇列是否有執行緒處於等待狀態,如果有則不去搶鎖,乖乖排到後面。

公平鎖和非公平鎖就這兩點區別,如果這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是一樣的,都要進入到阻塞佇列等待喚醒。

相對來說,非公平鎖會有更好的效能,因為它的吞吐量比較大。當然,非公平鎖讓獲取鎖的時間變得更加不確定,可能會導致在阻塞佇列中的執行緒長期處於飢餓狀態。

 

2) Share(共享)

多個執行緒可同時執行,如 Semaphore/CountDownLatchSemaphoreCountDownLatCh、 CyclicBarrierReadWriteLock 我們都會在後面講到。

ReentrantReadWriteLock 可以看成是組合式,因為 ReentrantReadWriteLock 也就是讀寫鎖允許多個執行緒同時對某一資源進行讀。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源 state 的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS 已經在上層已經幫我們實現好了。

 

AQS 底層使用了模板方法模式

同步器的設計是基於模板方法模式的,如果需要自定義同步器一般的方式是這樣(模板方法模式很經典的一個應用):

  1. 使用者繼承 AbstractQueuedSynchronizer 並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源 state 的獲取和釋放)
  2. 將 AQS 組合在自定義同步元件的實現中,並呼叫其模板方法,而這些模板方法會呼叫使用者重寫的方法。

這和我們以往通過實現介面的方式有很大區別,這是模板方法模式很經典的一個運用。

AQS 使用了模板方法模式,自定義同步器時需要重寫下面幾個 AQS 提供的鉤子方法:

1 protected boolean tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
2 protected boolean tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
3 protected boolean tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
4 protected boolean tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
5 protected boolean isHeldExclusively()//該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。

 

什麼是鉤子方法呢? 鉤子方法是一種被宣告在抽象類中的方法,一般使用 protected 關鍵字修飾,它可以是空方法(由子類實現),也可以是預設實現的方法。模板設計模式通過鉤子方法控制固定步驟的實現。

篇幅問題,這裡就不詳細介紹模板方法模式了,不太瞭解的小夥伴可以看看這篇文章:用Java8 改造後的模板方法模式真的是 yyds!open in new window

除了上面提到的鉤子方法之外,AQS 類中的其他方法都是 final ,所以無法被其他類重寫。

以 ReentrantLock 為例,state 初始化為 0,表示未鎖定狀態。A 執行緒 lock() 時,會呼叫 tryAcquire() 獨佔該鎖並將 state+1 。此後,其他執行緒再 tryAcquire() 時就會失敗,直到 A 執行緒 unlock() 到 state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A 執行緒自己是可以重複獲取此鎖的(state 會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多少次,這樣才能保證 state 是能回到零態的。

再以 CountDownLatch 以例,任務分為 N 個子執行緒去執行,state 也初始化為 N(注意 N 要與執行緒個數一致)。這 N 個子執行緒是並行執行的,每個子執行緒執行完後 countDown() 一次,state 會 CAS(Compare and Swap) 減 1。等到所有子執行緒都執行完後(即 state=0 ),會 unpark() 主呼叫執行緒,然後主呼叫執行緒就會從 await() 函式返回,繼續後餘動作。

一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一種即可。但 AQS 也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock

推薦兩篇 AQS 原理和相關原始碼分析的文章:

 

Semaphore(訊號量)

synchronized 和 ReentrantLock 都是一次只允許一個執行緒訪問某個資源,Semaphore(訊號量)可以指定多個執行緒同時訪問某個資源。

示例程式碼如下:

 1 /**
 2  *
 3  * @author Snailclimb
 4  * @date 2018年9月30日
 5  * @Description: 需要一次性拿一個許可的情況
 6  */
 7 public class SemaphoreExample1 {
 8   // 請求的數量
 9   private static final int threadCount = 550;
10 
11   public static void main(String[] args) throws InterruptedException {
12     // 建立一個具有固定執行緒數量的執行緒池物件(如果這裡執行緒池的執行緒數量給太少的話你會發現執行的很慢)
13     ExecutorService threadPool = Executors.newFixedThreadPool(300);
14     // 一次只能允許執行的執行緒數量。
15     final Semaphore semaphore = new Semaphore(20);
16 
17     for (int i = 0; i < threadCount; i++) {
18       final int threadnum = i;
19       threadPool.execute(() -> {// Lambda 表示式的運用
20         try {
21           semaphore.acquire();// 獲取一個許可,所以可執行執行緒數量為20/1=20
22           test(threadnum);
23           semaphore.release();// 釋放一個許可
24         } catch (InterruptedException e) {
25           // TODO Auto-generated catch block
26           e.printStackTrace();
27         }
28 
29       });
30     }
31     threadPool.shutdown();
32     System.out.println("finish");
33   }
34 
35   public static void test(int threadnum) throws InterruptedException {
36     Thread.sleep(1000);// 模擬請求的耗時操作
37     System.out.println("threadnum:" + threadnum);
38     Thread.sleep(1000);// 模擬請求的耗時操作
39   }
40 }

 

執行 acquire() 方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個 release 方法增加一個許可證,這可能會釋放一個阻塞的 acquire() 方法。然而,其實並沒有實際的許可證這個物件,Semaphore 只是維持了一個可獲得許可證的數量。 Semaphore 經常用於限制獲取某種資源的執行緒數量。

當然一次也可以一次拿取和釋放多個許可,不過一般沒有必要這樣做:

1 semaphore.acquire(5);// 獲取5個許可,所以可執行執行緒數量為20/5=4
2 test(threadnum);
3 semaphore.release(5);// 釋放5個許可

 

除了 acquire() 方法之外,另一個比較常用的與之對應的方法是 tryAcquire() 方法,該方法如果獲取不到許可就立即返回 false。

Semaphore 有兩種模式,公平模式和非公平模式。

  • 公平模式: 呼叫 acquire() 方法的順序就是獲取許可證的順序,遵循 FIFO;
  • 非公平模式: 搶佔式的。

 

Semaphore 對應的兩個構造方法如下:

1 public Semaphore(int permits) {
2         sync = new NonfairSync(permits);
3     }
4 
5 public Semaphore(int permits, boolean fair) {
6         sync = fair ? new FairSync(permits) : new NonfairSync(permits);
7     }

 

這兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,預設非公平模式。

Semaphore 與 CountDownLatch 一樣,也是共享鎖的一種實現。它預設構造 AQS 的 state 為 permits。當執行任務的執行緒數量超出 permits,那麼多餘的執行緒將會被放入阻塞佇列 Park,並自旋判斷 state 是否大於 0。只有當 state 大於 0 的時候,阻塞的執行緒才能繼續執行,此時先前執行任務的執行緒繼續執行 release() 方法,release() 方法使得 state 的變數會加 1,那麼自旋的執行緒便會判斷成功。 如此,每次只有最多不超過 permits 數量的執行緒能自旋成功,便限制了執行任務執行緒的數量。

 

CountDownLatch (倒計時器)

CountDownLatch 允許 count 個執行緒阻塞在一個地方,直至所有執行緒的任務都執行完畢。

CountDownLatch 是共享鎖的一種實現,它預設構造 AQS 的 state 值為 count。當執行緒使用 countDown() 方法時,其實使用了tryReleaseShared方法以 CAS 的操作來減少 state,直至 state 為 0 。當呼叫 await() 方法的時候,如果 state 不為 0,那就證明任務還沒有執行完畢,await() 方法就會一直阻塞,也就是說 await() 方法之後的語句不會被執行。然後,CountDownLatch 會自旋 CAS 判斷 state == 0,如果 state == 0 的話,就會釋放所有等待的執行緒,await() 方法之後的語句得到執行。

CountDownLatch 的兩種典型用法

1、某一執行緒在開始執行前等待 n 個執行緒執行完畢。

將 CountDownLatch 的計數器初始化為 n (new CountDownLatch(n)),每當一個任務執行緒執行完畢,就將計數器減 1 (countdownlatch.countDown()),當計數器的值變為 0 時,在 CountDownLatch 上 await() 的執行緒就會被喚醒。一個典型應用場景就是啟動一個服務時,主執行緒需要等待多個元件載入完畢,之後再繼續執行。

2、實現多個執行緒開始執行任務的最大並行性。

注意是並行性,不是併發,強調的是多個執行緒在某一時刻同時開始執行。類似於賽跑,將多個執行緒放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的 CountDownLatch 物件,將其計數器初始化為 1 (new CountDownLatch(1)),多個執行緒在開始執行任務前首先 coundownlatch.await(),當主執行緒呼叫 countDown() 時,計數器變為 0,多個執行緒同時被喚醒。

 

CountDownLatch 的使用示例

 1 public class CountDownLatchExample1 {
 2   // 請求的數量
 3   private static final int threadCount = 550;
 4 
 5   public static void main(String[] args) throws InterruptedException {
 6     // 建立一個具有固定執行緒數量的執行緒池物件(如果這裡執行緒池的執行緒數量給太少的話你會發現執行的很慢)
 7     ExecutorService threadPool = Executors.newFixedThreadPool(300);
 8     final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 9     for (int i = 0; i < threadCount; i++) {
10       final int threadnum = i;
11       threadPool.execute(() -> {// Lambda 表示式的運用
12         try {
13           test(threadnum);
14         } catch (InterruptedException e) {
15           // TODO Auto-generated catch block
16           e.printStackTrace();
17         } finally {
18           countDownLatch.countDown();// 表示一個請求已經被完成
19         }
20 
21       });
22     }
23     countDownLatch.await();
24     threadPool.shutdown();
25     System.out.println("finish");
26   }
27 
28   public static void test(int threadnum) throws InterruptedException {
29     Thread.sleep(1000);// 模擬請求的耗時操作
30     System.out.println("threadnum:" + threadnum);
31     Thread.sleep(1000);// 模擬請求的耗時操作
32   }
33 }

 

上面的程式碼中,我們定義了請求的數量為 550,當這 550 個請求被處理完成之後,才會執行System.out.println("finish");

與 CountDownLatch 的第一次互動是主執行緒等待其他執行緒。主執行緒必須在啟動其他執行緒後立即呼叫 CountDownLatch.await() 方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。

其他 N 個執行緒必須引用閉鎖物件,因為他們需要通知 CountDownLatch 物件,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每呼叫一次這個方法,在建構函式中初始化的 count 值就減 1。所以當 N 個執行緒都調 用了這個方法,count 的值等於 0,然後主執行緒就能通過 await()方法,恢復執行自己的任務。

 

再插一嘴:CountDownLatch 的 await() 方法使用不當很容易產生死鎖,比如我們上面程式碼中的 for 迴圈改為:

1 for (int i = 0; i < threadCount-1; i++) {
2 .......
3 }

這樣就導致 count 的值沒辦法等於 0,然後就會導致一直等待。

 

CountDownLatch 的不足

CountDownLatch 是一次性的,計數器的值只能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當 CountDownLatch 使用完畢後,它不能再次被使用。

 

CountDownLatch 相常見面試題

  • CountDownLatch 怎麼用?應用場景是什麼?
  • CountDownLatch 和 CyclicBarrier 的不同之處?
  • CountDownLatch 類中主要的方法?

 

 

CyclicBarrier(迴圈柵欄)

CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現執行緒間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。

CountDownLatch 的實現是基於 AQS 的,而 CycliBarrier 是基於 ReentrantLock(ReentrantLock 也屬於 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是:讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。

CyclicBarrier 預設的構造方法是 CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫 await() 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。

再來看一下它的建構函式:

 1 public CyclicBarrier(int parties) {
 2     this(parties, null);
 3 }
 4 
 5 public CyclicBarrier(int parties, Runnable barrierAction) {
 6     if (parties <= 0) throw new IllegalArgumentException();
 7     this.parties = parties;
 8     this.count = parties;
 9     this.barrierCommand = barrierAction;
10 }

其中,parties 就代表了有攔截的執行緒的數量,當攔截的執行緒數量達到這個值的時候就開啟柵欄,讓所有執行緒通過。

 

CyclicBarrier 的應用場景

CyclicBarrier 可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如我們用一個 Excel 儲存了使用者所有銀行流水,每個 Sheet 儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個 sheet 裡的銀行流水,都執行完之後,得到每個 sheet 的日均銀行流水,最後,再用 barrierAction 用這些執行緒的計算結果,計算出整個 Excel 的日均銀行流水。

 

CyclicBarrier 的使用示例

示例 1

 1 public class CyclicBarrierExample2 {
 2   // 請求的數量
 3   private static final int threadCount = 550;
 4   // 需要同步的執行緒數量
 5   private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
 6 
 7   public static void main(String[] args) throws InterruptedException {
 8     // 建立執行緒池
 9     ExecutorService threadPool = Executors.newFixedThreadPool(10);
10 
11     for (int i = 0; i < threadCount; i++) {
12       final int threadNum = i;
13       Thread.sleep(1000);
14       threadPool.execute(() -> {
15         try {
16           test(threadNum);
17         } catch (InterruptedException e) {
18           // TODO Auto-generated catch block
19           e.printStackTrace();
20         } catch (BrokenBarrierException e) {
21           // TODO Auto-generated catch block
22           e.printStackTrace();
23         }
24       });
25     }
26     threadPool.shutdown();
27   }
28 
29   public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
30     System.out.println("threadnum:" + threadnum + "is ready");
31     try {
32       /**等待60秒,保證子執行緒完全執行結束*/
33       cyclicBarrier.await(60, TimeUnit.SECONDS);
34     } catch (Exception e) {
35       System.out.println("-----CyclicBarrierException------");
36     }
37     System.out.println("threadnum:" + threadnum + "is finish");
38   }
39 
40 }

執行結果如下:

 1 threadnum:0is ready
 2 threadnum:1is ready
 3 threadnum:2is ready
 4 threadnum:3is ready
 5 threadnum:4is ready
 6 threadnum:4is finish
 7 threadnum:0is finish
 8 threadnum:1is finish
 9 threadnum:2is finish
10 threadnum:3is finish
11 threadnum:5is ready
12 threadnum:6is ready
13 threadnum:7is ready
14 threadnum:8is ready
15 threadnum:9is ready
16 threadnum:9is finish
17 threadnum:5is finish
18 threadnum:8is finish
19 threadnum:7is finish
20 threadnum:6is finish
21 ......

 

可以看到當執行緒數量也就是請求數量達到我們定義的 5 個的時候, await() 方法之後的方法才被執行。

另外,CyclicBarrier 還提供一個更高階的建構函式 CyclicBarrier(int parties, Runnable barrierAction),用於線上程到達屏障時,優先執行 barrierAction,方便處理更復雜的業務場景。示例程式碼如下

 1 public class CyclicBarrierExample3 {
 2   // 請求的數量
 3   private static final int threadCount = 550;
 4   // 需要同步的執行緒數量
 5   private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
 6     System.out.println("------當執行緒數達到之後,優先執行------");
 7   });
 8 
 9   public static void main(String[] args) throws InterruptedException {
10     // 建立執行緒池
11     ExecutorService threadPool = Executors.newFixedThreadPool(10);
12 
13     for (int i = 0; i < threadCount; i++) {
14       final int threadNum = i;
15       Thread.sleep(1000);
16       threadPool.execute(() -> {
17         try {
18           test(threadNum);
19         } catch (InterruptedException e) {
20           // TODO Auto-generated catch block
21           e.printStackTrace();
22         } catch (BrokenBarrierException e) {
23           // TODO Auto-generated catch block
24           e.printStackTrace();
25         }
26       });
27     }
28     threadPool.shutdown();
29   }
30 
31   public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
32     System.out.println("threadnum:" + threadnum + "is ready");
33     cyclicBarrier.await();
34     System.out.println("threadnum:" + threadnum + "is finish");
35   }
36 
37 }

執行結果如下:

 1 threadnum:0is ready
 2 threadnum:1is ready
 3 threadnum:2is ready
 4 threadnum:3is ready
 5 threadnum:4is ready
 6 ------當執行緒數達到之後,優先執行------
 7 threadnum:4is finish
 8 threadnum:0is finish
 9 threadnum:2is finish
10 threadnum:1is finish
11 threadnum:3is finish
12 threadnum:5is ready
13 threadnum:6is ready
14 threadnum:7is ready
15 threadnum:8is ready
16 threadnum:9is ready
17 ------當執行緒數達到之後,優先執行------
18 threadnum:9is finish
19 threadnum:5is finish
20 threadnum:6is finish
21 threadnum:8is finish
22 threadnum:7is finish
23 ......

 

 

CyclicBarrier 的原始碼分析

 

當呼叫 CyclicBarrier 物件呼叫 await() 方法時,實際上呼叫的是 dowait(false, 0L)方法。 await() 方法就像樹立起一個柵欄的行為一樣,將執行緒擋住了,當攔住的執行緒數量達到 parties 的值時,柵欄才會開啟,執行緒才得以通過執行。

1 public int await() throws InterruptedException, BrokenBarrierException {
2   try {
3         return dowait(false, 0L);
4   } catch (TimeoutException toe) {
5         throw new Error(toe); // cannot happen
6   }
7 }

dowait(false, 0L)

 1 // 當執行緒數量或者請求數量達到 count 時 await 之後的方法才會被執行。上面的示例中 count 的值就為 5。
 2     private int count;
 3     /**
 4      * Main barrier code, covering the various policies.
 5      */
 6     private int dowait(boolean timed, long nanos)
 7         throws InterruptedException, BrokenBarrierException,
 8                TimeoutException {
 9         final ReentrantLock lock = this.lock;
10         // 鎖住
11         lock.lock();
12         try {
13             final Generation g = generation;
14 
15             if (g.broken)
16                 throw new BrokenBarrierException();
17 
18             // 如果執行緒中斷了,丟擲異常
19             if (Thread.interrupted()) {
20                 breakBarrier();
21                 throw new InterruptedException();
22             }
23             // cout減1
24             int index = --count;
25             // 當 count 數量減為 0 之後說明最後一個執行緒已經到達柵欄了,也就是達到了可以執行await 方法之後的條件
26             if (index == 0) {  // tripped
27                 boolean ranAction = false;
28                 try {
29                     final Runnable command = barrierCommand;
30                     if (command != null)
31                         command.run();
32                     ranAction = true;
33                     // 將 count 重置為 parties 屬性的初始化值
34                     // 喚醒之前等待的執行緒
35                     // 下一波執行開始
36                     nextGeneration();
37                     return 0;
38                 } finally {
39                     if (!ranAction)
40                         breakBarrier();
41                 }
42             }
43 
44             // loop until tripped, broken, interrupted, or timed out
45             for (;;) {
46                 try {
47                     if (!timed)
48                         trip.await();
49                     else if (nanos > 0L)
50                         nanos = trip.awaitNanos(nanos);
51                 } catch (InterruptedException ie) {
52                     if (g == generation && ! g.broken) {
53                         breakBarrier();
54                         throw ie;
55                     } else {
56                         // We're about to finish waiting even if we had not
57                         // been interrupted, so this interrupt is deemed to
58                         // "belong" to subsequent execution.
59                         Thread.currentThread().interrupt();
60                     }
61                 }
62 
63                 if (g.broken)
64                     throw new BrokenBarrierException();
65 
66                 if (g != generation)
67                     return index;
68 
69                 if (timed && nanos <= 0L) {
70                     breakBarrier();
71                     throw new TimeoutException();
72                 }
73             }
74         } finally {
75             lock.unlock();
76         }
77     }

 

總結:CyclicBarrier 內部通過一個 count 變數作為計數器,count 的初始值為 parties 屬性的初始化值,每當一個執行緒到了柵欄這裡了,那麼就將計數器減一。如果 count 值為 0 了,表示這是這一代最後一個執行緒到達柵欄,就嘗試執行我們構造方法中輸入的任務。

 

CyclicBarrier 和 CountDownLatch 的區別

下面這個是國外一個大佬的回答:

CountDownLatch 是計數器,只能使用一次,而 CyclicBarrier 的計數器提供 reset 功能,可以多次使用。但是我不那麼認為它們之間的區別僅僅就是這麼簡單的一點。我們來從 jdk 作者設計的目的來看,javadoc 是這麼描述它們的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一個或者多個執行緒,等待其他多個執行緒完成某件事情之後才能執行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多個執行緒互相等待,直到到達同一個同步點,再繼續一起執行。)

對於 CountDownLatch 來說,重點是“一個執行緒(多個執行緒)等待”,而其他的 N 個執行緒在完成“某件事情”之後,可以終止,也可以等待。而對於 CyclicBarrier,重點是多個執行緒,在任意一個執行緒沒有完成,所有的執行緒都必須等待。

CountDownLatch 是計數器,執行緒完成一個記錄一個,只不過計數不是遞增而是遞減,而 CyclicBarrier 更像是一個閥門,需要所有執行緒都到達,閥門才能開啟,然後繼續執行。