一篇關於CountDownLatch的好文章
CountDownLatch簡介
CountDownLatch是一種java.util.concurrent包下一個同步工具類,它允許一個或多個線程等待直到在其他線程操作執行完成。
使用場景:
在開發過程中,經常會遇到需要在主線程中開啟多條線程去並行執行任務,並且主線程需要等待所有子線程執行完畢後再進行匯總的場景,
CountDownLatch的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操作計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減為0,
它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。
CountDownLatch原理
CountDownLatch底層依靠的是AQS,通過構造函數初始化計數器時,實際上是
把計數器的值賦值給了AQS的state,也就是這裏AQS的狀態值來表示計數器值。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
await方法
void await()方法,當前線程調用了CountDownLatch對象的await方法後,當前線程會被阻塞,直到出現下面情況之一時才會返回:
- 當所有線程都調用了CountDownLatch對象的countDown方法後,也就是說計時器值為 0 的時候。
- 其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常後返回。接下來讓我們看看await()方法內部是如何調用
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //AQS的獲取共享資源時候可被中斷的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException { //如果線程被中斷則拋異常 if (Thread.interrupted()) throw new InterruptedException(); //嘗試看當前是否計數值為0,為0則直接返回,否者進入AQS的隊列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //sync類實現的AQS的接口 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
從上面代碼可以看到await()方法委托sync調用了AQS的acquireSharedInterruptibly方法,該方法的特點是線程獲取資源的時候可以被中斷,並且獲取到的資源是共享資源,這裏為什麽要調用AQS的這個方法,而不是調用獨占鎖的accquireInterruptibly方法呢?這是因為這裏狀態值需要的並不是非 0 即 1 的效果,而是和初始化時候指定的計數器值有關系,比如你初始化的時候計數器值為 8 ,那麽state的值應該就有 0 到 8 的狀態,而不是只有 0 和 1 的獨占效果。
這裏await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這裏計數器值是資源總數,也就是意味著是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷如果當前線程被中斷了則拋出異常,否則調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否為 0 ,是則當前線程的await()方法直接返回,否則調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是不是為 0 ,並沒有調用CAS讓當前狀態值減去 1 。
boolean await(long timeout, TimeUnit unit)
當線程調用了 CountDownLatch 對象的該方法後,當前線程會被阻塞,直到出現下面情況之一時才會返回:
- 當所有線程都調用了 CountDownLatch 對象的 countDown 方法後,也就是計時器值為 0 的時候,這時候返回 true.
- 設置的 timeout 時間到了,因為超時而返回 false.
- 其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常後返回。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
countDown方法
void countDown() 當前線程調用了該方法後,會遞減計數器的值,遞減後如果計數器為 0 則會喚醒所有調用await 方法而被阻塞的線程,否則什麽都不做。
public void countDown() {
//委托sync調用AQS的方法
sync.releaseShared(1);
}
//AQS的方法
public final boolean releaseShared(int arg) {
//調用sync實現的tryReleaseShared
if (tryReleaseShared(arg)) {
//AQS的釋放資源方法
doReleaseShared();
return true;
}
return false;
}
如上面代碼可以知道CountDownLatch的countDown()方法是委托sync調用了AQS的releaseShared方法,後者調用了sync 實現的AQS的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
//循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一並更新到state
for (;;) {
int c = getState();
//如果當前狀態值為0則直接返回(1)
if (c == 0)
return false;
//CAS設置計數值減一(2)
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如上代碼可以看到首先獲取當前狀態值(計數器值),代碼(1)如果當前狀態值為 0 則直接返回 false ,則countDown()方法直接返回;否則執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,否則如果當前計數器為 0 則返回 true 。返回 true 後,說明當前線程是最後一個調用countDown()方法的線程,那麽該線程除了讓計數器減一外,還需要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這裏的代碼(1)貌似是多余的,其實不然,之所以添加代碼 (1) 是為了防止計數器值為 0 後,其他線程又調用了countDown方法,如果沒有代碼(1),狀態值就會變成負數。
getCount()方法
long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值。
public long getCount() {
return sync.getCount();
}
int getCount() {
return getState();
}
如上代碼可知內部還是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。
使用方法(案例)
public class CountDownLatchTest {
private static AtomicInteger id = new AtomicInteger();
// 創建一個CountDownLatch實例,管理計數為ThreadNum
private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
countDownLatch.countDown();
}
});
Thread threadTwo = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
countDownLatch.countDown();
}
});
Thread threadThree = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
countDownLatch.countDown();
}
});
// 啟動子線程
threadOne.start();
threadTwo.start();
threadThree.start();
System.out.println("等待鬥地主玩家進場");
// 等待子線程執行完畢,返回
countDownLatch.await();
System.out.println("鬥地主玩家已經滿人,開始發牌.....");
}
}
OutPut:
等待鬥地主玩家進場
【玩家0】已入場
【玩家1】已入場
【玩家2】已入場
鬥地主玩家已經滿人,開始發牌.....
如上代碼,創建了一個 CountDownLatch 實例,因為有三個子線程所以構造函數參數傳遞為 3,主線程調用 countDownLatch.await()方法後會被阻塞。子線程執行完畢後調用countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等所有子線程執行完畢調用 countDown()後計數器會變為 0,這時候主線程的 await()才會返回。
CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法後,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器允許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 可以在子線程運行任何時候讓 await 方法返回而不一定必須等到線程結束;另外使用線程池來管理線程時候一般都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓我們對線程同步有更靈活的控制。
轉自: https://www.omgleoo.top/%E4%B8%80%E7%AF%87%E5%85%B3%E4%BA%8Ecountdownlatch%E7%9A%84%E5%A5%BD%E6%96%87%E7%AB%A0/
一篇關於CountDownLatch的好文章