1. 程式人生 > >【JAVA併發包原始碼分析】迴圈柵欄:CyclicBarrier

【JAVA併發包原始碼分析】迴圈柵欄:CyclicBarrier

一、認識CyclicBarrier

對於CyclicBarrier大多數人感到陌生,其實CyclicBarrier是一種多執行緒併發控制使用工具,和CountDownLatch非常類似,實現執行緒之間的計數等待,也就是說一個執行緒或者多個執行緒等待其他執行緒完成任務,不過比CountDowwnLatch複雜。

CyclicBarrier是迴圈柵欄的意思,所謂柵欄就是障礙物,阻止其他人進入,在多執行緒中,使用該工具類就是阻止執行緒執行,那麼它是怎麼阻止的呢?下面會詳細介紹。前面的Cyclic意為迴圈,也就是說可以迴圈使用該計數器。舉個簡單例子,比如有5個執行緒,那麼該工具類就要等待這五個執行緒都到達指定的障礙點,執行完相應的動作後,計數器才會清零,等待下一批執行緒的到達。

下面我們來看看CyclicBarrier內部的構造以及類之間的依賴關係:
這裡寫圖片描述
上圖是CyclicBarrier內部的部分程式碼,由上圖可以畫出該工具類的構造圖如下:
這裡寫圖片描述

二、使用場景

對於該工具類使用場景也很豐富,這裡用一個簡單的例項來說明。比如,這裡有10個士兵司令下達命令,要求這10個士兵先全部集合來報道,報道完成之後再一起去執行任務,當每一個士兵的任務完成之後然後才會向司令報告任務執行完畢。

  public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw
new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

對於上述的CyclicBarrier構造方法,它接收兩個引數,第一個引數就是計數器總數,參與計數的執行緒總數,第二個引數barrierAction是一個Runnable介面,它是當一次計數完成之後要做的動作。
對於上述案例,我們來用程式碼演示該場景:

package cn.just.thread.concurrent;

import
java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 測試迴圈柵欄:CycleBarrier(int parties,Runnable barrierAction); * 第一個引數表示計數的總數,即參與的執行緒總數 * 第二個引數表示當一次計數完成後,系統會執行的動作 * @author Shinelon * */ public class CycleBarrierDemo { public static class Soldier implements Runnable{ private String soldier; private final CyclicBarrier cyclic; public Soldier(String soldier, CyclicBarrier cyclic) { super(); this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try{ //等待所有士兵到齊 cyclic.await(); doWork(); //等待所有士兵去工作 cyclic.await(); }catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } private void doWork() { try{ Thread.sleep(Math.abs(new Random().nextInt()%10000)); }catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier+":任務完成!"); } } public static class BarrierRun implements Runnable{ boolean flag; int N; public BarrierRun(boolean flag, int n) { super(); this.flag = flag; N = n; } @Override public void run() { if(flag){ System.out.println("司令:【士兵"+N+"個,任務完成】"); }else{ System.out.println("司令:【士兵"+N+"個,集合完畢】"); flag=true; } } } public static void main(String[] args) { final int N=10; Thread[] allSoldier=new Thread[N]; boolean flag=false; CyclicBarrier cyclic=new CyclicBarrier(N, new BarrierRun(flag, N)); //設定障礙點,主要是為了執行這個方法 System.out.println("集合隊伍"); for(int i=0;i<N;++i){ System.out.println("士兵"+i+"報道!"); allSoldier[i]=new Thread(new Soldier("士兵"+i, cyclic)); allSoldier[i].start(); } } }

下面是執行結果:
這裡寫圖片描述

上面的程式碼中,涉及到一個該工具類的內部方法:
await()等待所有的執行緒計數完成。該方法內部呼叫dowait方法,在dowait方法中用重入鎖進行加鎖。實現了一次計數器的等待過程。下面我們來深入原始碼探究。

三、深入原始碼

上面說道dowait方法,下面是該方法的原始碼:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //標誌著每一個執行緒,當一個執行緒到來就生成一個新生代
            final Generation g = generation;
            //當計數器被破壞,丟擲BrokenBarrierException異常
            if (g.broken)
                throw new BrokenBarrierException();
            //當執行緒被中斷。丟擲中斷異常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //當一個執行緒到來時count減1,直到count為0則計數完成
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //更新標誌,喚醒所有等待執行緒
                    nextGeneration();
                    return 0;
                } finally {
                    //如果計數完成,喚醒所有等待的執行緒,計數器重新開始工作
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //如果當前執行緒沒有超時則繼續等待
                    if (!timed)
                        trip.await();
                      //如果呼叫超時,呼叫awaitNanos方法等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果所有執行緒都已經到達或者被中斷則計數完成,進入下一次迴圈
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                //如果不是同一個執行緒,則返回index
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

解釋一下上面的原始碼,對於每一個執行緒,它都會有一個generation進行標誌用來區分不同的執行緒(我是這樣理解的),因為generation物件中有一個屬性broken標誌著是否該計數器被破壞或者計數是否完成,預設是false:

 private static class Generation {
        boolean broken = false;
    }

CyclicBarrier設定了兩個異常,一個是BrokenBarrierException,另一個InterruptedException,InterruptedException異常相信大家都很熟悉,如果發生中斷則丟擲異常,BrokenBarrierException異常是當計數器被破壞的時候丟擲。當一個執行緒來到的時候count-1,然後判斷count是否為0,如果為零則計數完成,則執行下面相應的動作進入下一次的迴圈計數:

  final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //更新標誌
                    nextGeneration();
  /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
 private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

依據上面的場景我們可以理解,10個士兵執行任務,count為10,每次到來一個士兵則count-1,當10個士兵全部到來時則count為0,然後執行BarrierRun執行緒執行相應的動作。接著呼叫nextGeneration方法更新標誌並且喚醒所有等待的執行緒繼續向下執行。
在判斷計數器是否完成一次計數時它呼叫breakBarrier()方法:

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

這個方法同樣會更新標誌並且喚醒所有等待的執行緒。

在接下來的整個for迴圈中,判斷了當前執行緒是否被中斷,計數器是否被破壞,等待是否超時。

  1. 如果等待超時則呼叫awaitNanos方法繼續等待,該方法時Contition介面的實現類的一個方法,讓執行緒在合適的時間進行等待或者在特定的時間內得到通知,繼續執行,該方法內部實現複雜,筆者能力有限,這裡就不進行分析了。有興趣的話可以自己檢視原始碼。
    這裡寫圖片描述
  2. 它會判斷所有執行緒是否都已經到達,如果所有執行緒已經執行完畢到達則進行下一次迴圈
//如果所有執行緒都已經到達或者被中斷則計數完成,進入下一次迴圈
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    }

3.同時它也會判斷是否是同一個執行緒,並且更新標誌。

//如果不是同一個執行緒,則返回index
                if (g != generation)
                    return index;

當該執行緒的所有任務都執行完畢後它就會釋放鎖。

至此,本文已經介紹完CyclicBarrier工具類的介紹,本人能力有限,如有不足之處還請指教。多謝!
歡迎關注微信公眾號:
這裡寫圖片描述