1. 程式人生 > >死磕 java同步系列之CyclicBarrier原始碼解析——有圖有真相

死磕 java同步系列之CyclicBarrier原始碼解析——有圖有真相

問題

(1)CyclicBarrier是什麼?

(2)CyclicBarrier具有什麼特性?

(3)CyclicBarrier與CountDownLatch的對比?

簡介

CyclicBarrier,迴環柵欄,它會阻塞一組執行緒直到這些執行緒同時達到某個條件才繼續執行。它與CountDownLatch很類似,但又不同,CountDownLatch需要呼叫countDown()方法觸發事件,而CyclicBarrier不需要,它就像一個柵欄一樣,當一組執行緒都到達了柵欄處才繼續往下走。

使用方法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}    

這段方法很簡單,使用一個CyclicBarrier使得三個執行緒保持同步,當三個執行緒同時到達cyclicBarrier.await();處大家再一起往下執行。

原始碼分析

主要內部類

private static class Generation {
    boolean broken = false;
}

Generation,中文翻譯為代,一代人的代,用於控制CyclicBarrier的迴圈使用。

比如,上面示例中的三個執行緒完成後進入下一代,繼續等待三個執行緒達到柵欄處再一起執行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數。

主要屬性

// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指執行緒來了先絆倒,等達到一定數量了再喚醒
private final Condition trip = lock.newCondition();
// 需要等待的執行緒數量
private final int parties;
// 當喚醒的時候執行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當前這一代還需要等待的執行緒數
private int count;

通過屬性可以看到,CyclicBarrier內部是通過重入鎖的條件鎖來實現的,那麼你可以腦補一下這個場景嗎?

彤哥來腦補一下:假如初始時count = parties = 3,當第一個執行緒到達柵欄處,count減1,然後把它加入到Condition的佇列中,第二個執行緒到達柵欄處也是如此,第三個執行緒到達柵欄處,count減為0,呼叫Condition的signalAll()通知另外兩個執行緒,然後把它們加入到AQS的佇列中,等待當前執行緒執行完畢,呼叫lock.unlock()的時候依次從AQS的佇列中喚醒一個執行緒繼續執行,也就是說實際上三個執行緒先依次(排隊)到達柵欄處,再依次往下執行。

以上純屬彤哥腦補的內容,真實情況是不是如此呢,且往後看。

構造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等於parties
    this.count = parties;
    // 初始化都到達柵欄處執行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

構造方法需要傳入一個parties變數,也就是需要等待的執行緒數。

await()方法

