1. 程式人生 > >多執行緒——CyclicBarrier迴圈屏障

多執行緒——CyclicBarrier迴圈屏障

概述

CyclicBarrier是一個同步工具類,可以翻譯成迴圈屏障,也叫障礙器或同步屏障。

CyclicBarrier內部有一個計數器count,呼叫障礙器的await方法會使計數器count的值減一,當計數器count的值為0時,表明呼叫了await方法執行緒已經達到了設定的數量。

當障礙器的屏障被打破後,會重置計數器,因此叫做迴圈屏障。

比較CountDownLatch和CyclicBarrier:

  1. CountDownLatch的作用其實就是一個計數器,當阻塞的執行緒數達到CountDownLatch設定的臨界值後,CountDownLatch將會喚醒阻塞的執行緒,並且後面將失效不再阻塞執行緒,因此CountDownLatch也可以理解為一次性的障礙器
  2. 相比較CountDownLatch , CyclicBarrier可以設定條件執行緒barrierCommand時,並且CyclicBarrier的是迴圈屏障,CyclicBarrier只要內部不發生異常,是可以通過重置計數器來重複使用的。

原理

障礙器內部有一個ReentrantLock變數lock(顯式鎖),還有通過該顯式鎖lock獲得的Condition變數trip。線上程裡呼叫障礙器await方法,而在await方法內部呼叫了dowait方法(dowait方法使用了顯式鎖變數lock),在dowait方法內部會根據計數器count判斷,如果count不等於0,將會呼叫Condition變數trip的await方法,也就是說實際上障礙器的await方法是通過Condition變數trip的await()方法阻塞了所有的進行到這裡的執行緒, 每個執行緒執行await方法都會令計數器count減一,當count值為0時,然後會呼叫Condition變數trip的signalAll方法,喚醒所有阻塞的執行緒。

作用

  1. 設定一個屏障(也可以叫同步點),當某一個執行緒到達屏障後會阻塞該執行緒,只有當到達屏障的執行緒數達到臨界值parties後,那些在屏障處被阻塞的執行緒才被喚醒繼續執行。
  2. 可以在屏障處設定一個待執行的執行緒A,當所有執行緒到達屏障時,會執行執行緒A,然後開啟屏障讓哪些被阻塞的執行緒繼續執行。這裡容易有一個誤解就是,並不是要等到執行緒A執行結束後,被阻塞的執行緒才繼續執行,如果執行緒A中呼叫了wait()、yield方法,此時被阻塞的執行緒可以不必等到執行緒A執行完畢就能繼續執行,而如果執行緒A呼叫了sleep方法,被阻塞的執行緒仍然需要等到執行緒A執行完畢後才能繼續執行。

 CyclicBarrier的內部變數

    //該內部類用於表明當前迴圈屏障的狀態,當broken為true時表示障礙器發生了異常
    private static class Generation {
        boolean broken = false;
    }
    //CyclicBarrier內部的顯示鎖
    private final ReentrantLock lock = new ReentrantLock();
    //通過上面的顯式鎖得到的Condition變數,障礙器能夠阻塞和喚醒多個執行緒完全得益於這個Condition
    private final Condition trip = lock.newCondition();
    //臨界值,當障礙器阻塞的執行緒數等於parties時即count=0,障礙器將會通過trip喚醒目前所有阻塞的執行緒
    private final int parties;
    //條件執行緒,當屏障被打破時,在障礙器通過trip喚醒所有正被阻塞的的執行緒之前,執行該執行緒,這個執行緒可以充當一個主執行緒,那些被阻塞的執行緒可以充當子執行緒,即可以實現當所有子執行緒都達到屏障時呼叫主執行緒的作用
    private final Runnable barrierCommand;
    //內部類Generation變量表示當前迴圈屏障CyclicBarrier的狀態
    private Generation generation = new Generation();
    //計數器,用於計算還剩多少個執行緒還沒有達到屏障處,初始值應該等於臨界值parties
    private int count;

建構函式原始碼

    //這個建構函式設定了條件執行緒barrierAction
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    //不設定條件執行緒的建構函式    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

CyclicBarrier迴圈屏障實現阻塞和喚醒執行緒的關鍵原始碼

1.await方法原始碼

    //CyclicBarrier內部定義的無參的await方法,可以看出在內部呼叫的dowait方法才是關鍵
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    //CyclicBarrier內部定義的有參的await方法,這兩個引數的作用是表示執行緒到達屏障後需要等待的時間,如果不設定時間當前執行緒無論如何也需要等到所有的執行緒都達到屏障後才能繼續執行,而設定時間後,當等待的時間等於設定的時間後,無論還有沒有執行緒到達屏障當前執行緒都將繼續執行
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    

