高併發第十單:J.U.C AQS 元件:CountDownLatch. CyclicBarrier .Semaphore
AQS全名:AbstractQueuedSynchronizer,是併發容器J.U.C(java.lang.concurrent)下locks包內的一個類。它實現了一個FIFO(FirstIn、FisrtOut先進先出)的佇列。底層實現的資料結構是一個雙向列表。
Sync queue:同步佇列,是一個雙向列表。包括head節點和tail節點。head節點主要用作後續的排程。
Condition queue:非必須,單向列表。當程式中存在cindition的時候才會存在此列表。
AQS全名:AbstractQueuedSynchronizer,是併發容器J.U.C(java.lang.concurrent)下locks包內的一個類。它實現了一個FIFO(FirstIn、FisrtOut先進先出)的佇列。底層實現的資料結構是一個雙向列表。
它維護了一個volatile int state(代表共享資源)和一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列)。這裡volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:
- getState()
- setState()
- compareAndSetState()
這裡就不詳細去說AQS了.因為開頭的那個文章已經說得很清楚了.介紹以下他的實現類吧
1.CountDownLatch(計數器)
CountDownLatch是在java1.5被引入的,它都存在於java.util.concurrent包下。CountDownLatch這個類能夠使一個執行緒等待其他執行緒完成各自的工作後再執行。例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有的框架服務之後再執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。
主要在實時系統中的使用場景
- 實現最大的並行性:有時我們想同時啟動多個執行緒,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,並讓所有執行緒都在這個鎖上等待,那麼我們可以很輕鬆地完成測試。我們只需呼叫 一次countDown()方法就可以讓所有的等待執行緒同時恢復執行。
- 開始執行前等待n個執行緒完成各自任務:例如應用程式啟動類要確保在處理使用者請求前,所有N個外部系統已經啟動和運行了。
- 死鎖檢測:一個非常方便的使用場景是,你可以使用n個執行緒訪問共享資源,在每次測試階段的執行緒數目是不同的,並嘗試產生死鎖。
- 有一個任務想要往下執行,但必須要等到其他的任務執行完畢後才可以繼續往下執行 (最常用的)
構造方法:
構造一個用給定計數初始化的 CountDownLatch(int count)
普通方法:
void await() 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。
boolean await(long timeout, TimeUnit unit) 可以設定等待的時間,如果超過此時間,計數器還未清零,則不繼續等待
void countDown() 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒
long getCount() 返回當前計數
看個例子
public class CountDownLatchDemo { private static final int THREAD_COUNT_NUM = 6; private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 6; i++) { int index = i; new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!" + Thread.currentThread().getName()); // 模擬完成一個任務,隨機模擬不同的尋找時間 Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } // 每完成一個任務,需要等待的任務數減1 // countDownLatch.countDown(); }, "我是執行緒:" + i + ":").start(); ; } // 等待檢查,即上述7個執行緒執行完畢之後,執行await後邊的程式碼 // countDownLatch.await(); System.out.println("所有任務完成!" + System.currentTimeMillis()); } }結果:
所有任務完成!1537617901498我是執行緒:3:第3個任務完成!我是執行緒:3:我是執行緒:0:第0個任務完成!我是執行緒:0:我是執行緒:1:第1個任務完成!我是執行緒:1:我是執行緒:2:第2個任務完成!我是執行緒:2:我是執行緒:4:第4個任務完成!我是執行緒:4:我是執行緒:5:第5個任務完成!我是執行緒:5:
我是執行緒:2:完成:1537617904499我是執行緒:1:完成:1537617904499我是執行緒:0:完成:1537617904499我是執行緒:3:完成:1537617904499我是執行緒:4:完成:1537617904499我是執行緒:5:完成:1537617904499
加上:
public class CountDownLatchDemo { private static final int THREAD_COUNT_NUM = 6; private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 6; i++) { int index = i; new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!" + Thread.currentThread().getName()); // 模擬完成第i個任務, Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } // 每完成一個任務,需要等待的任務數減1 countDownLatch.countDown(); }, "我是執行緒:" + i + ":").start(); ; } // 等待檢查,即上述7個執行緒執行完畢之後,執行await後邊的程式碼 countDownLatch.await(); System.out.println("所有任務完成!" + System.currentTimeMillis()); } }結果:
我是執行緒:2:第2個任務完成!我是執行緒:2:我是執行緒:1:第1個任務完成!我是執行緒:1:我是執行緒:0:第0個任務完成!我是執行緒:0:我是執行緒:3:第3個任務完成!我是執行緒:3:我是執行緒:4:第4個任務完成!我是執行緒:4:我是執行緒:5:第5個任務完成!我是執行緒:5:我是執行緒:1:完成:1537617977385我是執行緒:3:完成:1537617977385我是執行緒:4:完成:1537617977385我是執行緒:0:完成:1537617977385我是執行緒:2:完成:1537617977385我是執行緒:5:完成:1537617977387所有任務完成!1537617977388
順序有可能發生變化.但是 所有任務完成!1537617977388 時間肯定在他們之後,速度快最多一樣.肯定不會比他們小
這裡有個例子 執行緒數 大於 鎖住數時 會發生什麼呢.
public class CountDownLatchDemo { private static final int THREAD_COUNT_NUM = 8; private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 100; i++) { int index = i; new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!" + Thread.currentThread().getName()); // 模擬收集第i個龍珠,隨機模擬不同的尋找時間 Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } // 每收集到一顆龍珠,需要等待的顆數減1 countDownLatch.countDown(); }, "我是執行緒:" + i + ":").start(); ; } // 等待檢查,即上述7個執行緒執行完畢之後,執行await後邊的程式碼 countDownLatch.await(); System.out.println("所有任務完成!" + System.currentTimeMillis()); } } 結果 ............... 所有任務完成!1537618986091 ............ 我是執行緒:39:完成:1537618986097 我是執行緒:37:完成:1537618986097 我是執行緒:38:完成:1537618986097 我是執行緒:35:完成:1537618986096View Code
所以更加證明了.
CountDownLatch(THREAD_COUNT_NUM); 最多鎖住 THREAD_COUNT_NUM 個的執行緒,其他的執行緒就按原來的順序運行了
這個就直接證明了 在await()處,讓所有的任務完成了 才能繼續主執行緒
優點:
CountDownLatch的優點毋庸置疑,對使用者而言,你只需要傳入一個int型變數控制任務數量即可,至於同步佇列的出隊入隊維護,state變數值的維護對使用者都是透明的,使用方便。
缺點:
CountDownLatch設定了state後就不能更改,也不能迴圈使用。
2.CyclicBarrier
既然說了 CountDownLatch設定了state後就不能更改,也不能迴圈使用。那就來個可以迴圈使用的
舉個例子:有四個遊戲玩家玩遊戲,遊戲有三個關卡,每個關卡必須要所有玩家都到達後才能允許通過。其實這個場景裡的玩家中如果有玩家A先到了關卡1,他必須等到其他所有玩家都到達關卡1時才能通過,也就是說執行緒之間需要相互等待。這和CountDownLatch的應用場景有區別,CountDownLatch裡的執行緒是到了執行的目標後繼續幹自己的其他事情,而這裡的執行緒需要等待其他執行緒後才能繼續完成下面的工作。
案例一: 一起等待
public class CyclicBarrierDemo { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) { for (int i = 0; i < 10; i++) { int index = i; new Thread(() -> { try { Thread.sleep(1000); System.out.println(String.format("我是第%s啟動了", index)); barrier.await(); System.out.println(String.format("我是第%s完成了", index)); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, "我是第" + index + "個執行緒:").start(); } } } 結果: 我是第6啟動了 ....... 我是第0啟動了 ....... 我是第4完成了了
全部啟動,然後一起等待,再繼續完成任務
//案例二 最多等待時間
private static void test2() { for (int i = 0; i < 10; i++) { int index = i; new Thread(() -> { try { System.out.println(String.format("我是第%s啟動了", index)); // 最多阻塞時間 barrier.await(2000, TimeUnit.MILLISECONDS); System.out.println(String.format("我是第%s完成了", index)); } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { e.printStackTrace(); barrier.reset(); } }).start(); } }
// 還有一個額外的方法是 構造是可以多構造一個Runnable,在計數器的值到達設定值後(但在釋放所有執行緒之前),該Runnable執行一次,注,Runnable在每個屏障點只執行一個
private static CyclicBarrier barrier = new CyclicBarrier(1,()->{ System.out.println("優先執行我"); }); for (int i = 0; i < 2; i++) { int index = i; new Thread(() -> { try { Thread.sleep(1000); System.out.println(String.format("我是第%s啟動了", index)); barrier.await(); System.out.println(String.format("我是第%s完成了", index)); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, "我是第" + index + "個執行緒:").start(); } 結果是: 我是第1啟動了 我是第0啟動了 優先執行自己 優先執行自己 我是第1完成了 我是第0完成了
CyclicBarrier 和 CountDownLatch的比較:
- CountDownLatch: 一個執行緒(或者多個), 等待另外N個執行緒完成某個事情之後才能執行。--> 反正 你執行完 就ok.不能隨意放開
- CyclicBarrier: N個執行緒相互等待,任何一個執行緒完成之前,所有的執行緒都必須等待。--> 可以到到某個條件.我放開就行了
- CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算髮生錯誤,可以重置計數器,並讓執行緒們重新執行一次。
- CountDownLatch:減計數方式,CyclicBarrier:加計數方式
3. Semaphore
訊號量(Semaphore),有時被稱為訊號燈,是在多執行緒環境下使用的一種設施, 它負責協調各個執行緒, 以保證它們能夠正確、合理的使用公共資源。
比喻:
Semaphore是一件可以容納N人的房間,如果人不滿就可以進去,如果人滿了,就要等待有人出來。對於N=1的情況,稱為binary semaphore。一般的用法是,用於限制對於某一資源的同時訪問
官方一點就是:
用於保證同一時間併發訪問執行緒的數目。
訊號量在作業系統中是很重要的概念,Java併發庫裡的Semaphore就可以很輕鬆的完成類似作業系統訊號量的控制。
Semaphore可以很容易控制系統中某個資源被同時訪問的執行緒個數。 在資料結構中我們學過連結串列,連結串列正常是可以儲存無限個節點的,而Semaphore可以實現有限大小的列表。
使用場景:僅能提供有限訪問的資源。比如資料庫連線
上例子:
// 方式 一 直接獲取
// 給出10個資源 ,最多保證10個併發 private static final Semaphore SEMAPHORE = new Semaphore(10); ....... for (int i = 0; i < 100; i++) { int index = i; new Thread(() -> { try { SEMAPHORE.acquire();// 獲取一個許可 System.out.println(String.format("我是執行緒:%s", index));// 需要併發控制的內容 Thread.sleep(3000); SEMAPHORE.release(); // 釋放一個許可 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } //結果: 很明顯的能看到 10個10的執行
// 方式 二 嘗試獲取許可,獲取不到不執行 很多時候相當於只執行設定的併發量一次
private static final Semaphore SEMAPHORE = new Semaphore(10); for (int i = 0; i < 100; i++) { int index = i; new Thread(() -> { try { // 嘗試獲取許可,獲取不到不執行 if(SEMAPHORE.tryAcquire() { System.out.println(String.format("我是執行緒:%s", index));// 需要併發控制的內容 Thread.sleep(3000); SEMAPHORE.release(); // 釋放一個許可 } } catch (InterruptedException e) { e.printStackTrace(); } }).start();
// 當時三 有個最長申請時間
private static final Semaphore SEMAPHORE = new Semaphore(10); for (int i = 0; i < 100; i++) { int index = i; new Thread(() -> { try { // 嘗試獲取許可,獲取不到不執行 最長申請時間 if(SEMAPHORE.tryAcquire(5000,TimeUnit.MILLISECONDS)) { System.out.println(String.format("我是執行緒:%s", index));// 需要併發控制的內容 Thread.sleep(3000); SEMAPHORE.release(); // 釋放一個許可 } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
注意:
1 . 其中 構造方法可以加公平鎖 :private static final Semaphore SEMAPHORE = new Semaphore(100,true);
2. SEMAPHORE.tryAcquire() => 可以增加獲取條件量 SEMAPHORE.tryAcquire(10);釋放 SEMAPHORE.release(10);