每個需要在柵欄處等待的執行緒都需要顯式地呼叫await()方法等待其它執行緒的到來。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 呼叫dowait方法,不需要超時
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 當前代
        final Generation g = generation;
        
        // 檢查
        if (g.broken)
            throw new BrokenBarrierException();

        // 中斷檢查
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        
        // count的值減1
        int index = --count;
        // 如果數量減到0了,走這段邏輯(最後一個執行緒走這裡)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果初始化的時候傳了命令,這裡執行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 呼叫下一代方法
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 這個迴圈只有非最後一個執行緒可以走到
        for (;;) {
            try {
                if (!timed)
                    // 呼叫condition的await()方法
                    trip.await();
                else if (nanos > 0L)
                    // 超時等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            
            // 檢查
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常來說這裡肯定不相等
            // 因為上面打破柵欄的時候呼叫nextGeneration()方法時generation的引用已經變化了
            if (g != generation)
                return index;
            
            // 超時檢查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // 呼叫condition的signalAll()將其佇列中的等待者全部轉移到AQS的佇列中
    trip.signalAll();
    // 重置count
    count = parties;
    // 進入下一代
    generation = new Generation();
}

dowait()方法裡的整個邏輯分成兩部分:

(1)最後一個執行緒走上面的邏輯,當count減為0的時候,打破柵欄,它呼叫nextGeneration()方法通知條件佇列中的等待執行緒轉移到AQS的佇列中等待被喚醒,並進入下一代。

(2)非最後一個執行緒走下面的for迴圈邏輯,這些執行緒會阻塞在condition的await()方法處,它們會加入到條件佇列中,等待被通知,當它們喚醒的時候已經更新換“代”了,這時候返回。

圖解

CyclicBarrier

學習過前面的章節,看這個圖很簡單了,看不懂的同學還需要把推薦的內容好好看看哦^^

總結

(1)CyclicBarrier會使一組執行緒阻塞在await()處,當最後一個執行緒到達時喚醒(只是從條件佇列轉移到AQS佇列中)前面的執行緒大家再繼續往下走;

(2)CyclicBarrier不是直接使用AQS實現的一個同步器;

(3)CyclicBarrier基於ReentrantLock及其Condition實現整個同步邏輯;

彩蛋

CyclicBarrier與CountDownLatch的異同?

(1)兩者都能實現阻塞一組執行緒等待被喚醒;

(2)前者是最後一個執行緒到達時自動喚醒;

(3)後者是通過顯式地呼叫countDown()實現的;

(4)前者是通過重入鎖及其條件鎖實現的,後者是直接基於AQS實現的;

(5)前者具有“代”的概念,可以重複使用,後者只能使用一次;

(6)前者只能實現多個執行緒到達柵欄處一起執行;

(7)後者不僅可以實現多個執行緒等待一個執行緒條件成立,還能實現一個執行緒等待多個執行緒條件成立(詳見CountDownLatch那章使用案例);

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock原始碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock原始碼解析

12、死磕 java同步系列之Semaphore原始碼解析

13、死磕 java同步系列之CountDownLatch原始碼解析

14、死磕 java同步系列之AQS終篇

15、死磕 java同步系列之StampedLock原始碼解析


歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java同步系列CyclicBarrier原始碼解析——真相

問題 (1)CyclicBarrier是什麼? (2)CyclicBarrier具有什麼特性? (3)CyclicBarrier與

java同步系列ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

問題 (1)重入鎖是什麼? (2)ReentrantLock如何實現重入鎖? (3)ReentrantLock為什麼預設是非公平模式? (4)ReentrantLock除了可重入還有哪些特性? 簡介 Reentrant = Re + entrant,Re是重複、又、再的意思,entrant是enter的名詞或

java同步系列ReentrantLock原始碼解析(二)——條件鎖

問題 (1)條件鎖是什麼? (2)條件鎖適用於什麼場景? (3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎? 簡介 條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。 比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素

java同步系列ReentrantReadWriteLock原始碼解析

問題 (1)讀寫鎖是什麼? (2)讀寫鎖具有哪些特性? (3)ReentrantReadWriteLock是怎麼實現讀寫鎖的? (4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap? 簡介 讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個執行緒可以同時對共享

java同步系列Semaphore原始碼解析

問題 (1)Semaphore是什麼? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什麼場景中? (

java同步系列StampedLock原始碼解析

問題 (1)StampedLock是什麼? (2)StampedLock具有什麼特性? (3)StampedLock是否支援可重入

java同步系列Phaser原始碼解析

問題 (1)Phaser是什麼? (2)Phaser具有哪些特性? (3)Phaser相對於CyclicBarrier和Count

java同步系列開篇

討論 關註 使用 避免死鎖 更新數據 讀寫 上下文切換 monit 缺點 簡介 同步系列,這是彤哥想了好久的名字,本來是準備寫鎖相關的內容,但是java中的CountDownLatch、Semaphore、CyclicBarrier這些類又不屬於鎖,它們和鎖又有很多共同點,

java同步系列JMM(Java Memory Model)

簡介 Java記憶體模型是在硬體記憶體模型上的更高層的抽象,它遮蔽了各種硬體和作業系統訪問的差異性,保證了Java程式在各種平臺下對記憶體的訪問都能達到一致的效果。 硬體記憶體模型 在正式講解Java的記憶體模型之前,我們有必要先了解一下硬體層面的一些東西。 在現代計算機的硬體體系中,CPU的運算速度是非常快

java同步系列volatile解析

問題 (1)volatile是如何保證可見性的? (2)volatile是如何禁止重排序的? (3)volatile的實現原理? (4)volatile的缺陷? 簡介 volatile可以說是Java虛擬機器提供的最輕量級的同步機制了,但是它並不容易被正確地理解,以至於很多人不習慣使用它,遇到多執行緒問題一律

java同步系列synchronized解析

問題 (1)synchronized的特性? (2)synchronized的實現原理? (3)synchronized是否可重入? (4)synchronized是否是公平鎖? (5)synchronized的優化? (6)synchronized的五種使用方式? 簡介 synchronized關鍵字是Ja

java同步系列自己動手寫一個鎖Lock

問題 (1)自己動手寫一個鎖需要哪些知識? (2)自己動手寫一個鎖到底有多簡單? (3)自己能不能寫出來一個完美的鎖? 簡介 本篇文章的目標一是自己動手寫一個鎖,這個鎖的功能很簡單,能進行正常的加鎖、解鎖操作。 本篇文章的目標二是通過自己動手寫一個鎖,能更好地理解後面章節將要學習的AQS及各種同步器實現的原理

java同步系列AQS起篇

問題 (1)AQS是什麼? (2)AQS的定位? (3)AQS的實現原理? (4)基於AQS實現自己的鎖? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 AQS是基於FIFO的佇列實現的,並且內部維護了一個狀態變數sta

java同步系列ReentrantLock VS synchronized——結果可能跟你想的不一樣

問題 (1)ReentrantLock有哪些優點? (2)ReentrantLock有哪些缺點? (3)ReentrantLock

java同步系列AQS終篇(面試)

問題 (1)AQS的定位? (2)AQS的重要組成部分? (3)AQS運用的設計模式? (4)AQS的總體流程? 簡介 AQS的全稱是AbstractQueuedSynchronizer,它的定位是為Java中幾乎所有的鎖和同步器提供一個基礎框架。 在之前的章節中,我們一起學習了ReentrantLock、R

java同步系列mysql分散式鎖

問題 (1)什麼是分散式鎖? (2)為什麼需要分散式鎖? (3)mysql如何實現分散式鎖? (4)mysql分散式鎖的優點和缺點? 簡介 隨著併發量的不斷增加,單機的服務遲早要向多節點或者微服務進化,這時候原來單機模式下使用的synchronized或者ReentrantLock將不再適用,我們迫切地需要一

java同步系列zookeeper分散式鎖

(2)zookeeper分散式鎖有哪些優點? (3)zookeeper分散式鎖有哪些缺點? 簡介 zooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,它可以為分散式應用提供一致性服務,它是Hadoop和Hbase的重要元件,同時也可以作為配置中心、註冊中心運用在微服務體系中。 本章我們將介

java同步系列redis分散式鎖進化史

(2)redis分散式鎖有哪些優點? (3)redis分散式鎖有哪些缺點? (4)redis實現分散式鎖有沒有現成的輪子可以使用? 簡介 Redis(全稱:Remote Dictionary Server 遠端字典服務)是一個開源的使用ANSI C語言編寫、支援網路、可基於記憶體亦可持久化的日誌型、Key-

java同步系列終結篇

腦圖 下面是關於同步系列的一份腦圖,列舉了主要的知識點和問題點,看過本系列文章的同學可以根據腦圖自行回顧所學的內容,也可以作為面試前的準備。 如果有需要高清無碼原圖的同學,可以關注公眾號“彤哥讀原始碼”,回覆“sync”領取。 總結 所謂同步,就是保證多執行緒(包括多程序)對共享資源的讀寫能夠安全有效的執