1. 程式人生 > 其它 >你對CyclicBarrier(迴圈柵欄)的瞭解有多少

你對CyclicBarrier(迴圈柵欄)的瞭解有多少

技術標籤:Javajava併發程式設計mysql

目錄:

1 CyclicBarrier 的應用場景
2 CyclicBarrier 的使用示例
3 CyclicBarrier 原始碼分析
4 CyclicBarrier 和 CountDownLatch 的區別

前言:

CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現執行緒間的技術等待,但是它的功能比CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。

CountDownLatch的實現是基於AQS的,而CycliBarrier是基於 ReentrantLock(ReentrantLock也屬於AQS同步器)和 Condition 的.

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier 預設的構造方法是 CyclicBarrier(int parties) ,其引數表示屏障攔截的執行緒數量,每個執行緒呼叫 await 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。

再來看一下它的建構函式:

public CyclicBarrier(int parties) {
  this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

其中,parties 就代表了有攔截的執行緒的數量,當攔截的執行緒數量達到這個值的時候就開啟柵欄,讓所有執行緒通過。

內容:

1 CyclicBarrier 的應用場景

CyclicBarrier 可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如我們用一個 Excel 儲存了使用者所有銀行流水,每個 Sheet 儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個 sheet 裡的銀行流水,都執行完之後,得到每個 sheet 的日均銀行流水,最後,再用 barrierAction 用這些執行緒的計算結果,計算出整個 Excel 的日均銀行流水。

2 CyclicBarrier 的使用示例

示例 1:

/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 測試 CyclicBarrier 類中帶引數的 await() 方法
*/
public class CyclicBarrierExample2 {
 // 請求的數量
 private static final int threadCount = 550;
 // 需要同步的執行緒數量
 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
 public static void main(String[] args) throws InterruptedException {
  // 建立執行緒池
  ExecutorService threadPool = Executors.newFixedThreadPool(10);
  for (int i = 0; i < threadCount; i++) {
   final int threadNum = i;
   Thread.sleep(1000);
   threadPool.execute(() -> {
    try {
     test(threadNum);
   } catch (InterruptedException e) {
     // TODO Auto-generated catch block
 e.printStackTrace();
   } catch (BrokenBarrierException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
   }
  });
 }
  threadPool.shutdown();
}
 public static void test(int threadnum) throws InterruptedException,
BrokenBarrierException {
  System.out.println("threadnum:" + threadnum + "is ready");
  try {
   /**等待60秒,保證子執行緒完全執行結束*/
   cyclicBarrier.await(60, TimeUnit.SECONDS);
 } catch (Exception e) {
   System.out.println("-----CyclicBarrierException------");
 }
  System.out.println("threadnum:" + threadnum + "is finish");
}
}

執行結果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish

......

可以看到當執行緒數量也就是請求數量達到我們定義的 5 個的時候, await 方法之後的方法才被執行。另外,CyclicBarrier 還提供一個更高階的建構函式 CyclicBarrier(int parties, RunnablebarrierAction) ,用於線上程到達屏障時,優先執行 barrierAction ,方便處理更復雜的業務場景。示例程式碼如下:

/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的時候指定一個 Runnable
*/
public class CyclicBarrierExample3 {
 // 請求的數量
 private static final int threadCount = 550;
 // 需要同步的執行緒數量
 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () ->
{
  System.out.println("------當執行緒數達到之後,優先執行------");
});
 public static void main(String[] args) throws InterruptedException {
  // 建立執行緒池
  ExecutorService threadPool = Executors.newFixedThreadPool(10);
  for (int i = 0; i < threadCount; i++) {
   final int threadNum = i;
   Thread.sleep(1000);
   threadPool.execute(() -> {
    try {
     test(threadNum);
   } catch (InterruptedException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
   } catch (BrokenBarrierException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
   }
  });
 }
  threadPool.shutdown();
}
 public static void test(int threadnum) throws InterruptedException,
BrokenBarrierException {
  System.out.println("threadnum:" + threadnum + "is ready");
  cyclicBarrier.await();
  System.out.println("threadnum:" + threadnum + "is finish");
}
}

