Java 多執行緒 執行緒互相等待 CyclicBarrier
阿新 • • 發佈:2019-01-09
先介紹一下JDK內容:
然後是Java程式設計思想中的介紹:java.util.concurrent 類 CyclicBarrier java.lang.Object 繼承者 java.util.concurrent.CyclicBarrier -------------------------------------------------------------------------------- public class CyclicBarrierextends Object一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。 CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。 示例用法:下面是一個在並行分解設計中使用 barrier 的例子: class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; barrier = new CyclicBarrier(N, new Runnable() { public void run() { mergeRows(...); } }); for (int i = 0; i < N; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } } 在這個例子中,每個 worker 執行緒處理矩陣的一行,在處理完所有的行之前,該執行緒將一直在屏障處等待。處理完所有的行之後,將執行所提供的 Runnable 屏障操作,併合並這些行。如果合併者確定已經找到了一個解決方案,那麼 done() 將返回 true,所有的 worker 執行緒都將終止。 如果屏障操作在執行時不依賴於正掛起的執行緒,則執行緒組中的任何執行緒在獲得釋放時都能執行該操作。為方便此操作,每次呼叫 await() 都將返回能到達屏障處的執行緒的索引。然後,您可以選擇哪個執行緒應該執行屏障操作,例如: if (barrier.await() == 0) { // log the completion of this iteration }對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致執行緒過早地離開了屏障點,那麼在該屏障點等待的其他所有執行緒也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 記憶體一致性效果:執行緒中呼叫 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,後者依次 happen-before 緊跟在從另一個執行緒中對應 await() 成功返回的操作。 從以下版本開始: 1.5 另請參見: CountDownLatch
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * CyclicBarrier * * CyclicBarrier適用於這樣的情況:你希望建立一組任務,它們並行地執行工作,然後在進行下一個步驟之前等待, * 直到所有任務都完成(看起來有些像join())。它使得所有的並行任務都將在柵欄處列隊,因此可以一直地向前移動。 * 這非常像CountDownLatch,只是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。 * * 對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式: * 如果因為中斷、失敗或者超時等原因,導致執行緒過早地離開了屏障點,那麼在該屏障點等待的其他所有執行緒也將通過 BrokenBarrierException * (如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 * 記憶體一致性效果:執行緒中呼叫 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作, * 後者依次 happen-before 緊跟在從另一個執行緒中對應 await() 成功返回的操作。 * * 下面是Hosrac的賽馬遊戲的面向物件的多執行緒版本,其中使用了CyclicBarrier * */ /** * 馬的物件,每匹馬有自己的編號,每次前進將產生一個隨機數的步數,前進一次後進行等待。 * * @create @author Henry @date 2017-1-3 * */ class Horse implements Runnable { private static int counter = 0; private final int id = counter++; private int strides = 0; private static Random rand = new Random(47); private static CyclicBarrier barrier; public Horse(CyclicBarrier barrier) { this.barrier = barrier; } public synchronized int getStrides() { return strides; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (this) { strides += rand.nextInt(3); if(getStrides()==14) throw new Exception("world"); } barrier.await(); } } catch (InterruptedException e) { System.err.println("InterruptedException"); } catch (BrokenBarrierException e) { System.err.println("BrokenBarrierException"); throw new RuntimeException(e); } catch (Exception e) { System.err.println("Exception"); throw new RuntimeException(e); } } @Override public String toString() { return "Horse " + id + " "; } /** * 輸出馬跑的步數。 * * @return */ public String tracks() { StringBuilder s = new StringBuilder(); for (int i = 0; i < getStrides(); i++) s.append("*"); s.append(id); return s.toString(); } } /** * 馬場跑道,定義長度為75。 * * @create @author Henry @date 2017-1-3 * */ public class HorseRace { static final int FINISH_LINE = 75; private List<Horse> horses = new ArrayList<Horse>(); private ExecutorService exec = Executors.newCachedThreadPool(); private CyclicBarrier barrier; public HorseRace(int nHorses, final int pause) { barrier = new CyclicBarrier(nHorses, new Runnable() { @Override public void run() { StringBuilder s = new StringBuilder(); for (int i = 0; i < FINISH_LINE; i++) s.append("="); System.out.println(s.toString()); for (Horse horse : horses) System.out.println(horse.tracks()); for (Horse horse : horses) if (horse.getStrides() >= FINISH_LINE) { System.out.println(horse + " won!"); exec.shutdownNow(); return; } try { TimeUnit.MILLISECONDS.sleep(pause); } catch (InterruptedException e) { System.err.println("barrier-action sleep interrupted"); } } }); for (int i = 0; i < nHorses; i++) { Horse horse = new Horse(barrier); horses.add(horse); exec.execute(horse); } } public static void main(String[] args) { int nHorses = 8; int pause = 200; new HorseRace(nHorses, pause); } }
可以向CyclicBarrier提供一個“柵欄動作”,它是一個Runnable,當計數值到達0時自動執行---這是CyclicBarrier和CountDownLatch之間的另一個區別。這裡,柵欄動作是作為匿名內部類建立的,它被提交給了CyclicBarrier的構造器。
我試圖讓每匹馬都列印自己,但是之後的顯式順序取決於工作管理員。CyclicBarrier使得每匹馬都要執行為了向前移動所必需執行的所有工作,然後必須在柵欄處等待其他所有的馬都準備完畢,當所有的馬都向前移動的時候,CyclicBarrier將自動呼叫Runnale柵欄動作任務,按順序顯式馬和終點線的位置。
一旦所有的任務都越過了柵欄,它就會自動地為下一回合比賽做好準備。
為了展示這個非常簡單的動畫效果,你需要將控制檯視窗的尺寸調整為小到只有馬時,才會展示出來。