JUC之AQS之CyclicBarrier
阿新 • • 發佈:2018-12-12
CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。
CyclicBarrier有兩個構造方法:
第一個構造方法只有int引數,這個引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。
第二個構造方法多了一個Runnable,用於線上程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。
先看下面這個例子:
package dgb.test.concurrent; import java.time.LocalDateTime; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author Dongguabai * @date 2018/9/30 11:45 */ public class CyclicBarrierTest1 implements Runnable { public static final int COUNT = 5; private static CyclicBarrier c = new CyclicBarrier(COUNT); private static CyclicBarrierTest1 cyc = new CyclicBarrierTest1(); public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(COUNT); for (int i = 0; i < COUNT; i++) { executor.execute(() -> { try { dosth(); } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); } public static void dosth() throws BrokenBarrierException, InterruptedException { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 到了!再等等!"); c.await(); System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 執行了!"); } @Override public void run() { try { dosth(); } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } } }
執行結果:
也可以使用CountDownLatch完成相同的功能:
package dgb.test.concurrent; import java.time.LocalDateTime; import java.util.concurrent.*; /** * @author Dongguabai * @date 2018/9/30 11:45 */ public class CyclicBarrierTest1 implements Runnable { public static final int COUNT = 5; // private static CyclicBarrier c = new CyclicBarrier(COUNT); private static CountDownLatch c = new CountDownLatch(COUNT); private static CyclicBarrierTest1 cyc = new CyclicBarrierTest1(); public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(COUNT); for (int i = 0; i < COUNT; i++) { executor.execute(() -> { try { dosth(); } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); } public static void dosth() throws BrokenBarrierException, InterruptedException { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 到了!再等等!"); // c.await(); c.countDown(); c.await(); System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + " 執行了!"); } @Override public void run() { try { dosth(); } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } } }
講上面示例的CyclicBarrier構造修改一下:
執行結果:
CyclicBarrier和CountDownLatch的區別
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為複雜的業務場景。例如,如果計算髮生錯誤,可以重置計數器,並讓執行緒重新執行一次。 CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的執行緒數量。isBroken()方法用來了解阻塞的執行緒是否被中斷。
package dgb.test.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest3 {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread thread = new Thread(()->{
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
thread.start();
thread.interrupt();
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println(c.isBroken());
e.printStackTrace();
}
}
}
輸出結果:
/**
* Exception thrown when a thread tries to wait upon a barrier that is
* in a broken state, or which enters the broken state while the thread
* is waiting.
*
* @see CyclicBarrier
*
* @since 1.5
* @author Doug Lea
*/
public class BrokenBarrierException extends Exception {