你對CyclicBarrier(迴圈柵欄)的瞭解有多少
目錄:
1 CyclicBarrier 的應用場景
2 CyclicBarrier 的使用示例
3 CyclicBarrier 原始碼分析
4 CyclicBarrier 和 CountDownLatch 的區別
前言:
CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現執行緒間的技術等待,但是它的功能比CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。
CountDownLatch的實現是基於AQS的,而CycliBarrier是基於 ReentrantLock(ReentrantLock也屬於AQS同步器)和 Condition 的.
CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier 預設的構造方法是 CyclicBarrier(int parties) ,其引數表示屏障攔截的執行緒數量,每個執行緒呼叫 await 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。
再來看一下它的建構函式:
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中,parties 就代表了有攔截的執行緒的數量,當攔截的執行緒數量達到這個值的時候就開啟柵欄,讓所有執行緒通過。
內容:
1 CyclicBarrier 的應用場景
CyclicBarrier 可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如我們用一個 Excel 儲存了使用者所有銀行流水,每個 Sheet 儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個 sheet 裡的銀行流水,都執行完之後,得到每個 sheet 的日均銀行流水,最後,再用 barrierAction 用這些執行緒的計算結果,計算出整個 Excel 的日均銀行流水。
2 CyclicBarrier 的使用示例
示例 1:
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 測試 CyclicBarrier 類中帶引數的 await() 方法
*/
public class CyclicBarrierExample2 {
// 請求的數量
private static final int threadCount = 550;
// 需要同步的執行緒數量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 建立執行緒池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException,
BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保證子執行緒完全執行結束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
執行結果,如下:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
可以看到當執行緒數量也就是請求數量達到我們定義的 5 個的時候, await 方法之後的方法才被執行。另外,CyclicBarrier 還提供一個更高階的建構函式 CyclicBarrier(int parties, RunnablebarrierAction) ,用於線上程到達屏障時,優先執行 barrierAction ,方便處理更復雜的業務場景。示例程式碼如下:
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的時候指定一個 Runnable
*/
public class CyclicBarrierExample3 {
// 請求的數量
private static final int threadCount = 550;
// 需要同步的執行緒數量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () ->
{
System.out.println("------當執行緒數達到之後,優先執行------");
});
public static void main(String[] args) throws InterruptedException {
// 建立執行緒池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException,
BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
執行結果,如下:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------當執行緒數達到之後,優先執行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------當執行緒數達到之後,優先執行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
3 CyclicBarrier 原始碼分析
當呼叫 CyclicBarrier 物件呼叫 await() 方法時,實際上呼叫的是 dowait(false, 0L) 方法。await() 方法就像樹立起一個柵欄的行為一樣,將執行緒擋住了,當攔住的執行緒數量達到 parties 的值時,柵欄才會開啟,執行緒才得以通過執行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait(false, 0L) :
// 當執行緒數量或者請求數量達到 count 時 await 之後的方法才會被執行。上面的示例中 count
的值就為 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 鎖住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果執行緒中斷了,丟擲異常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout減1
int index = --count;
// 當 count 數量減為 0 之後說明最後一個執行緒已經到達柵欄了,也就是達到了可以執行
await 方法之後的條件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 將 count 重置為 parties 屬性的初始化值
// 喚醒之前等待的執行緒
// 下一波執行開始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
總結: CyclicBarrier 內部通過一個 count 變數作為計數器,cout 的初始值為 parties 屬性的初始化值,每當一個執行緒到了柵欄這裡了,那麼就將計數器減一。如果 count 值為 0 了,表示這是這一代最後一個執行緒到達柵欄,就嘗試執行我們構造方法中輸入的任務。
4 CyclicBarrier 和 CountDownLatch 的區別
下面這個是國外一個大佬的回答:
CountDownLatch 是計數器,只能使用一次,而 CyclicBarrier 的計數器提供 reset 功能,可以多次使用。但是我不那麼認為它們之間的區別僅僅就是這麼簡單的一點。我們來從 jdk 作者設計的目的來看,javadoc 是這麼描述它們的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a setof operations being performed in other threads completes.(CountDownLatch: 一個或者多個執行緒,等待其他多個執行緒完成某件事情之後才能執行;)
CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other toreach a common barrier point.(CyclicBarrier : 多個執行緒互相等待,直到到達同一個同步點,再繼續一起執行。)
對於 CountDownLatch 來說,重點是“一個執行緒(多個執行緒)等待”,而其他的 N 個執行緒在完成“某件事情”之後,可以終止,也可以等待。而對於 CyclicBarrier,重點是多個執行緒,在任意一個執行緒沒有完成,所有的執行緒都必須等待。
CountDownLatch 是計數器,執行緒完成一個記錄一個,只不過計數不是遞增而是遞減,而CyclicBarrier 更像是一個閥門,需要所有執行緒都到達,閥門才能開啟,然後繼續執行。
以上就是本期更新內容,關注下期更精彩!