1. 程式人生 > >JUC之AQS之CyclicBarrier

JUC之AQS之CyclicBarrier

CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。

CyclicBarrier有兩個構造方法:

第一個構造方法只有int引數,這個引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。

第二個構造方法多了一個Runnable,用於線上程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

先看下面這個例子:

package dgb.test.concurrent;

import java.time.LocalDateTime;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author Dongguabai
 * @date 2018/9/30 11:45
 */
public class CyclicBarrierTest1 implements Runnable {

    public static final int COUNT = 5;

    private static CyclicBarrier c = new CyclicBarrier(COUNT);

    private static CyclicBarrierTest1 cyc = new CyclicBarrierTest1();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(COUNT);
        for (int i = 0; i < COUNT; i++) {
            executor.execute(() -> {
                try {
                    dosth();
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

    public static void dosth() throws BrokenBarrierException, InterruptedException {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(LocalDateTime.now() + "   " + Thread.currentThread().getName() + "  到了!再等等!");
        c.await();
        System.out.println(LocalDateTime.now() + "   " + Thread.currentThread().getName() + "  執行了!");
    }

    @Override
    public void run() {
        try {
            dosth();
        } catch (BrokenBarrierException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行結果:

也可以使用CountDownLatch完成相同的功能:

package dgb.test.concurrent;

import java.time.LocalDateTime;
import java.util.concurrent.*;

/**
 * @author Dongguabai
 * @date 2018/9/30 11:45
 */
public class CyclicBarrierTest1 implements Runnable {

    public static final int COUNT = 5;

   // private static CyclicBarrier c = new CyclicBarrier(COUNT);
    private static CountDownLatch c = new CountDownLatch(COUNT);

    private static CyclicBarrierTest1 cyc = new CyclicBarrierTest1();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(COUNT);
        for (int i = 0; i < COUNT; i++) {
            executor.execute(() -> {
                try {
                    dosth();
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

    public static void dosth() throws BrokenBarrierException, InterruptedException {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(LocalDateTime.now() + "   " + Thread.currentThread().getName() + "  到了!再等等!");
       // c.await();
        c.countDown();
        c.await();
        System.out.println(LocalDateTime.now() + "   " + Thread.currentThread().getName() + "  執行了!");
    }

    @Override
    public void run() {
        try {
            dosth();
        } catch (BrokenBarrierException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

講上面示例的CyclicBarrier構造修改一下:

執行結果:

CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為複雜的業務場景。例如,如果計算髮生錯誤,可以重置計數器,並讓執行緒重新執行一次。 CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的執行緒數量。isBroken()方法用來了解阻塞的執行緒是否被中斷。

package dgb.test.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest3 {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        Thread thread = new Thread(()->{
            try {
                c.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        thread.interrupt();
        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            System.out.println(c.isBroken());
            e.printStackTrace();
        }
    }
}

輸出結果:

/**
 * Exception thrown when a thread tries to wait upon a barrier that is
 * in a broken state, or which enters the broken state while the thread
 * is waiting.
 *
 * @see CyclicBarrier
 *
 * @since 1.5
 * @author Doug Lea
 */
public class BrokenBarrierException extends Exception {