【JDK】JDK原始碼分析-CountDownLatch
概述
CountDownLatch 是併發包中的一個工具類,它的典型應用場景為:一個執行緒等待幾個執行緒執行,待這幾個執行緒結束後,該執行緒再繼續執行。
簡單起見,可以把它理解為一個倒數的計數器:初始值為執行緒數,每個執行緒結束時執行減 1 操作,當計數器減到 0 時等待的執行緒再繼續執行。
程式碼分析
CountDownLatch 的類簽名和主要方法如下:
public class CountDownLatch {}
常用方法為:await()、await(long, TimeUnit) 和 countDown。其中兩個 await 都是讓當前執行緒進入等待狀態(獲取資源失敗);而 countDown 方法是將計數器減去 1,當計數器為 0 的時候,那些處於等待狀態的執行緒會繼續執行(獲取資源成功)。
構造器程式碼如下:
private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
構造器(該構造器是唯一的)傳入一個正整數,且初始化了 sync 變數,Sync 是內部的一個巢狀類,繼承自 AQS。
await / await(long, TimeUnit):
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
countDown:
public void countDown() { sync.releaseShared(1); }
其中,acquireSharedInterruptibly、tryAcquireSharedNanos 和 releaseShared 都是 AQS 中「共享模式」的方法,具體程式碼可參考前文「JDK原始碼分析-AbstractQueuedSynchronizer(3)」的分析。
巢狀類 Sync 程式碼如下:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 構造器,初始化 AQS 的 state 變數 Sync(int count) { setState(count); } int getCount() { return getState(); } // 嘗試獲取資源的操作 // 只有當 state 變數為 0 的時候才能獲取成功(返回 1) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 嘗試釋放資源的操作 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 該操作就是嘗試把 state 變數減去 1 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
Sync 繼承了 AQS 抽象類,根據 AQS 可知,acquireSharedInterruptibly 和 tryAcquireSharedNanos 方法的實現都呼叫了 tryAcquireShared。
流程說明:通常先把 CountDownLatch 的計數器(state)初始化為 N,執行 wait 操作就是嘗試以共享模式獲取資源,而每次 countDown 操作就是將 N 減去 1,只有當 N 減到 0 的時候,才能獲取成功(tryAcquireShared 方法),然後繼續執行。
場景舉例
為便於理解該類的用法,舉兩個簡單的例子來說明它的使用場景。
場景 1:一個執行緒等待多個執行緒執行完之後再繼續執行
public void test() throws InterruptedException { int count = 5; // CountDownLatch 的初始化計數器為 5 // 注意執行緒數和計數器保持一致 CountDownLatch countDownLatch = new CountDownLatch(count); for (int i = 0; i < count; i++) { int finalI = i; new Thread(() -> { try { TimeUnit.SECONDS.sleep(finalI); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " is working .."); // 每個執行緒執行結束時執行 countDown countDownLatch.countDown(); }).start(); } // 主執行緒進入等待狀態(嘗試獲取資源,成功後才能繼續執行) countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " go on .."); } /* 輸出結果: Thread-0 is working .. Thread-1 is working .. Thread-2 is working .. Thread-3 is working .. Thread-4 is working .. main go on .. */
場景 2:一個執行緒到達指定條件後,通知另一個執行緒
private static volatile List<Integer> list = new ArrayList<>(); private static void test() { CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() -> { if (list.size() != 5) { try { // list 的大小為 5 時再繼續執行,否則等待 // 等待 state 減到 0 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " start.."); }).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { list.add(i); System.out.println(Thread.currentThread().getName() + " add " + i); if (list.size() == 5) { // 滿足條件時將 state 減 1 countDownLatch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } /* 輸出結果: Thread-1 add 0 Thread-1 add 1 Thread-1 add 2 Thread-1 add 3 Thread-1 add 4 Thread-0 start.. Thread-1 add 5 Thread-1 add 6 Thread-1 add 7 Thread-1 add 8 Thread-1 add 9 */
小結
CountDownLatch 可以理解為一個倒數的計數器,它的典型應用場景就是一個執行緒等待幾個執行緒執行結束後再繼續執行。其內部是基於 AQS 的共享模式實現的。
相關閱讀:
JDK原始碼分析-AbstractQueuedSynchronizer(3)
Stay hungry, stay foolish.
PS: 本文首發於微信公眾號【WriteOnRead