併發工具類簡介
併發工具類
-
CountDownLatch:閉鎖,也叫執行緒遞減鎖。對執行緒進行計數,在計數歸零之前執行緒會陷入阻塞;直到計數歸零為止,才會放開阻塞。
用給定的計數初始化 CountDownLatch。由於呼叫了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await 的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
- 內部採用共享鎖來實現
import java.util.concurrent.CountDownLatch; /** * 模擬:考試 * 需求:考官(執行緒A1,A2)和考生(執行緒B1、B2、B3、B4) * 都到考場後才能開始考試 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch cdl = new CountDownLatch(6); new Thread(new Teacher(cdl)).start(); new Thread(new Teacher(cdl)).start(); new Thread(new Student(cdl) ).start(); new Thread(new Student(cdl)).start(); new Thread(new Student(cdl)).start(); new Thread(new Student(cdl)).start(); //計數減為1時自然喚醒 cdl.await(); System.out.println("開始考試"); } } class Teacher implements Runnable{ private CountDownLatch cdl; public Teacher(CountDownLatch cdl) { this.cdl = cdl; } @Override public void run() { try{ //放慢速度 Thread.sleep((long)(Math.random()*10000)); cdl.countDown(); System.out.println("考官到了"); }catch (Exception e){ e.printStackTrace(); } } } class Student implements Runnable{ private CountDownLatch cdl; public Student(CountDownLatch cdl) { this.cdl = cdl; } @Override public void run() { try{ Thread.sleep((long)(Math.random()*10000)); cdl.countDown(); System.out.println("考生到了"); }catch (Exception e){ e.printStackTrace(); } } }
-
CyclicBarrier:柵欄。對執行緒進行計數,在計數歸零之前執行緒會陷入阻塞;直到計數歸零為止,才會放開阻塞。一組執行緒到達同一個點後再分別繼續執行。(與閉鎖相比,並沒有結束執行緒)
它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。通俗講:讓一組執行緒到達一個屏障時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。一組執行緒結束後開啟另一組執行緒。
- 底層採用ReentrantLock + Condition實現
- 應用場景:多執行緒結果合併的操作,用於多執行緒計算資料,最後合併計算結果的應用場景
import java.util.concurrent.CyclicBarrier; /** * 所有運動員跑到起跑線之後聽到命令才能跑出去 */ public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(5); new Thread(new Runner(cb),"1號").start(); new Thread(new Runner(cb),"2號").start(); new Thread(new Runner(cb),"3號").start(); new Thread(new Runner(cb),"4號").start(); new Thread(new Runner(cb),"5號").start(); } } class Runner implements Runnable{ private CyclicBarrier cb; public Runner(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try{ //模擬運動員走到起跑線的時間 Thread.sleep((long)(Math.random()*10000)); String name = Thread.currentThread().getName(); System.out.println(name+"運動員到了起跑線"); /** * 先到的運動員應該阻塞, * 直到所有的運動員都到了起跑線才能往外跑。 * 當計數歸零的時候,自然甦醒 */ cb.await(); System.out.println(name+"運動員跑了出去"); }catch (Exception e){ e.printStackTrace(); } } }
-
Exchanger:交換機。用於交換兩個執行緒之間的資訊。
具體來說,Exchanger類允許在兩個執行緒之間定義同步點。當兩個執行緒都到達同步點時,他們交換資料結構,因此第一個執行緒的資料結構進入到第二個執行緒中,第二個執行緒的資料結構進入到第一個執行緒中
import java.util.concurrent.Exchanger; public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> stringExchanger = new Exchanger<>(); new Thread(new Producer(stringExchanger)).start(); new Thread(new Consumer(stringExchanger)).start(); } } class Producer implements Runnable{ private Exchanger<String> ex; public Producer(Exchanger<String> ex) { this.ex = ex; } @Override public void run() { String info="商品"; try { String msg = ex.exchange(info); System.out.println("生產者收到消費者的:"+msg); } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable{ private Exchanger<String> ex; public Consumer(Exchanger<String> ex) { this.ex = ex; } @Override public void run() { String info="錢"; try { // 消費者將錢給生產者,應該收到生產者換過來的商品 String exchange = ex.exchange(info); System.out.println("消費者收到生產者的:"+exchange); } catch (InterruptedException e) { e.printStackTrace(); } } }
-
Semaphore:訊號量。執行緒在獲取訊號之後執行程式碼,而在訊號被全部佔用之後,後來的執行緒需要阻塞,直到前面的執行緒釋放訊號,阻塞的執行緒才能獲取訊號執行邏輯。實際開發中,常用於限流。
【作用】限制某段程式碼塊的併發數。Semaphore有一個建構函式,可以傳入一個int型整數n,表示某段程式碼最多隻有n個執行緒可以訪問,如果超出了n,那麼請等待,等到某個執行緒執行完畢這段程式碼塊,下一個執行緒再進入。由此可以看出如果Semaphore建構函式中傳入的int型整數n=1,相當於變成了一個synchronized了。
從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
訊號量Semaphore是一個非負整數(>=1)。當一個執行緒想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源並使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他執行緒全部佔用,執行緒必須要等待其他執行緒釋放資源。當執行緒釋放資源時,Semaphore則+1
- 內部採用共享鎖實現
- 應用場景:通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目
import java.util.concurrent.Semaphore; public class SemaphoreDemo { public static void main(String[] args) { Semaphore s = new Semaphore(5); for(int i=0;i<7;i++){ new Thread(new Easter(s)).start(); } } } //用餐的人 class Easter implements Runnable{ private Semaphore s; public Easter(Semaphore s) { this.s = s; } /** * 桌子的數量是有限的 * 如果桌子被全部佔用,後來的客人就需要等待 * 桌子相當於訊號,只要有訊號,就可以使用 */ @Override public void run() { try{ s.acquire(); System.out.println("來了一波客人,佔用了一張桌子~~~"); //模擬吃飯的時間 Thread.sleep((long) (Math.random() * 10000)); System.out.println("客人買單離開,空出一張桌子~~~"); // 釋放1個訊號,被阻塞的執行緒就可以獲取訊號執行程式碼 s.release(); }catch (Exception e){ e.printStackTrace(); } } }
CountDownLatch與CyclicBarrier區別
- CountDownLatch的作用是允許1或N個執行緒等待其他執行緒完成執行;而CyclicBarrier則是允許N個執行緒相互等待。
- CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,因此它被稱為是迴圈的barrier。
- CyclicBarrier只能喚起一個任務,CountDownLatch可以喚起多個任務。