併發工具類(二)同步屏障CyclicBarrier
阿新 • • 發佈:2018-12-22
簡介
CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。
例項程式碼如下:
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } }
輸出
2 1
或者輸出
1 2
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主執行緒和子執行緒會永遠等待,因為沒有第三個執行緒執行await方法,即沒有第三個執行緒到達屏障,所以之前到達屏障的兩個執行緒都不會繼續執行。
CyclicBarrier還提供一個更高階的建構函式CyclicBarrier(int parties, Runnable barrierAction),用於線上程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。程式碼如下:
public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } static class A implements Runnable { @Override public void run() { System.out.println(3); } } }
輸出
3 1 2
CyclicBarrier的應用場景
CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如我們用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水。
CyclicBarrier和CountDownLatch的區別
- CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算髮生錯誤,可以重置計數器,並讓執行緒們重新執行一次。
- CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的執行緒數量。isBroken方法用來知道阻塞的執行緒是否被中斷。比如以下程式碼執行完之後會返回true。
isBroken的使用程式碼如下:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest3 { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Thread thread = new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } } }); thread.start(); thread.interrupt(); try { c.await(); } catch (Exception e) { System.out.println(c.isBroken()); } } }
輸出
true