【JAVA併發包原始碼分析】迴圈柵欄:CyclicBarrier
一、認識CyclicBarrier
對於CyclicBarrier大多數人感到陌生,其實CyclicBarrier是一種多執行緒併發控制使用工具,和CountDownLatch非常類似,實現執行緒之間的計數等待,也就是說一個執行緒或者多個執行緒等待其他執行緒完成任務,不過比CountDowwnLatch複雜。
CyclicBarrier是迴圈柵欄的意思,所謂柵欄就是障礙物,阻止其他人進入,在多執行緒中,使用該工具類就是阻止執行緒執行,那麼它是怎麼阻止的呢?下面會詳細介紹。前面的Cyclic意為迴圈,也就是說可以迴圈使用該計數器。舉個簡單例子,比如有5個執行緒,那麼該工具類就要等待這五個執行緒都到達指定的障礙點,執行完相應的動作後,計數器才會清零,等待下一批執行緒的到達。
下面我們來看看CyclicBarrier內部的構造以及類之間的依賴關係:
上圖是CyclicBarrier內部的部分程式碼,由上圖可以畫出該工具類的構造圖如下:
二、使用場景
對於該工具類使用場景也很豐富,這裡用一個簡單的例項來說明。比如,這裡有10個士兵司令下達命令,要求這10個士兵先全部集合來報道,報道完成之後再一起去執行任務,當每一個士兵的任務完成之後然後才會向司令報告任務執行完畢。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
對於上述的CyclicBarrier構造方法,它接收兩個引數,第一個引數就是計數器總數,參與計數的執行緒總數,第二個引數barrierAction是一個Runnable介面,它是當一次計數完成之後要做的動作。
對於上述案例,我們來用程式碼演示該場景:
package cn.just.thread.concurrent;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 測試迴圈柵欄:CycleBarrier(int parties,Runnable barrierAction);
* 第一個引數表示計數的總數,即參與的執行緒總數
* 第二個引數表示當一次計數完成後,系統會執行的動作
* @author Shinelon
*
*/
public class CycleBarrierDemo {
public static class Soldier implements Runnable{
private String soldier;
private final CyclicBarrier cyclic;
public Soldier(String soldier, CyclicBarrier cyclic) {
super();
this.soldier = soldier;
this.cyclic = cyclic;
}
@Override
public void run() {
try{
//等待所有士兵到齊
cyclic.await();
doWork();
//等待所有士兵去工作
cyclic.await();
}catch (InterruptedException e) {
e.printStackTrace();
}catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
private void doWork() {
try{
Thread.sleep(Math.abs(new Random().nextInt()%10000));
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier+":任務完成!");
}
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun(boolean flag, int n) {
super();
this.flag = flag;
N = n;
}
@Override
public void run() {
if(flag){
System.out.println("司令:【士兵"+N+"個,任務完成】");
}else{
System.out.println("司令:【士兵"+N+"個,集合完畢】");
flag=true;
}
}
}
public static void main(String[] args) {
final int N=10;
Thread[] allSoldier=new Thread[N];
boolean flag=false;
CyclicBarrier cyclic=new CyclicBarrier(N, new BarrierRun(flag, N));
//設定障礙點,主要是為了執行這個方法
System.out.println("集合隊伍");
for(int i=0;i<N;++i){
System.out.println("士兵"+i+"報道!");
allSoldier[i]=new Thread(new Soldier("士兵"+i, cyclic));
allSoldier[i].start();
}
}
}
下面是執行結果:
上面的程式碼中,涉及到一個該工具類的內部方法:
await()等待所有的執行緒計數完成。該方法內部呼叫dowait方法,在dowait方法中用重入鎖進行加鎖。實現了一次計數器的等待過程。下面我們來深入原始碼探究。
三、深入原始碼
上面說道dowait方法,下面是該方法的原始碼:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//標誌著每一個執行緒,當一個執行緒到來就生成一個新生代
final Generation g = generation;
//當計數器被破壞,丟擲BrokenBarrierException異常
if (g.broken)
throw new BrokenBarrierException();
//當執行緒被中斷。丟擲中斷異常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//當一個執行緒到來時count減1,直到count為0則計數完成
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//更新標誌,喚醒所有等待執行緒
nextGeneration();
return 0;
} finally {
//如果計數完成,喚醒所有等待的執行緒,計數器重新開始工作
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
//如果當前執行緒沒有超時則繼續等待
if (!timed)
trip.await();
//如果呼叫超時,呼叫awaitNanos方法等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果所有執行緒都已經到達或者被中斷則計數完成,進入下一次迴圈
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//如果不是同一個執行緒,則返回index
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//釋放鎖
lock.unlock();
}
}
解釋一下上面的原始碼,對於每一個執行緒,它都會有一個generation進行標誌用來區分不同的執行緒(我是這樣理解的),因為generation物件中有一個屬性broken標誌著是否該計數器被破壞或者計數是否完成,預設是false:
private static class Generation {
boolean broken = false;
}
CyclicBarrier設定了兩個異常,一個是BrokenBarrierException,另一個InterruptedException,InterruptedException異常相信大家都很熟悉,如果發生中斷則丟擲異常,BrokenBarrierException異常是當計數器被破壞的時候丟擲。當一個執行緒來到的時候count-1,然後判斷count是否為0,如果為零則計數完成,則執行下面相應的動作進入下一次的迴圈計數:
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//更新標誌
nextGeneration();
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
依據上面的場景我們可以理解,10個士兵執行任務,count為10,每次到來一個士兵則count-1,當10個士兵全部到來時則count為0,然後執行BarrierRun執行緒執行相應的動作。接著呼叫nextGeneration方法更新標誌並且喚醒所有等待的執行緒繼續向下執行。
在判斷計數器是否完成一次計數時它呼叫breakBarrier()方法:
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
這個方法同樣會更新標誌並且喚醒所有等待的執行緒。
在接下來的整個for迴圈中,判斷了當前執行緒是否被中斷,計數器是否被破壞,等待是否超時。
- 如果等待超時則呼叫awaitNanos方法繼續等待,該方法時Contition介面的實現類的一個方法,讓執行緒在合適的時間進行等待或者在特定的時間內得到通知,繼續執行,該方法內部實現複雜,筆者能力有限,這裡就不進行分析了。有興趣的話可以自己檢視原始碼。
- 它會判斷所有執行緒是否都已經到達,如果所有執行緒已經執行完畢到達則進行下一次迴圈
//如果所有執行緒都已經到達或者被中斷則計數完成,進入下一次迴圈
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
}
3.同時它也會判斷是否是同一個執行緒,並且更新標誌。
//如果不是同一個執行緒,則返回index
if (g != generation)
return index;
當該執行緒的所有任務都執行完畢後它就會釋放鎖。
至此,本文已經介紹完CyclicBarrier工具類的介紹,本人能力有限,如有不足之處還請指教。多謝!
歡迎關注微信公眾號: