併發包下常見的同步工具類詳解(CountDownLatch,CyclicBarrier,Semaphore)
目錄
1. 前言
在實際開發中,碰上CPU密集且執行時間非常耗時的任務,通常我們會選擇將該任務進行分割,以多執行緒方式同時執行若干個子任務,等這些子任務都執行完後再將所得的結果進行合併。這正是著名的map-reduce思想,不過map-reduce通常被用在分散式計算的語境下,這裡舉這個例子只是為了說明對多執行緒併發執行流程進行控制的重要性,比如某些執行緒必須等其他執行緒執行完後才能開始它的工作。使用jdk中的內建鎖或者重入鎖配合等待通知機制可以實現這個需求,但是會比較麻煩。因為不管是內建還是重入鎖,它們關注的重點在於如何協調多執行緒對共享資源的訪問,而不是協調特定執行緒的執行次序,完成複雜的併發流程控制。好在JDK在併發包下提供了CountDownLatch,CyclicBarrier,Semaphore等併發工具,可以讓我們站在更高的角度思考並解決這個問題。
2. 閉鎖CountDownLatch
2.1 CountDownLatch功能簡介
CountDownLatch通常稱之為閉鎖。它可以使一個或一批執行緒在閉鎖上等待,等到其他執行緒執行完相應操作後,閉鎖開啟,這些等待的執行緒才可以繼續執行。確切的說,閉鎖在內部維護了一個倒計數器。通過該計數器的值來決定閉鎖的狀態,從而決定是否允許等待的執行緒繼續執行。該計數器的初始值由使用者在建立閉鎖物件時通過傳入的構造引數決定,如下所示
/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
預設計數器初始值不能小於0,否則將丟擲異常。
當計數器的值大於0時,該閉鎖處於關閉狀態,呼叫閉鎖的await()
方法將導致當前執行緒在閉鎖上等待。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
但是我們可以通過呼叫閉鎖的countDown()
方法來使閉鎖的計數值減少
public void countDown() {
sync.releaseShared(1);
}
每呼叫一次countDown()
方法都會使閉鎖的計數值減少1,所以閉鎖的計數器準確來說是個倒計數器。當計數值減少到0時,阻塞在閉鎖上的執行緒將被喚醒從而繼續執行。下面以一個類似map-reduce的例子來對CountDownLatch的用法做講解。
2.2 使用CountDownLatch
為了計算一個CPU密集型的大任務,將該任務分割成10個子任務,交由開啟的10個子執行緒去執行。當所有子任務執行完畢後,主執行緒再執行後續的工作。任務的執行時間以執行緒休眠進行模擬,整個流程以日誌方式進行記錄。完整程式碼如下
/**
* @author: takumiCX
* @create: 2018-09-17
**/
class CountDownLatchTest {
static CountDownLatch countDownLatch;
public static void main(String[] args) throws InterruptedException {
int count=10;
//初始化計數器值為10
countDownLatch=new CountDownLatch(count);
//開啟10個子執行緒執行子任務
for(int i=0;i<count;i++){
Thread thread = new Thread(new CountDownThread(countDownLatch,i));
thread.start();
}
//主執行緒等待,直到所有子任務完成
countDownLatch.await();
//模擬主執行緒執行後續工作
TimeUnit.SECONDS.sleep(1);
System.out.println("任務執行完畢!");
}
private static class CountDownThread implements Runnable{
CountDownLatch countDownLatch;
//子任務序號
int taskNum;
public CountDownThread(CountDownLatch countDownLatch, int taskNum) {
this.countDownLatch = countDownLatch;
this.taskNum = taskNum;
}
@Override
public void run() {
try {
//模擬子任務的執行
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任務執行完畢,則呼叫countDown方法使計數器值減少1
countDownLatch.countDown();
System.out.println("子任務:"+taskNum+" 執行完畢!");
}
}
}
結果如下所示
可以看到主執行緒在所有子任務執行完前必須在閉鎖上等待。當最後一個子任務完成後,它將被喚醒,從而可以繼續之後的工作。
2.3 CountDownLatch原理淺析
CountDownLatch底層也是通過AQS實現的。和ReentrentLock以獨佔的方式獲取和釋放同步狀態不同,CountDownLatch是以共享的方式獲取和釋放同步狀態的。獨佔式和共享式的區別主要有以下幾點:
- 1.獨佔式一次只允許一個執行緒獲取同步狀態,而共享式一次允許多個執行緒同時獲取同步狀態。
- 2.當在同步佇列等待的執行緒被喚醒然後成功獲取同步狀態時,它還必須喚醒後續結點中的執行緒,並將這個過程傳遞下去,使得多個執行緒可以同時獲取到同步狀態。
同步狀態依舊使用AQS中的state值進行表示,在CountDownLatch的語境下表示計數器的值,且只有在state=0時執行緒才能成功獲取到同步狀態,儘管有些奇怪,不過考慮到CountDownLatch中的計數器是個倒計數器,這麼設定也並非不可理解。為了更好的理解CountDownLatch的原始碼,從釋放同步狀態的方法countDown()
開始講起
public void countDown() {
sync.releaseShared(1);
}
正確找到sync的實現類後跟進原始碼
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //嘗試在共享模式下釋放同步狀態
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()嘗試在共享模式下釋放同步狀態,該方法是在AQS中定義的鉤子方法,必須由AQS的實現類自己實現,方法內容如下
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); //獲取同步狀態值
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) //以CAS方式更新同步狀態值
return nextc == 0;
}
}
使用死迴圈+CAS方式將計數值state減少1。僅當更新操作成功且state值被更新為0時返回true,表示在共享模式下釋放同步狀態成功,接著便會繼續執行doReleaseShared()方法,方法內容如下
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒後繼結點中的執行緒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
該方法主要完成的工作是喚醒頭結點之後的結點中的執行緒。那麼其他在同步佇列中等待的執行緒使如何被喚醒的?別急,我們可以在await()方法中找到答案。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
找到sync正確的實現類後跟進原始碼
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared()是在共享模式下嘗試獲取同步狀態,
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
當同步狀態值state=0時返回1,表示獲取同步狀態成功,否則返回-1表示獲取同步狀態失敗。獲取同步狀態失敗的執行緒顯然應該加入同步等待佇列並在佇列中等待,這部分邏輯我們在解讀ReentrentLock的原始碼時應該已經看過了,不過在共享模式下細節方面有些不同
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//構造結點並加入同步佇列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//當前驅結點是頭結點時獲取同步狀態
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第一步自然是構造結點並加入同步佇列尾部,這部分邏輯在addWaiter()
方法中,注意結點型別為共享型別。之後的邏輯和獨佔模式類似,檢查前驅結點是否是佇列的頭結點,是則嘗試獲取同步狀態,成功則將當前結點設定為佇列頭結點,失敗則阻塞當前執行緒並等待喚醒並重新執行以上流程。不過在共享模式下,當前執行緒在成功獲取同步狀態並設定自身為頭結點後,還必須做些額外的工作:當後繼結點為共享型別時,喚醒後繼結點中的執行緒。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //設定當前結點為佇列頭結點
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); //喚醒後繼結點的執行緒
}
}
至此,CountDownLatch的原理就搞明白了,它是以AQS的共享模式來實現複雜的併發流程控制的。當其內部的計數器不為0時,呼叫其await方法將導致執行緒加入同步佇列並阻塞。當呼叫countDown方法使計數器的值為0時,會喚醒佇列中第一個等待的執行緒,之後由該執行緒喚醒後面的執行緒,以此類推,直到阻塞在閉鎖上的執行緒都被成功喚醒。
3.迴圈屏障CyclicBarrier
3.1 CyclicBarrier功能簡介
CyclicBarrier通常稱為迴圈屏障。它和CountDownLatch很相似,都可以使執行緒先等待然後再執行。不過CountDownLatch是使一批執行緒等待另一批執行緒執行完後再執行;而CyclicBarrier只是使等待的執行緒達到一定數目後再讓它們繼續執行。故而CyclicBarrier內部也有一個計數器,計數器的初始值在建立物件時通過構造引數指定,如下所示
public CyclicBarrier(int parties) {
this(parties, null);
}
每呼叫一次await()方法都將使阻塞的執行緒數+1,只有阻塞的執行緒數達到設定值時屏障才會開啟,允許阻塞的所有執行緒繼續執行。除此之外,CyclicBarrier還有幾點需要注意的地方:
1.CyclicBarrier的計數器可以重置而CountDownLatch不行,這意味著CyclicBarrier例項可以被重複使用而CountDownLatch只能被使用一次。而這也是迴圈屏障迴圈二字的語義所在。
- 2.CyclicBarrier允許使用者自定義barrierAction操作,這是個可選操作,可以在建立CyclicBarrier物件時指定
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
一旦使用者在建立CyclicBarrier物件時設定了barrierAction引數,則在阻塞執行緒數達到設定值屏障開啟前,會呼叫barrierAction的run()方法完成使用者自定義的操作。
3.2 使用CyclicBarrier
還是以多執行緒分割大任務併發執行的例子來進行講解,不過這次情況要稍微複雜些。執行緒在執行完分配給它的子任務後不能立即退出,必須等待所有任務都完成後再執行釋放資源的操作。而主執行緒在所有子任務都執行完畢後也要執行特定的操作,且該操作線上程釋放資源前。所有操作都以列印日誌的方式進行模擬。程式碼如下:
/**
* @author: takumiCX
* @create: 2018-09-18
**/
public class CyclicBarrierTest {
static CyclicBarrier cyclicBarrier;
public static void main(String[] args) {
int count = 10;
//當所有子任務都執行完畢時,barrierAction的run方法會被呼叫
cyclicBarrier = new CyclicBarrier(count, () ->
System.out.println("執行barrierAction操作!"));
//開啟多個執行緒執行子任務
for(int i=0;i<count;i++){
new Thread(new CyclicBarrierThread(cyclicBarrier,i)).start();
}
}
private static class CyclicBarrierThread implements Runnable {
public CyclicBarrier cyclicBarrier;
//任務序號
public int taskNum;
public CyclicBarrierThread(CyclicBarrier cyclicBarrier, int taskNum) {
this.cyclicBarrier = cyclicBarrier;
this.taskNum = taskNum;
}
@Override
public void run() {
//執行子任務
System.out.println("子任務:"+taskNum+" 執行完畢!");
try {
//等待所有子任務執行完成
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
//釋放資源
System.out.println("執行緒:"+taskNum+" 釋放資源!");
}
}
}
開啟10個執行緒執行子任務,每個執行緒執行完子任務後在CyclicBarrier上等待。等到所有子任務完成後,使用者設定自定義的barrierAction操作即被執行,之後屏障正式開啟,阻塞的所有執行緒將完成釋放資源的操作。 結果如下圖所示
3.3 CyclicBarrier原理淺析
CyclicBarrier內部使用ReentrentLock來實現執行緒同步,而通過Condition來實現執行緒的阻塞和喚醒。當計數器值為0時,首先會執行使用者自定義的barrierAction操作。
int index = --count; //計數器值
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand; //使用者自定義的barrierAction
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
之後再進行阻塞執行緒的喚醒,以及將計數器重置為初始值。這部分程式碼在nextGeneration()中
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //喚醒所有的阻塞執行緒
// set up next generation
count = parties; //計數器重置為初始值
generation = new Generation();
}
4. 訊號量Semaphore
4.1 Semaphore功能簡介
如果學過作業系統的話,對訊號量Semaphore應該不陌生。作業系統中的訊號量是這麼一個機構:它維護了一定數目的資源,程序向其請求資源將導致Semaphore中資源數量減少,當資源數量小於0時將會導致當前執行緒阻塞;而程序釋放資源將導致Semaphore中資源數量增加,當資源數量大於0時會喚醒阻塞的程序。作業系統中使用訊號量可以輕鬆實現程序間的互斥和同步。java在語言層面也支援訊號量機制,其工作原理和作業系統中的訊號量類似,可以通過呼叫
public void acquire(int permits)
或者public boolean tryAcquire(int permits)
請求訊號量中的許可(資源)。不過後者在訊號量中許可數量不夠時不會阻塞而是立即返回一個失敗結果。當然,也可以通過public void release()
向訊號量歸還資源。
訊號量在建立時必須為其指定可以用的許可總數,如下所示
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
當建立訊號量時指定許可總數為1,則可以起到獨佔鎖的作用,不過它是不允許執行緒重入的。同時,它還有公平和非公平模式之分,通過在建立物件時傳入引數進行指定
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
和ReentrentLock一樣預設是非公平模式。
4.2 使用Semaphore進行最大併發數的控制
假設伺服器上有一種資源可以同時供多個使用者進行訪問,出於系統穩定性考慮需要限制同時訪問的使用者的數量,整個過程可以模擬如下
/**
* @author: takumiCX
* @create: 2018-09-24
**/
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
//訊號量控制併發數最多為3
Semaphore semaphore = new Semaphore(3);
//同時開啟10個執行緒
for(int i=1;i<=10;i++){
new Thread(new ReaderThread(semaphore,i)).start();
}
}
static class ReaderThread implements Runnable{
Semaphore semaphore;
//使用者序號
int userIndex;
public ReaderThread(Semaphore semaphore, int userIndex) {
this.semaphore = semaphore;
this.userIndex = userIndex;
}
@Override
public void run() {
try {
//獲取許可
semaphore.acquire(1);
//模擬訪問資源所用的時間
TimeUnit.SECONDS.sleep(1);
System.out.println("使用者 "+userIndex+" 訪問資源,時間:"+System.currentTimeMillis());
//釋放許可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用訊號量限制同時併發訪問的執行緒數為3,然後開啟10個執行緒模擬使用者訪問。得到的結果如下
從結果上可以清晰的看到,每次最多允許3個使用者同時訪問資源,訊號量很好的起到了限流作用。
4.3 Semaphore原理淺析
和CountDownLatch類似,Semaphore底層也是通過AQS的共享模式實現的。它和CountDownLatch的區別只是對於AQS共享模式的鉤子方法tryAcquireShared()
和tryReleaseShared()
的實現不同。
以Semaphore的非公平模式為例,其嘗試釋放同步狀態的邏輯如下
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //獲取可用許可數
int remaining = available - acquires; //計算被消耗後剩餘的許可數
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
首先會獲取當前可用的許可值(state),根據請求數量計算出剩餘的許可值,若剩餘許可數小於0則直接返回剩餘值表示該操作失敗;否則以CAS方式將state值更新為計算後的剩餘值,並返回一個大於等於0的數表示成功。通過該方法的返回值可以知道嘗試獲取同步狀態的操作是否成功,返回值小於0表示沒有足夠的許可,執行緒將會加入同步佇列並等待;返回值大於等於0則表示許可足夠,則整個獲取許可的流程就結束了。
tryReleaseShared()的實現也很簡單,
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); //獲取當前許可數
int next = current + releases; //計算釋放後的許可總數
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //cas更新許可值
return true;
}
}
計算釋放後的許可總數並以CAS方式對state值進行更新。之後將返回上層繼續執行
doReleaseShared()
喚醒頭結點後面結點中的執行緒,被喚醒的執行緒將執行tryAcquireShared()
重新嘗試獲取同步狀態,獲取失敗則繼續阻塞,獲取成功將設定當前結點為佇列頭結點並繼續喚醒後續結點中的執行緒。