1. 程式人生 > 其它 >迴環屏障CyclicBarrier詳解

迴環屏障CyclicBarrier詳解

技術標籤:javajava併發程式設計之美java多執行緒thread併發程式設計

CountDownLatch的計數器是一次性的,也就是等到計數器值變為0後,再呼叫CountDownLatch的await和countdown方法都會立刻返回,這就起不到執行緒同步的效果了。CyclicBarrier可以讓一組執行緒全部達到一個狀態後再全部同時執行。這裡之所以叫做迴環是因為所有等待執行緒執完畢,並重置CyclicBarrier的狀態後它可以被重用。之所以叫做屏障是因為執行緒呼叫await方法後就會被阻塞,這個阻塞點就成為屏障點,等所有執行緒都呼叫了await方法後,執行緒就會衝破屏障,繼續向下執行。

1、案例介紹

使用兩個執行緒去執行一個被分解的任務A,當兩個執行緒把自己的任務都執行完畢後再對它們的結果進行彙總處理。

public class CycleBarrierTest1 {
    // 建立一個CyclicBarrier例項,新增一個所有子執行緒全部到達屏障後執行的任務
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread() + "task1 merge result");
        }
    });

    public static void main(String[] args) throws InterruptedException {
        // 建立一個執行緒數固定為2的執行緒池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 將執行緒A新增到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "task1-1");

                    System.out.println(Thread.currentThread() + "enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "enter out barrier");
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 將執行緒B新增到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "task1-2");

                    System.out.println(Thread.currentThread() + "enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "enter out barrier");
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 關閉執行緒池
        executorService.shutdown();
    }
}

執行程式:

Thread[pool-1-thread-1,5,main]task1-1
Thread[pool-1-thread-2,5,main]task1-2
Thread[pool-1-thread-1,5,main]enter in barrier
Thread[pool-1-thread-2,5,main]enter in barrier
Thread[pool-1-thread-2,5,main]task1 merge result
Thread[pool-1-thread-2,5,main]enter out barrier
Thread[pool-1-thread-1,5,main]enter out barrier

  • 如上程式碼建立了一個CyclicBarrier物件,其第一個引數為計數器初始值,第二個引數Runable是當計數器值為0時需要執行的任務。
  • 在main函式裡面首先建立了一個大小為2的執行緒池,然後新增兩個子任務到執行緒池,每個子任務在執行自己的邏輯後會呼叫await方法。
  • 一開始計數器值為2,當第一個執行緒呼叫await方法時,計數器值會遞減為1.由於計數器值不為0,所以當前執行緒就到了屏障點而被阻塞。然後第二個執行緒呼叫await時,會進入屏障,計數器值也會遞減,現在計數器值為0,這時就會去執行CyclicBarrier建構函式中的任務,執行完畢後退出屏障點,並且喚醒被阻塞的第二個執行緒,這時候第一個執行緒也會退出屏障點繼續向下執行。

2、案例2

假設一個任務由階段1、階段2和階段3組成,每個執行緒要序列地執行階段1、階段2和階段3,當多個執行緒執行該任務時,必須要保證所有執行緒的階段1全部完成後才能進入階段2執行,當所有執行緒的階段2全部完成後才能進入階段3執行:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycleBarrierTest2 {
    // 建立一個CyclicBarrier例項
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 將執行緒A新增到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() + "step3");
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 將執行緒B新增到執行緒池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() + "step3");
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 關閉執行緒池
        executorService.shutdown();
    }
}

執行程式:

Thread[pool-1-thread-2,5,main]step1
Thread[pool-1-thread-1,5,main]step1
Thread[pool-1-thread-1,5,main]step2
Thread[pool-1-thread-2,5,main]step2
Thread[pool-1-thread-2,5,main]step3
Thread[pool-1-thread-1,5,main]step3
  • 如上程式碼,每個子執行緒在執行完階段1後都呼叫了await方法,等到所有執行緒都到達屏障點後才會一塊往下執行,這就保證了所有執行緒都完成了階段1後才會開始執行階段2.然後在階段2後面呼叫了await方法,這保證了所有執行緒都完成了階段2後,才開始階段3的執行。這個功能使用單個CountDownLatch是無法完成的。

3、實現原理

由以上類圖可知,CyclicBarrier基於獨佔鎖實現,本質底層還是基於AQS的。parties用來記錄執行緒個數,這裡表示多少執行緒呼叫await後,所有執行緒才會衝破屏障繼續往下執行。

count一開始等於parties,每當有執行緒呼叫await方法就遞減1,當count為0時就表示所有執行緒都到了屏障點。

4、主要方法

4.1、int await()方法

當執行緒呼叫CyclicBarrier的該方法時會被阻塞,直到滿足下面條件之一才會返回:

  • parties個執行緒都呼叫了await()方法,也就是執行緒都到了屏障點;
  • 其他執行緒呼叫了interrupt()方法中斷了當前執行緒,則當前執行緒會丟擲InterruptedException異常而返回;
  • 與當前屏障點關聯的Generation物件的broken標誌被設定為true時,會丟擲BrokenBarrierException異常,然後返回;

4.2、boolean await(long timeout, TimeUnit unit)方法

當前執行緒呼叫CyclicBarrier的該方法時會被阻塞,直到滿足下面條件之一才會返回:

  • parties個執行緒都呼叫了await()方法,也就是執行緒都到了屏障點,這時候返回true;
  • 設定的超時時間到了後返回false;
  • 其他執行緒呼叫當前執行緒的interrupt()方法中斷了當前執行緒,則當前執行緒會丟擲InterruptedException異常然後返回;
  • 與當前屏障點關聯的Generation物件的broken標誌被設定為true時,會丟擲BrokenBarrierException異常,然後返回。

5、總結

CyclicBarrier與CountDownLatch的不同在於,前者可以複用,並且前者特別適合分段任務有序執行的場景。CyclicBarrier通過獨佔鎖ReentrantLock實現計數器原子性更新,並使用條件變數佇列來實現執行緒同步。