執行結果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------當執行緒數達到之後,優先執行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------當執行緒數達到之後,優先執行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

3 CyclicBarrier 原始碼分析

當呼叫 CyclicBarrier 物件呼叫 await() 方法時,實際上呼叫的是 dowait(false, 0L) 方法。await() 方法就像樹立起一個柵欄的行為一樣,將執行緒擋住了,當攔住的執行緒數量達到 parties 的值時,柵欄才會開啟,執行緒才得以通過執行。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
      return dowait(false, 0L);
   } catch (TimeoutException toe) {
      throw new Error(toe); // cannot happen
   }
 }

dowait(false, 0L) :
// 當執行緒數量或者請求數量達到 count 時 await 之後的方法才會被執行。上面的示例中 count
的值就為 5。

private int count;
  /**
  * Main barrier code, covering the various policies.
  */
  private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
       TimeoutException {
    final ReentrantLock lock = this.lock;
    // 鎖住
    lock.lock();
    try {
      final Generation g = generation;
      if (g.broken)
        throw new BrokenBarrierException();
      // 如果執行緒中斷了,丟擲異常
      if (Thread.interrupted()) {
        breakBarrier();
        throw new InterruptedException();
     }
      // cout減1
      int index = --count;
      // 當 count 數量減為 0 之後說明最後一個執行緒已經到達柵欄了,也就是達到了可以執行
await 方法之後的條件
 if (index == 0) {  // tripped
        boolean ranAction = false;
        try {
          final Runnable command = barrierCommand;
          if (command != null)
            command.run();
          ranAction = true;
          // 將 count 重置為 parties 屬性的初始化值
          // 喚醒之前等待的執行緒
          // 下一波執行開始
          nextGeneration();
          return 0;
       } finally {
          if (!ranAction)
            breakBarrier();
       }
     }
     // loop until tripped, broken, interrupted, or timed out
      for (;;) {
        try {
          if (!timed)
            trip.await();
          else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
       } catch (InterruptedException ie) {
          if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
         } else {
            // We're about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
         }
       }
        if (g.broken)
          throw new BrokenBarrierException();
        if (g != generation)
          return index;
        if (timed && nanos <= 0L) {
          breakBarrier();
          throw new TimeoutException();
       }
     }
   } finally {
      lock.unlock();
   }
 }

總結: CyclicBarrier 內部通過一個 count 變數作為計數器,cout 的初始值為 parties 屬性的初始化值,每當一個執行緒到了柵欄這裡了,那麼就將計數器減一。如果 count 值為 0 了,表示這是這一代最後一個執行緒到達柵欄,就嘗試執行我們構造方法中輸入的任務。

4 CyclicBarrier 和 CountDownLatch 的區別

下面這個是國外一個大佬的回答:

CountDownLatch 是計數器,只能使用一次,而 CyclicBarrier 的計數器提供 reset 功能,可以多次使用。但是我不那麼認為它們之間的區別僅僅就是這麼簡單的一點。我們來從 jdk 作者設計的目的來看,javadoc 是這麼描述它們的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a setof operations being performed in other threads completes.(CountDownLatch: 一個或者多個執行緒,等待其他多個執行緒完成某件事情之後才能執行;)

CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other toreach a common barrier point.(CyclicBarrier : 多個執行緒互相等待,直到到達同一個同步點,再繼續一起執行。)

對於 CountDownLatch 來說,重點是“一個執行緒(多個執行緒)等待”,而其他的 N 個執行緒在完成“某件事情”之後,可以終止,也可以等待。而對於 CyclicBarrier,重點是多個執行緒,在任意一個執行緒沒有完成,所有的執行緒都必須等待。

CountDownLatch 是計數器,執行緒完成一個記錄一個,只不過計數不是遞增而是遞減,而CyclicBarrier 更像是一個閥門,需要所有執行緒都到達,閥門才能開啟,然後繼續執行。

以上就是本期更新內容,關注下期更精彩!