【JUC】JDK1.8原始碼分析之CountDownLatch
轉自 https://www.cnblogs.com/leesf456/p/5406191.html
一、前言
分析完了CyclicBarrier後,下面分析CountDownLatch,CountDownLatch用於同步一個或多個任務,強制他們等待由其他任務執行的一組操作完成。CountDownLatch典型的用法是將一個程式分為n個互相獨立的可解決任務,並建立值為n的CountDownLatch。當每一個任務完成時,都會在這個鎖存器上呼叫countDown,等待問題被解決的任務呼叫這個鎖存器的await,將他們自己攔住,直至鎖存器計數結束。下面開始分析原始碼。
二、CountDownLatch資料結構
從原始碼可知,其底層是由AQS提供支援,所以其資料結構可以參考AQS的資料結構,而AQS的資料結構核心就是兩個虛擬佇列:同步佇列sync queue 和條件佇列condition queue,不同的條件會有不同的條件佇列。讀者可以參考之前介紹的AQS。
三、CountDownLatch原始碼分析
3.1 類的繼承關係
public class CountDownLatch {}
說明:可以看到CountDownLatch沒有顯示繼承哪個父類或者實現哪個父介面,根據Java語言規定,可知其父類是Object。
3.2 類的內部類
CountDownLatch類存在一個內部類Sync,繼承自AbstractQueuedSynchronizer,其原始碼如下。
private static final class Sync extends AbstractQueuedSynchronizer { // 版本號 private static final long serialVersionUID = 4982264981922014374L; // 構造器 Sync(int count) { setState(count); } // 返回當前計數 int getCount() { return getState(); } // 試圖在共享模式下獲取物件狀態 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 試圖設定狀態來反映共享模式下的一個釋放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 無限迴圈 for (;;) { // 獲取狀態 int c = getState(); if (c == 0) // 沒有被執行緒佔有 return false; // 下一個狀態 int nextc = c-1; if (compareAndSetState(c, nextc)) // 比較並且設定成功 return nextc == 0; } } }
說明:對CountDownLatch方法的呼叫會轉發到對Sync或AQS的方法的呼叫,所以,AQS對CountDownLatch提供支援。
3.3 類的屬性
public class CountDownLatch {
// 同步佇列
private final Sync sync;
}
說明:可以看到CountDownLatch類的內部只有一個Sync型別的屬性,這個屬性相當重要,後面會進行分析。
3.4 類的建構函式
1. CountDownLatch(int) 型建構函式
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化狀態數
this.sync = new Sync(count);
}
說明:該建構函式可以構造一個用給定計數初始化的CountDownLatch,並且建構函式內完成了sync的初始化,並設定了狀態數。
3.5 核心函式分析
1. await函式
此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。其原始碼如下
public void await() throws InterruptedException {
// 轉發到sync物件上
sync.acquireSharedInterruptibly(1);
}
說明:由原始碼可知,對CountDownLatch物件的await的呼叫會轉發為對Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的呼叫,acquireSharedInterruptibly原始碼如下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
說明:從原始碼中可知,acquireSharedInterruptibly又呼叫了CountDownLatch的內部類Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函式。tryAcquireShared函式的原始碼如下
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
說明:該函式只是簡單的判斷AQS的state是否為0,為0則返回1,不為0則返回-1。doAcquireSharedInterruptibly函式的原始碼如下
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 新增節點至等待佇列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { // 無限迴圈
// 獲取node的前驅節點
final Node p = node.predecessor();
if (p == head) { // 前驅節點為頭結點
// 試圖在共享模式下獲取物件狀態
int r = tryAcquireShared(arg);
if (r >= 0) { // 獲取成功
// 設定頭結點並進行繁殖
setHeadAndPropagate(node, r);
// 設定節點next域
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 在獲取失敗後是否需要禁止執行緒並且進行中斷檢查
// 丟擲異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
說明:在AQS的doAcquireSharedInterruptibly中可能會再次呼叫CountDownLatch的內部類Sync的tryAcquireShared方法和AQS的setHeadAndPropagate方法。setHeadAndPropagate方法原始碼如下。
private void setHeadAndPropagate(Node node, int propagate) {
// 獲取頭結點
Node h = head; // Record old head for check below
// 設定頭結點
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 進行判斷
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取節點的後繼
Node s = node.next;
if (s == null || s.isShared()) // 後繼為空或者為共享模式
// 以共享模式進行釋放
doReleaseShared();
}
}
說明:該方法設定頭結點並且釋放頭結點後面的滿足條件的結點,該方法中可能會呼叫到AQS的doReleaseShared方法,其原始碼如下。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 無限迴圈
for (;;) {
// 儲存頭結點
Node h = head;
if (h != null && h != tail) { // 頭結點不為空並且頭結點不為尾結點
// 獲取頭結點的等待狀態
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 狀態為SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續
continue; // loop to recheck cases
// 釋放後繼結點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態為0並且不成功,繼續
continue; // loop on failed CAS
}
if (h == head) // 若頭結點改變,繼續迴圈
break;
}
}
說明:該方法在共享模式下釋放,具體的流程再之後會通過一個示例給出。
所以,對CountDownLatch的await呼叫大致會有如下的呼叫鏈。
說明:上圖給出了可能會呼叫到的主要方法,並非一定會呼叫到,之後,會通過一個示例給出詳細的分析。
2. countDown函式
此函式將遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒
public void countDown() {
sync.releaseShared(1);
}
說明:對countDown的呼叫轉換為對Sync物件的releaseShared(從AQS繼承而來)方法的呼叫。releaseShared原始碼如下
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
說明:此函式會以共享模式釋放物件,並且在函式中會呼叫到CountDownLatch的tryReleaseShared函式,並且可能會呼叫AQS的doReleaseShared函式,其中,tryReleaseShared原始碼如下
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 無限迴圈
for (;;) {
// 獲取狀態
int c = getState();
if (c == 0) // 沒有被執行緒佔有
return false;
// 下一個狀態
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比較並且設定成功
return nextc == 0;
}
}
說明:此函式會試圖設定狀態來反映共享模式下的一個釋放。具體的流程在下面的示例中會進行分析。AQS的doReleaseShared的原始碼如下
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 無限迴圈
for (;;) {
// 儲存頭結點
Node h = head;
if (h != null && h != tail) { // 頭結點不為空並且頭結點不為尾結點
// 獲取頭結點的等待狀態
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 狀態為SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就繼續
continue; // loop to recheck cases
// 釋放後繼結點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 狀態為0並且不成功,繼續
continue; // loop on failed CAS
}
if (h == head) // 若頭結點改變,繼續迴圈
break;
}
}
說明:此函式在共享模式下釋放資源。
所以,對CountDownLatch的countDown呼叫大致會有如下的呼叫鏈。
說明:上圖給出了可能會呼叫到的主要方法,並非一定會呼叫到,之後,會通過一個示例給出詳細的分析。
四、示例
下面給出了一個使用CountDownLatch的示例。
package com.hust.grid.leesf.cyclicbarrier;
import java.util.concurrent.CountDownLatch;
class MyThread extends Thread {
private CountDownLatch countDownLatch;
public MyThread(String name, CountDownLatch countDownLatch) {
super(name);
this.countDownLatch = countDownLatch;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " doing something");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finish");
countDownLatch.countDown();
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyThread t1 = new MyThread("t1", countDownLatch);
MyThread t2 = new MyThread("t2", countDownLatch);
t1.start();
t2.start();
System.out.println("Waiting for t1 thread and t2 thread to finish");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " continue");
}
}
執行結果(某一次):
Waiting for t1 thread and t2 thread to finish
t1 doing something
t2 doing something
t1 finish
t2 finish
main continue
說明:本程式首先計數器初始化為2。根據結果,可能會存在如下的一種時序圖。
說明:首先main執行緒會呼叫await操作,此時main執行緒會被阻塞,等待被喚醒,之後t1執行緒執行了countDown操作,最後,t2執行緒執行了countDown操作,此時main執行緒就被喚醒了,可以繼續執行。下面,進行詳細分析。
① main執行緒執行countDownLatch.await操作,主要呼叫的函式如下。
說明:在最後,main執行緒就被park了,即禁止運行了。此時Sync queue(同步佇列)中有兩個節點,AQS的state為2,包含main執行緒的結點的nextWaiter指向SHARED結點。
② t1執行緒執行countDownLatch.countDown操作,主要呼叫的函式如下。
說明:此時,Sync queue佇列裡的結點個數未發生變化,但是此時,AQS的state已經變為1了。
③ t2執行緒執行countDownLatch.countDown操作,主要呼叫的函式如下。
說明:經過呼叫後,AQS的state為0,並且此時,main執行緒會被unpark,可以繼續執行。當main執行緒獲取cpu資源後,繼續執行。
④ main執行緒獲取cpu資源,繼續執行,由於main執行緒是在parkAndCheckInterrupt函式中被禁止的,所以此時,繼續在parkAndCheckInterrupt函式執行。
說明:main執行緒恢復,繼續在parkAndCheckInterrupt函式中執行,之後又會回到最終達到的狀態為AQS的state為0,並且head與tail指向同一個結點,該節點的額nextWaiter域還是指向SHARED結點。
五、總結
經過分析CountDownLatch的原始碼可知,其底層結構仍然是AQS,對其執行緒所封裝的結點是採用共享模式,而ReentrantLock是採用獨佔模式。由於採用的共享模式,所以會導致後面的操作會有所差異,通過閱讀原始碼就會很容易掌握CountDownLatch實現機制。
作者:leesf 掌控之中,才會成功;掌控之外,註定失敗。
出處:http://www.cnblogs.com/leesf456/
本文版權歸作者和部落格園共有