CountDownLatch 閉鎖、Semaphore訊號量、Barrier柵欄
同步工具類可以是任何一個物件。阻塞佇列可以作為同步工具類,其他型別的同步工具類還包括訊號量(Semaphore)、柵欄(Barrier)、以及閉鎖(Latch)。
所有的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的執行緒是繼續執行還是等待,此外還提供了一些方法對狀態進行操作,以及另一些方法用於高效地等待同步工具類進入到預期狀態。
1.閉鎖
閉鎖是一種同步工具類,可以延遲執行緒進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,當到達結束狀態時允許所有的執行緒通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠開啟。閉鎖可以用來確保某些活動直到其他活動都完成才繼續執行。
CountDownLatch是一種靈活的閉鎖實現,它可以使一個或多個執行緒等待一組執行緒。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown遞減計數器,表示一個事件已經發生,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麼await會一直阻塞直到計數器為零,或者等待中的執行緒中斷,或者等待超時。
檢視原始碼發現:我們傳進去的引數相當於內部Sync的狀態,每次呼叫countDown的時候將狀態值減一,狀態值為0表示結束狀態(await會解除阻塞)
public CountDownLatch(intcount) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void countDown() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
檢視sync的原始碼:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } ... }
例如:實現一個統計多個執行緒併發執行任務的用時功能:
當執行緒執行run中程式碼的時候會阻塞到startLatch.await(); 直到主執行緒呼叫startLatch.countDown(); 將計數器減一。這時所有執行緒開始執行任務。
當執行緒執行完的時候endLatch.countDown();將結束必鎖的計數器減一,此時主執行緒阻塞在endLatch.await();,直到5個執行緒都執行完主執行緒也解除阻塞。
package cn.qlq.thread.tone; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo4 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class); public static void main(String[] args) throws InterruptedException { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch endLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { Thread.sleep(1 * 1000); new Thread(new Runnable() { @Override public void run() { try { startLatch.await();// 起始閉鎖的計數器阻塞等到計數器減到零(標記第一個執行緒開始執行) Thread.sleep(1 * 1000); endLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } // 實現計時 long startTime = System.nanoTime(); startLatch.countDown();// 將起始閉鎖的計數器減一 endLatch.await();// 結束閉鎖阻塞直到計數器為零 long endTime = System.nanoTime(); LOGGER.error("結束,用時{}", endTime - startTime); } }
2.Semaphore 訊號量
計數訊號量(counting Semaphore)用來控制同時訪問某個資源的數量,或者同時執行某個操作的數量。計數訊號量還可以實現某種資源池,或者對容器實施邊界。
訊號量是1個的Semaphore意味著只能被1個執行緒佔用,可以用來設計同步(相當於互斥鎖)。訊號量大於1的Semaphore可以用來設計控制併發數,或者設計有界容器。
Semaphore中管理者一組虛擬的許可(permit),許可的初始數量可由建構函式指定。在執行操作時首先獲得許可(只要還有剩餘的許可),並在使用後釋放。如果沒有許可,那麼acquire將會一直阻塞直到有許可(或者直到中斷或者操作超時)。release方法將返回一個許可給訊號量。計算訊號量的一種簡化形式是二值訊號量,即初始值為1的Semaphore。二值訊號量可以用作互斥體(mutex),並具備不可重入的加鎖語義:誰擁有了這個唯一的許可誰就擁有了互斥鎖。
例如:例如訊號量構造一個有界阻塞容器:
訊號量的計數值初始化為容器的最大值。add操作在向底層容器新增一個元素之前,首先要獲取一個許可。如果add沒有新增任何元素,那麼會立刻釋放訊號量。同樣,remove操作釋放一個許可,使更多的元素能加到容器中。
class BoundedHashSet<T> { private Set<T> set; private Semaphore semaphore; public BoundedHashSet(int bound) { set = Collections.synchronizedSet(new HashSet()); semaphore = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { semaphore.acquire();// 嘗試獲取訊號量 boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) {// 如果新增失敗就釋放訊號量,新增成功就佔用一個訊號量 semaphore.release(); } } } public boolean remove(T o) throws InterruptedException { boolean remove = set.remove(o); if (remove)// 如果刪除成功之後就釋放一個訊號量 semaphore.release(); return remove; } }
測試程式碼:
BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3); System.out.println(boundedHashSet.add("1")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("3")); System.out.println(boundedHashSet.add("4"));// 將會一直阻塞到這裡 System.out.println("=========");
結果:(JVM不會關閉)
注意:
1.Semaphore可以指定公平鎖還是非公平鎖,預設是非公平鎖
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
2.acquire方法和release方法是可以有引數的,表示獲取/返還的訊號量個數
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
3. Barrier柵欄
柵欄(Barrier)類似於閉鎖(一種同步工具,可以延遲執行緒直到其達到其終止狀態),它能阻塞一組執行緒直到某個事件發生。柵欄與閉鎖的區別在於所有執行緒必須同時到達柵欄位置,才能繼續執行。閉鎖等於等待事件,而柵欄用於等待其他執行緒。柵欄可以用於實現一些協議,例如幾個家庭成員決定在某個地方集合:"所有人6:00到達目的地,然後討論下一步的事情"。
3.1 CyclicBarrier柵欄(迴圈屏障)
CyclicBarrier可以使一定數量的參與方反覆地在柵欄位置彙集,它在並行迭代演算法中非常有用:這種演算法通常將一個問題劃分成一系列相互獨立的子問題。當執行緒到達柵欄位置時將呼叫await方法,這個方法將阻塞到所有執行緒到達柵欄位置。如果所有執行緒都到達柵欄,那麼柵欄將開啟所有執行緒被釋放,而柵欄將被重置以便下次使用。如果對await的呼叫超時,或者await阻塞的執行緒被中斷,那麼柵欄就被認為是打破了,所有阻塞的await呼叫都將終止並丟擲BrokenBarrierException。如果成功的通過柵欄,那麼await將為每個執行緒返回一個唯一的到達索引號,我們可以用這些索引號"選舉"產生一個領導執行緒,並在下一次迭代中由該領導執行緒執行一些特殊的工作。CyclicBarrier還可以使你將一個柵欄操作傳遞給建構函式,這是一個Runnable,當成功的通過柵欄時會(在一個子執行緒)執行它,但在阻塞過程被釋放之前是不能執行的。
CyclicBarrier的構造方法可以傳入參與的數量(也就是被柵欄攔截的執行緒的數量),也可以傳入一個Runnable物件。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
例如:
package cn.qlq.thread.tone; import java.util.concurrent.CyclicBarrier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo2 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class); public static void main(String[] args) throws InterruptedException { final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
結果:
18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
00的時候0執行緒到達柵欄進入阻塞,02的時候1執行緒到達柵欄,由於柵欄的參與者是2所以此時相當於所有執行緒到達柵欄,柵欄放開,然後柵欄被重置。
04的時候2執行緒到達柵欄進入阻塞,06的時候3執行緒到達柵欄,由於柵欄的參與者是2所以此時相當於所有參與者執行緒到達柵欄,然後柵欄放開。
我們將柵欄的參與者改為5檢視結果:
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
結果:4個執行緒會阻塞到await方法處,而且JVM不會關閉,因為柵欄的參與者不夠5個所以被一直阻塞。
3.2 Exchanger
Exchanger相當於一個兩方(Two-party)柵欄,各方在柵欄位置上交換資料。當兩方執行不對稱的操作時,Exchanger非常有用。例如:一個執行緒向緩衝區寫東西,另一個執行緒從緩衝區讀資料。Exchanger相當於參與者只有兩個的CyclicBarrier。
兩個執行緒會阻塞在exchanger.exchange方法上,泛型可以指定其交換的資料型別。
例如:兩個執行緒交換自己的執行緒名稱
package cn.qlq.thread.tone; import java.util.concurrent.Exchanger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo3 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class); public static void main(String[] args) throws InterruptedException { final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交換的資料 for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { String exchange = exchanger.exchange(Thread.currentThread().getName()); LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
結果:
18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2