1. 程式人生 > >原始碼分析:CountDownLatch 之倒計時門栓

原始碼分析:CountDownLatch 之倒計時門栓

## 簡介 CountDownLatch 是JDK1.5 開始提供的一種同步輔助工具,它允許一個或多個執行緒一直等待,直到其他執行緒執行的操作完成為止。在初始化的時候給定 CountDownLatch 一個計數,呼叫await() 方法的執行緒會一直等待,其他執行緒執行完操作後呼叫countDown(),當計數減到0 ,呼叫await() 方法的執行緒被喚醒繼續執行。 ### 應用場景 1. 多執行緒併發下載或上傳 主執行緒初始化一個為5的CountDownLatch ,然後分發給5個執行緒去完成下載或上傳的動作,主執行緒等待其他執行緒完成任務後返回成功呢。 2. 首頁,一個複雜的查詢包含多個子查詢,但是子查詢結果互相不依賴,也可以使用 CountDownLatch ,等待多個查詢完成後再一起返回給首頁。 ## 原始碼分析 CountDownLatch 的原始碼相對於之前介紹的幾個同步類,程式碼量要少很多很多,在JDK 1.8版本中也就300多行(包含註釋),所以分析起來也比較簡單。 ### 內部類Sync 同樣的,該內部類也繼承了AQS,程式碼展示: ```java private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 同步器的構造方法,初始化計數 setState(count); } ... } ``` ### 主要的屬性 主要的屬性就一個,也就是內部類例項:同步器Sync ```java private final Sync sync; ``` ### 構造方法 CountDownLatch 就一個構造方法,必須制定初始化計數 ```java public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); // 初始化同步器,指定計數 } ``` CountDownLatch 不算構造方法和toString方法一共也才4個方法,不多,所以我們全部看一下 ### await() 方法 呼叫該方法的執行緒會被阻塞,指定初始化的計數被減為0,或者執行緒被中斷丟擲異常。 程式碼展示: ```java // CountDownLatch.await() public void await() throws InterruptedException { // 會丟擲中斷異常 sync.acquireSharedInterruptibly(1); //呼叫的是同步器框架AQS的方法 } // AQS框架程式碼 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 檢查執行緒中斷狀態,丟擲異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 套路一樣,呼叫Sync裡面的方法 doAcquireSharedInterruptibly(arg); // 阻塞執行緒,排隊,等待被喚醒 } // 內部類Sync.tryAcquireShared() protected int tryAcquireShared(int acquires) { // 檢查計數,如果為0,返回1,如果不為0,返回-1; return (getState() == 0) ? 1 : -1; } ``` **await() 方法總結:** 1. 這應該是最簡單的一個tryAcquireShared方法實現了。 2. 僅呼叫了getState來檢查當前計數,如果計數為0,返回1;如果計數不為0,返回-1。 3. 阻塞執行緒,排隊,等待被喚醒,中斷丟擲異常等邏輯都是在AQS實現的,具體分析請看之前的AQS分析文章 ### boolean await(timeout, unit)方法 和無引數的await()方法唯一的區別就是該方法指定了等待超時的時間,並且有返回值; 如果計數為0,則返回true; 如果執行緒被中斷,則丟擲異常; 如果執行緒經過了指定的等待時間,則返回false; 程式碼展示: ```java public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) // 檢查執行緒中斷狀態 throw new InterruptedException(); // tryAcquireShared 只會返回1或者-1,返回1代表計數已經為0,直接返回true // doAcquireSharedNanos 是AQS 框架裡面的程式碼 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } // AQS 框架裡面的程式碼 private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 計算超時時間 final long deadline = System.nanoTime() + nanosTimeout; // 構建當前排隊節點,並加入佇列,精靈王之前有分析 final Node node = addWaiter(Node.SHARED); //共享節點 boolean failed = true; try { for (;;) { // 自旋 tryAcquireShared(arg) final Node p = node.predecessor(); if (p == head) { // 輪到當前節點了 int r = tryAcquireShared(arg); if (r >= 0) { // 這裡返回的大於等於0,說明計數為0,返回true setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 超時了,直接返回false if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout >
spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 阻塞當前執行緒 if (Thread.interrupted()) // 中斷丟擲異常 throw new InterruptedException(); } } finally { if (failed) // 節點被取消 cancelAcquire(node); } } ``` ### countDown() 方法 如果當前計數大於零,則將其遞減,如果計數達到零,則喚醒所有等待的執行緒(呼叫了await方法的執行緒)。如果當前計數等於零,那麼什麼也不會發生。原始碼展示: ```java public void countDown() { sync.releaseShared(1); // 呼叫AQS遞減計數 } // AQS同步框架的程式碼 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 呼叫自己實現的方法tryReleaseShared doReleaseShared(); //計數為0,喚醒所有等待的執行緒,返回true return true; } return false; } // CDL 自己實現的遞減計數方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 自旋,保證遞減操作成功 int c = getState(); // 當前的技術 if (c == 0) // 計數已經是0了,返回false,之後啥也不會發生 return false; int nextc = c-1; // 遞減 if (compareAndSetState(c, nextc)) // cas 更新計數 return nextc == 0; 計數為0才返回true } } // 喚醒等待的執行緒 private void doReleaseShared() { for (;;) { //自旋操作 Node h = head; if (h != null && h != tail) { // 等待的執行緒佇列不為空 int ws = h.waitStatus; if (ws == Node.SIGNAL) {// 檢查狀態是否要喚醒下一個節點的執行緒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // CAS 失敗了才會繼續continue continue; // loop to recheck cases unparkSuccessor(h); // 喚醒頭節點的下一個節點執行緒 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 頭節點沒變 if (h == head) // loop if head changed break; } } ``` **countDown() 方法總結:** 1. 主要邏輯就是把計數減1 2. 如果計數減到了0,則喚醒所有佇列中等待的執行緒 3. 如果減之前計數已經是0了,則什麼也不幹 ### getCount() 方法 ```java public long getCount() { // CDL 的API return sync.getCount(); } // 內部類 Sync int getCount() { return getState(); } // AQS 框架api protected final int getState() { return state; } ``` 返回當前的計數。 ## CountDownLatch 總結 1. 主要功能維護計數,當計數減為零後才放開所有等待的執行緒 2. CountDownLatch 沒有加計數的API,所以一個CountDownLatch不可以重複使用,如果要用可以重置計數的,可以使用CyclicBarrier。 3. CountDownLatch 也會有“死鎖”的現象,要避免計數永遠減不到0的情況 4. 如果初始化計數為0,那麼 CountDownLatch 則毫無作用,不如不用 5. 如果初始化計數為1,呼叫await時阻塞自己,別人countDown解鎖後,再喚醒自己(類似於在等一個資源,拿到資源在繼續進行) ### 和Semaphore的區別 Semaphore 可以用來限流,比如限制一個景區最多允許10000人同時在園內,只有當有人出園後,才允許其他人入園。 CountDownLatch 可以用來計數,比如導遊在出發點等待10名遊客一起出發,來一名遊客就畫個叉,直到10名遊客到齊後,才一起出發去