java併發之CyclicBarrier(障礙器)
CyclicBarrier是一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作
很有用。
//設定parties、count及barrierCommand屬性。 CyclicBarrier(int): //當await的數量到達了設定的數量後,首先執行該Runnable物件。 CyclicBarrier(int,Runnable): //通知barrier已完成執行緒 await():
適用情況:你希望建立一組任務,它們併發地執行工作,另外的一個任務在這一組任務併發執行結束前一直阻塞等待,直到該組任務全部執行結束,這個任務才得以執行。這非常像CountDownLatch,只是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。
下面給出一個簡單的例項來說明其用法:
結果:import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest { public static void main(String[] args) { //建立CyclicBarrier物件, //並設定執行完一組5個執行緒的併發任務後,再執行MainTask任務 CyclicBarrier cb = new CyclicBarrier(5, new MainTask()); new SubTask("A", cb).start(); new SubTask("B", cb).start(); new SubTask("C", cb).start(); new SubTask("D", cb).start(); new SubTask("E", cb).start(); } } /** * 最後執行的任務 */ class MainTask implements Runnable { public void run() { System.out.println("......執行最後的任務了......"); } } /** * 一組併發任務 */ class SubTask extends Thread { private String name; private CyclicBarrier cb; SubTask(String name, CyclicBarrier cb) { this.name = name; this.cb = cb; } public void run() { System.out.println("[併發任務" + name + "] 開始執行"); for (int i = 0; i < 999999; i++) ; //模擬耗時的任務 System.out.println("[併發任務" + name + "] 開始執行完畢,通知障礙器"); try { //每執行完一項任務就通知障礙器 cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
[併發任務A] 開始執行
[併發任務B] 開始執行
[併發任務B] 開始執行完畢,通知障礙器
[併發任務C] 開始執行
[併發任務C] 開始執行完畢,通知障礙器
[併發任務A] 開始執行完畢,通知障礙器
[併發任務D] 開始執行
[併發任務D] 開始執行完畢,通知障礙器
[併發任務E] 開始執行
[併發任務E] 開始執行完畢,通知障礙器
......終於要執行最後的任務了......
結果:package com.gpl.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest2 { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); // 三個執行緒同時到達 for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + (cb.getNumberWaiting() + 1) + "個已到達" + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊" : "正在等候")); try { cb.await(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } Thread.sleep((long) (Math.random() * 10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + (cb.getNumberWaiting() + 1) + "個已到達" + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊" : "正在等候")); try { cb.await(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } Thread.sleep((long) (Math.random() * 10000)); System.out.println("執行緒" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已到達" + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊" : "正在等候")); try { cb.await(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
執行緒pool-1-thread-3即將到達集合地點1,當前已有1個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點1,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點2,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點3,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點3,當前已有2個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點1,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點2,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點3,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點3,當前已有2個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊
等價的一個程式:
package com.gpl.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest3 {
public static void main(String[] args){
CyclicBarrier cb = new CyclicBarrier(3); // 三個執行緒同時到達
new SubTask1("A", cb).start();
new SubTask1("B", cb).start();
new SubTask1("C", cb).start();
}
}
class SubTask1 extends Thread {
private String name;
private CyclicBarrier cb;
SubTask1(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}
public void run() {
//System.out.println("[併發任務" + name + "] 開始執行");
//for (int i = 0; i < 999999; i++) ; //模擬耗時的任務
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("執行緒"+ Thread.currentThread().getName()+ "即將到達集合地點1,當前已有"
+ (cb.getNumberWaiting() + 1)+ "個已到達"+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊": "正在等候"));
try {
//每執行完一項任務就通知障礙器
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}Thread.sleep((long) (Math.random() * 10000));
System.out.println("執行緒"
+ Thread.currentThread().getName()
+ "即將到達集合地點2,當前已有"
+ (cb.getNumberWaiting() + 1)
+ "個已到達"
+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
: "正在等候"));
try {
cb.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Thread.sleep((long) (Math.random() * 10000));
System.out.println("執行緒"
+ Thread.currentThread().getName()
+ "即將到達集合地點3,當前已有"
+ (cb.getNumberWaiting() + 1)
+ "個已到達"
+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
: "正在等候"));
try {
cb.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
結果:
執行緒Thread-0即將到達集合地點1,當前已有1個已到達正在等候
執行緒Thread-1即將到達集合地點1,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒Thread-1即將到達集合地點2,當前已有1個已到達正在等候
執行緒Thread-0即將到達集合地點2,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒Thread-0即將到達集合地點3,當前已有1個已到達正在等候
執行緒Thread-1即將到達集合地點3,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