2.dowait方法原始碼,這個方法中實現了障礙器阻塞執行緒和喚醒執行緒的功能

    //timed:表示是否設定了等待時間
    //nanos等待的時間(納秒)
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //使用CyclicBarrier定義的顯示鎖,加鎖避免併發問題
        lock.lock();
        try {
            //當前迴圈屏障的狀態
            final Generation g = generation;
            //如果為true,表示障礙器之前發生了異常,丟擲異常BrokenBarrierException
            if (g.broken)
                throw new BrokenBarrierException();
            //當前執行緒是否被中斷
            if (Thread.interrupted()) {
                breakBarrier();//該方法會重置計數值count為parties,並且喚醒所有被阻塞的執行緒,並改變狀態Generation
                throw new InterruptedException();
            }
            //屏障計數器減一
            int index = --count;
            //如果index等於0 ,達到屏障的執行緒的數量等於最開始設定的數量parties
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //如果條件執行緒不為空,則執行條件執行緒
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //喚醒所有被阻塞的執行緒,並且重置計數器count,生成新的狀態generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)//如果ranAction為true,表示上面的程式碼沒有順利執行結束,表示障礙器發生了異常,呼叫breakBarrier重置計數器,並設定generation.broken=true表示當前的狀態
                        breakBarrier();
                }
            }

            // 當計數器為零呼叫了Condition的喚醒方法、或者broken為true、或者執行緒中斷、或者等待超時時跳出異常
            for (;;) {
                try {
                    //阻塞當前執行緒,如果timed為false表示沒有設定等待的時間
                    if (!timed)
                        //不限時阻塞執行緒,只有當呼叫喚醒方法後才會繼續執行
                        trip.await();
                    else if (nanos > 0L)
                        //等待nanos毫秒
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //呼叫await方法如果發生異常,並且此時CyclicBarrier還沒有呼叫nextGeneration()方法重置計數器和generation
                    if (g == generation && ! g.broken) {
                        breakBarrier();//該方法會喚醒所有阻塞的執行緒,並且重置計數器,而且設定generation.broken = true表示障礙器發生了異常。
                        throw ie;
                    } else {
                        //中斷當前執行緒
                        Thread.currentThread().interrupt();
                    }
                }
                //g.broken為true,表示障礙器發生了異常,丟擲異常
                if (g.broken)
                    throw new BrokenBarrierException();
                //index=0的喚醒操作順利執行完了,所以通過nextGeneration()方法更新了generation,而由於generation是執行緒中的共享變數,所以當前執行緒此時 g!=generation
                if (g != generation)
                    return index;
                //如果timed為true表示設定了執行緒阻塞的時間,然後時間nanos卻小於等於0,
                if (timed && nanos <= 0L) {
                    breakBarrier();//此時重置計數器,並且設定generation.broken=true表示障礙器發生異常
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    //喚醒所有執行緒,重置計數器count,重新生成generation
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    //設定generation.broken=true表示障礙器發生的異常,重置計數器count,喚醒所有阻塞的執行緒
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

例子說明:

下面的例子可以說明迴圈障礙CyclicBarrier的使用方式:

public class TestCyclicBarrier {
    public static void main(String[] args) {
        /**
         *
         * 為了適應一種新的設計需求,比如一個大型的任務,常常需要分配好多子任務去執行,
         * 只有當所有子任務都通知主任務執行的時候,才能執行主任務,這時候,就可以選擇障礙器了。
         */
        // 建立障礙器,並設定MainTask為所有定數量的執行緒都達到障礙點時候所要執行的任務(Runnable)
        CyclicBarrier cb = new CyclicBarrier(3, new MainTask());
        new SubTask("A", cb, 1).start();
        new SubTask("B", cb, 2).start();
        new SubTask("C", cb, 3).start();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new SubTask("D", cb, 4).start();
        new SubTask("E", cb, 5).start();
        new SubTask("F", cb, 6).start();


    }
}

/**
 * 主任務
 */
class MainTask implements Runnable {
    public void run() {
        System.out.println("-----------在收到所有的子任務開始執行的通知後,主任務執行了!");
    }
}

/**
 * 子任務
 */
class SubTask extends Thread {
    private String name;
    private CyclicBarrier cb;
    private int n;

    SubTask(String name, CyclicBarrier cb, int n) {
        this.name = name;
        this.cb = cb;
        this.n = n;
    }

    public void run() {
        System.out.println("[子任務" + name + "]開始執行了!");
        try {
            // 模擬耗時的任務
            Thread.sleep(1000 * n);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("[子任務" + name + "]暫停,並通知障礙器主任務該執行了!");
        try {
            //無限時等待
            cb.await();
            //設定等待時間,等待時間到了之後,就算障礙器沒有開啟,主執行緒還沒有執行,當前執行緒依然繼續執行
//			cb.await(1000,TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主任務執行完畢後,[子任務" + name + "]繼續執行剩下的部分!");
    }
}

執行的結果為:

[子任務A]開始執行了!
[子任務C]開始執行了!
[子任務B]開始執行了!
[子任務A]暫停,並通知障礙器主任務該執行了!
[子任務B]暫停,並通知障礙器主任務該執行了!
[子任務C]暫停,並通知障礙器主任務該執行了!
-----------在收到所有的子任務開始執行的通知後,主任務執行了!
主任務執行完畢後,[子任務C]繼續執行剩下的部分!
主任務執行完畢後,[子任務A]繼續執行剩下的部分!
主任務執行完畢後,[子任務B]繼續執行剩下的部分!
[子任務D]開始執行了!
[子任務E]開始執行了!
[子任務F]開始執行了!
[子任務D]暫停,並通知障礙器主任務該執行了!
[子任務E]暫停,並通知障礙器主任務該執行了!
[子任務F]暫停,並通知障礙器主任務該執行了!
-----------在收到所有的子任務開始執行的通知後,主任務執行了!
主任務執行完畢後,[子任務F]繼續執行剩下的部分!
主任務執行完畢後,[子任務D]繼續執行剩下的部分!
主任務執行完畢後,[子任務E]繼續執行剩下的部分!