併發程式設計 — CountDownLatch 詳解
阿新 • • 發佈:2020-12-15
技術標籤:併發程式設計CountDownLatch併發程式設計多程序
一、概述
類 CountDownLatch 是一個同步功能的輔助類,使用效果是給定一個計數,當使用這個CountDownLatch類的執行緒判斷計數不為0時,則呈wait狀態,如果為0時則繼續執行。實現等待與繼續執行的效果分別需要使用await()和countDown()方法來進行。呼叫await()方法時判斷計數是否為0,如果不為0則呈等待狀態。其他執行緒可以呼叫countDown()方法將計數減1,當計數減到為0時,呈等待的執行緒繼續執行。而方法getCount()就是獲得當前的計數個數。
二、使用場景
1、某一執行緒在開始執行前等待n個執行緒執行完畢
比如實現一個機票比價場景,需要呼叫各個航空公司的機票價格,然後進行價格排序,就可以通過 CountDownLatch 來實現,程式碼如下所示:
public class CountDownLatchExample1 { public static void main(String[] args) { List<String> airs = Arrays.asList("東方航空", "南方航空", "成都航空", "北京航空"); //定義一個CountDownLatch物件,指定數量為航空公司的個數 CountDownLatch latch = new CountDownLatch(airs.size()); List<Pair<String, Integer>> list = new ArrayList<>(); //定義四個執行緒模擬呼叫 航空公司的外部介面 Thread[] threads = new Thread[airs.size()]; for(int i = 0; i < airs.size(); i++){ threads[i] = new Thread(() -> { try { // 模擬呼叫耗時 TimeUnit.SECONDS.sleep(current().nextInt(10)); //構建價格 Pair<String, Integer> pair = new Pair<>(Thread.currentThread().getName(), current().nextInt(100)); list.add(pair); System.out.println(pair.toString()); } catch (InterruptedException e) { e.printStackTrace(); }finally { latch.countDown(); //執行完後 是計數器減1 } }, airs.get(i)); } //啟動所有的執行緒 Stream.of(threads).forEach(Thread::start); System.out.println("等待所有的執行緒執行完"); try { latch.await(); // 在此等待 } catch (InterruptedException e) { e.printStackTrace(); } list.sort((o1, o2) -> { if(o1.getValue() > o2.getValue()){ return 1; }else if (o1.getValue().equals(o2.getValue())){ return 0; }else { return -1; } }); System.out.println("========最終結果======="); list.forEach(System.out::println); } }
執行結果:
2、實現多個執行緒開始執行任務的最大並行性
public class CountDownLatchExample2 { //裁判類 static class Referee{ private CountDownLatch downLatch = new CountDownLatch(1); private CountDownLatch latch; Referee(int count){ latch = new CountDownLatch(count); } // 各就位 void prepare() { try { System.out.printf("執行緒[%s]準備就緒 \n", Thread.currentThread().getName()); latch.countDown(); //運動員就位 downLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("執行緒[%s]結束 \n", Thread.currentThread().getName()); } //開始執行 public void start(){ try { latch.await(); //等待運動員就位 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("開始"); downLatch.countDown(); } } // 運動員類 static class SportsMan extends Thread{ private Referee referee; SportsMan(Referee referee) { this.referee = referee; } @Override public void run() { referee.prepare(); // 運動員準備 } } public static void main(String[] args) { //定義裁判指定10個運動員 Referee referee = new Referee(10); //定義 10 個運動員 SportsMan[] mans = new SportsMan[10]; for (int i = 0; i < 10; i++){ mans[i] = new SportsMan(referee); mans[i].setName("Thread-" + i); } Stream.of(mans).forEach(Thread::start); referee.start(); } }
執行結果:
三、原始碼解析
1、類結構
2、Sync 一個靜態內部類
//構造方法 CountDownLatch 的構造方法最終呼叫的是 Sync的構造。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count); //初始化計數器
}
// 獲取當前計數器
int getCount() {
return getState();
}
// 試圖在共享模式下獲取物件狀態
// 申請資源 如果 數量為 0,則不進行阻塞, 否則進入阻塞
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 試圖設定狀態來反映共享模式下的一個釋放
// 減少計數器數量,直到數量為0,
protected boolean tryReleaseShared(int releases) {
// 死迴圈保證最終這個狀態值能設定成功
for (;;) {
// 獲取當前狀態值,CountDownLatch中的鎖存器計數
int c = getState();
// 如果狀態為0,表示CountDownLatch中的鎖存器計數為0,就直接返回
if (c == 0)
return false;
// 如果狀態不為0,則將狀態減一
int nextc = c-1;
/*
* 更新state值,即鎖存器計數值,通過CAS保證執行緒安全
* CAS操作有3個運算元,記憶體值M,預期值E,新值U,如果M==E,則將記憶體值修改為B,否則啥都不做
* compareAndSetState(c, nextc)方法:
* c:表示預期值
* nextc:要更新的值
* 在此處,表示c==getState()時返回true,並更新state值為nextc
* 如果c!=getState()時,表示已經有其他執行緒更新了state值,
* 所以這裡不進行更新,直接返回false,通過死迴圈再重新獲取最新的getState()值
*/
if (compareAndSetState(c, nextc))
// 返回狀態是否為0判斷,為0表示鎖存器計數為0,可以喚醒await的程序了
// 這裡喚醒程序的操作也是通過AQS進行實現的
return nextc == 0;
}
}
}
從原始碼可知 CountDownLatch 內部是通過 AQS 的共享模式實現的
3、await() 和 await(long timeout, TimeUnit unit) 方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非超時或者被中斷。
4、countDown() 方法
此函式將遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒
/**
* count值減 1,直到計數達到零,釋放所有等待的執行緒 。
*
* <p>如果當前計數大於零,則遞減。
* 如果新計數為零,則重新啟用所有等待的執行緒 ,達到執行緒排程的目的。
*
* <p>如果當前計數等於零,則沒有任何反應。
*/
public void countDown() {
sync.releaseShared(1);
}
注意:
- 初始化CountDownLatch 時的計數器必須大於0,只有當計數器等於 0 的時候,呼叫 await() 方法是不會阻塞。
- 任務的結束並不一定代表著正常的結束,有可能是在運算的過程中出現錯誤,因此為了能夠正確地執行countDown(),需要將該方法的呼叫放在finally程式碼塊中,否則就會出現主執行緒(任務)await()方法永遠不會退出阻塞的問題。