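A Simple Guide to CyclicBarrier (Synchronization Barrier)
Introduction to CyclicBarrier
CyclicBarrier is a barrier that can be used cyclically. It blocks each thread that reaches a given point until the last thread arrives, at which point all waiting threads are released. The key difference from CountDownLatch is that a CyclicBarrier can be reused after the threads are released, whereas a CountDownLatch cannot.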
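To make the reuse point concrete, here is a minimal sketch. The class name ReusableBarrierDemo and the helper runRounds are made up for illustration. The same barrier object is awaited over several consecutive rounds, which a single CountDownLatch could not do because its count cannot be reset.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class ReusableBarrierDemo {
    // Hypothetical helper: runs `rounds` rounds on ONE barrier shared by `parties`
    // threads and returns how many times a thread got past the barrier.
    public static int runRounds(int parties, int rounds) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger passes = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();          // blocks until all parties arrive
                        passes.incrementAndGet(); // reached only after the barrier trips
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return passes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runRounds(3, 4)); // 3 threads x 4 rounds = 12
    }
}
```

Every thread passes the barrier once per round, so the counter ends at parties × rounds without a new barrier ever being created.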
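Instantiation: new CyclicBarrier(N) creates a CyclicBarrier, where N is the number of threads the barrier must intercept (block). Alternatively, new CyclicBarrier(N, Runnable) also specifies a task, barrierAction, which runs first once all threads have reached the barrier point, before the waiting threads are released.
public CyclicBarrier(int parties) {……}
public CyclicBarrier(int parties, Runnable barrierAction) {……}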
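The ordering guarantee of barrierAction can be checked with a small sketch like the one below. BarrierActionDemo and its run method are illustrative names, not part of the original demo. The barrier action records an event, each worker records another after await returns, and the action's event always comes first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
    // Hypothetical helper: returns the recorded events in the order they happened.
    public static List<String> run(int parties) throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // The action runs once per trip, in the last thread to arrive,
        // before any waiting thread is released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> events.add("action"));
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await();
                    events.add("released");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3)); // [action, released, released, released]
    }
}
```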
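Blocking a thread: calling await tells the barrier that the current thread has arrived, and the thread then blocks and waits. To avoid blocking for too long you can use await(timeout, unit); if the wait exceeds the given time, a TimeoutException is thrown.
public int await() throws InterruptedException, BrokenBarrierException {……}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {……}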
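Both await overloads return an int, the arrival index of the calling thread: getParties() - 1 for the first thread to arrive and 0 for the last. The following sketch collects those indexes; ArrivalIndexDemo and arrivalIndexes are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ArrivalIndexDemo {
    // Hypothetical helper: returns the sorted arrival indexes reported by await().
    public static List<Integer> arrivalIndexes(int parties) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        ExecutorService pool = Executors.newFixedThreadPool(parties);
        try {
            // Each task simply arrives at the barrier and returns its index.
            Callable<Integer> arrive = () -> barrier.await();
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                futures.add(pool.submit(arrive));
            }
            List<Integer> indexes = new ArrayList<>();
            for (Future<Integer> f : futures) {
                indexes.add(f.get());
            }
            Collections.sort(indexes); // arrival order itself is not deterministic
            return indexes;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(arrivalIndexes(3)); // [0, 1, 2]
    }
}
```

The index is handy when exactly one thread per round should do some follow-up work, for example the thread that receives 0.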
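Test demo: three threads each run a computation, and one of them takes a long time, so await(timeout, unit) is used to bound the wait.
import java.util.concurrent.*;
public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    // All three tasks must arrive before any of them may continue
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    public static void statistic() throws Exception {
        Future<Integer> task1 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("I am task one");
                cyclicBarrier.await(1, TimeUnit.SECONDS); // wait at most 1 second
                return 1;
            }
        });
        Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("I am task two");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 2;
            }
        });
        Future<Integer> task3 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("I am task three");
                Thread.sleep(5000); // simulate task three taking a long time
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 3;
            }
        });
        // get() blocks until the task finishes and rethrows its failure as ExecutionException
        int result1 = task1.get();
        int result2 = task2.get();
        int result3 = task3.get();
        System.out.println("Result of the multithreaded computation:");
        System.out.println(result1 + result2 + result3);
        executorService.shutdown();
    }
    public static void main(String[] args) throws Exception {
        statistic();
    }
}
Tasks one and two wait at the barrier for at most 1 second, but task three sleeps for 5 seconds before it arrives, so the timed await expires and throws a TimeoutException. Because I call Future.get() directly instead of checking Future.isDone() first (get() blocks until the task finishes, while an isDone() check would only report that the tasks are not finished while they are still blocked at the barrier), the exception surfaces in the main thread wrapped in an ExecutionException:
I am task one
I am task two
I am task three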
Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService.statistic(CyclicBarrierService.java:49)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService.main(CyclicBarrierService.java:62)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.call(CyclicBarrierService.java:25)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.call(CyclicBarrierService.java:21)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
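A timeout does more than fail the thread that timed out: it also leaves the barrier in the broken state, so any other thread that is waiting, or that arrives later, fails with a BrokenBarrierException. The single-threaded sketch below illustrates this under simple assumptions; TimeoutBreaksBarrierDemo and run are made-up names, and the barrier is given two parties so that the second one never arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutBreaksBarrierDemo {
    // Hypothetical helper: returns a short trace of what happened.
    public static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        StringBuilder trace = new StringBuilder();
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            trace.append("timeout;");
        } catch (BrokenBarrierException e) {
            trace.append("unexpected;");
        }
        // The timed-out await has broken the barrier
        trace.append("broken=").append(barrier.isBroken()).append(';');
        try {
            barrier.await(); // fails immediately instead of blocking
        } catch (BrokenBarrierException e) {
            trace.append("broken-on-await");
        }
        return trace.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // timeout;broken=true;broken-on-await
    }
}
```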
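Resetting the counter N: the reset method restores the barrier to its initial state. Any parties currently waiting at the barrier will return with a BrokenBarrierException.
Test demo: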
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    public static void statistic() {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await(); // blocks until the barrier trips or is reset
                    log.info("I am task one……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    log.info("I am task two……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // Give the first two tasks time to reach the barrier; if reset()
                    // ran first, they would wait forever for a third party.
                    Thread.sleep(500);
                    cyclicBarrier.reset();
                    log.info("I am task three……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.shutdown();
    }
    public static void main(String[] args) {
        statistic();
    }
}
After the third thread calls CyclicBarrier.reset(), the two threads blocked at the barrier each throw java.util.concurrent.BrokenBarrierException. The output is as follows:
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:355)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.run(CyclicBarrierService.java:25)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:355)
at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$2.run(CyclicBarrierService.java:37)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2018-11-14 15:11:39,535[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]I am task three……
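Besides interrupting current waiters, reset() is also the way to make a broken barrier usable again. The sketch below is only an illustration with hypothetical names (ResetDemo, run). It breaks a two-party barrier with a timed-out await, resets it, and then trips it normally with a helper thread.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResetDemo {
    // Hypothetical helper: returns {broken before reset, broken after reset, tripped after reset}.
    public static boolean[] run() throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(50, TimeUnit.MILLISECONDS); // nobody else arrives
        } catch (TimeoutException expected) {
            // the timeout leaves the barrier broken
        }
        boolean brokenBefore = barrier.isBroken();
        barrier.reset(); // back to the initial state
        boolean brokenAfter = barrier.isBroken();

        // The barrier works again: a helper thread and this thread trip it together.
        Thread helper = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();
        barrier.await(5, TimeUnit.SECONDS);
        helper.join();
        return new boolean[] {brokenBefore, brokenAfter, !barrier.isBroken()};
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```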
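CyclicBarrier use cases and demo
As described above, CyclicBarrier is well suited to multithreaded computation. Here it plays the same role as CountDownLatch: the work is split across threads and the results are combined at the end.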
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;
@Slf4j
@Service
public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    // All three tasks must arrive before any of them returns its result
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    public static void statistic() throws Exception {
        Future<Integer> task1 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("I am task one");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 1;
            }
        });
        Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("I am task two");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 2;
            }
        });
        Future<Integer> task3 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("I am task three");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 3;
            }
        });
        // Collect the partial results and combine them
        int result1 = task1.get();
        int result2 = task2.get();
        int result3 = task3.get();
        log.info("Result of the multithreaded computation:");
        log.info(String.valueOf(result1 + result2 + result3));
        executorService.shutdown();
    }
    public static void main(String[] args) throws Exception {
        statistic();
    }
}
The output is:
2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]I am task one
2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]I am task two
2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]I am task three
2018-11-14 16:39:24,128[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]Result of the multithreaded computation:
2018-11-14 16:39:24,128[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]6
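As a possible variation on the demo above, the aggregation step can be moved into the barrier action so that the combining happens exactly once, at the moment the last worker arrives. This is a sketch under assumptions rather than the original author's code: BarrierSumDemo and sumOfSquares are hypothetical names, and squaring stands in for the real per-thread computation.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierSumDemo {
    // Hypothetical helper: each worker squares one input; the barrier action sums the squares.
    // inputs must be non-empty, because CyclicBarrier requires at least one party.
    public static int sumOfSquares(int[] inputs) throws InterruptedException {
        int n = inputs.length;
        int[] partial = new int[n];
        AtomicInteger total = new AtomicInteger();
        // Runs once, after every worker has stored its partial result
        CyclicBarrier barrier = new CyclicBarrier(n, () -> {
            int sum = 0;
            for (int p : partial) {
                sum += p;
            }
            total.set(sum);
        });
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int idx = i;
            workers[i] = new Thread(() -> {
                partial[idx] = inputs[idx] * inputs[idx]; // stand-in for real work
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumOfSquares(new int[] {1, 2, 3})); // 1 + 4 + 9 = 14
    }
}
```

Writes made by a worker before it calls await are visible to the barrier action, which is why the plain int array is safe to read there.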