詳解Java迴環屏障CyclicBarrier
阿新 • • 發佈:2020-09-14
上一篇說的CountDownLatch是一個計數器,類似執行緒的join方法,但是有一個缺陷,就是當計數器的值到達0之後,再呼叫CountDownLatch的await和countDown方法就會立刻返回,就沒有作用了,那麼反正是一個計數器,為什麼不能重複使用呢?於是就出現了這篇說的CyclicBarrier,它的狀態可以被重用;
一.簡單例子
用法其實和CountDownLatch差不多,也就是一個計數器,當計數器的值變為0之後,就會把阻塞的執行緒喚醒:
package com.example.demo.study; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Study0216 { // 注意這裡的構造器,第一個引數表示計數器初始值 // 第二個引數表示當計數器的值變為0的時候就觸發的任務 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,() -> { System.out.println("cyclicBarrier task "); }); public static void main(String[] args) { // 新建兩個執行緒的執行緒池 ExecutorService pool = Executors.newFixedThreadPool(2); // 執行緒1放入執行緒池中 pool.submit(() -> { try { System.out.println("Thread1----await-begin"); cyclicBarrier.await(); System.out.println("Thread1----await-end"); } catch (Exception e) { e.printStackTrace(); } }); // 執行緒2放到執行緒池中 pool.submit(() -> { try { System.out.println("Thread2----await-begin"); cyclicBarrier.await(); System.out.println("Thread2----await-end"); } catch (Exception e) { e.printStackTrace(); } }); // 關閉執行緒池,此時還在執行的任務會繼續執行 pool.shutdown(); } }
我們再看看CyclicBarrier的複用性,這裡比如有一個任務,有三部分組成,分別是A,B,C,然後建立兩個執行緒去執行這個任務,必須要等到兩個執行緒都執行完成A部分,然後才能開始執行B,只有兩個執行緒都執行完成B部分,才能執行C:
package com.example.demo.study; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Study0216 { // 這裡的構造器,只有一個引數,表示計數器初始值 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { // 新建兩個執行緒的執行緒池 ExecutorService pool = Executors.newFixedThreadPool(2); // 執行緒1放入執行緒池中 pool.submit(() -> { try { System.out.println("Thread1----stepA-start"); cyclicBarrier.await(); System.out.println("Thread1----stepB-start"); cyclicBarrier.await(); System.out.println("Thread1----stepC-start"); } catch (Exception e) { e.printStackTrace(); } }); // 執行緒2放到執行緒池中 pool.submit(() -> { try { System.out.println("Thread2----stepA-start"); cyclicBarrier.await(); System.out.println("Thread2----stepB-start"); cyclicBarrier.await(); System.out.println("Thread2----stepC-start"); } catch (Exception e) { e.printStackTrace(); } }); // 關閉執行緒池,此時還在執行的任務會繼續執行 pool.shutdown(); } }
二.基本原理
我們看看一些重要屬性:
public class CyclicBarrier { //這個內部類只有一個boolean值 private static class Generation { boolean broken = false; } //獨佔鎖 private final ReentrantLock lock = new ReentrantLock(); //條件變數 private final Condition trip = lock.newCondition(); //儲存執行緒的總數 private final int parties; //這是一個任務,通過構造器傳遞一個任務,當計數器變為0之後,就可以執行這個任務 private final Runnable barrierCommand; //這類內部之後一個boolean的值,表示屏障是否被打破 private Generation generation = new Generation(); //計數器 private int count; }
構造器:
//我們的構造器初始值設定的是parties public CyclicBarrier(int parties) { this(parties,null); } //注意,這裡開始的時候是count等於parties //為什麼要有兩個變數呢?我們每次呼叫await方法的時候count減一,當count的值變為0之後,怎麼又還原成初始值呢? //直接就把parties的值賦值給count就行了呀,簡單吧! public CyclicBarrier(int parties,Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
然後再看看await方法:
public int await() throws InterruptedException,BrokenBarrierException { try { //呼叫的是dowait方法 return dowait(false,0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //假設count等於3,有三個執行緒都在呼叫這個方法,預設超時時間為0,那麼首每次都只有一個執行緒可以獲取鎖,將count減一,不為0 //就會到下面的for迴圈中扔到條件佇列中掛起;直到第三個執行緒呼叫這個dowait方法,count減一等於0,那麼當前執行緒執行任務之後, //就會喚醒條件變數中阻塞的執行緒,並重置count為初始值3 private int dowait(boolean timed,long nanos)throws InterruptedException,BrokenBarrierException,TimeoutException { //獲取鎖 final ReentrantLock lock = this.lock; lock.lock(); try { //g中只有一個boolean值 final Generation g = generation; //如果g中的值為true的時候,拋錯 if (g.broken) throw new BrokenBarrierException(); //如果當前執行緒中斷,就拋錯 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //count減一,再賦值給index int index = --count; //如果index等於0的時候,說明所有的執行緒已經到屏障點了,就可以 if (index == 0) { // tripped boolean ranAction = false; try { //執行當前執行緒的任務 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //喚醒其他因為呼叫了await方法阻塞的執行緒 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //能到這裡來,說明是count不等於0,也就是還有的執行緒沒有到屏障點 for (;;) { try { //wait方法有兩種情況,一種是設定超時時間,一種是不設定超時時間 //這裡就是對超時時間進行的一個判斷,如果設定的超時時間為0,則會在條件佇列中無限的等待下去,直到被喚醒 //設定了超時時間,那就等待該時間 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } } //喚醒其他因為呼叫了await方法阻塞的執行緒 private void nextGeneration() { //喚醒條件變數中所有執行緒 trip.signalAll(); //重置count的值 count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; //重置count為初始值parties count = parties; //喚醒條件佇列中的所有執行緒 trip.signalAll(); }
以上就是詳解Java迴環屏障CyclicBarrier的詳細內容,更多關於Java CyclicBarrier的資料請關注我們其它相關文章!