1. 程式人生 > 其它 >秒懂:JCTool 的 Mpsc 超高效能無鎖佇列 (史上最全+10W字長文)

秒懂:JCTool 的 Mpsc 超高效能無鎖佇列 (史上最全+10W字長文)

文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 部落格園版 為您奉上珍貴的學習資源 :

免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高併發核心程式設計(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:尼恩Java面試寶典 V11

面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取

超高效能無鎖佇列 Mpsc Queue

注: 本文從 對 超高效能無鎖佇列, 做了一個 系統化、由淺入深的 穿透式介紹, 幫助大家徹底掌握 這個高效能的演算法。
另外
本文以 PDF 持續更新,最新尼恩 架構筆記、面試題 的PDF檔案,請從下面的連結獲取:語雀 或者 碼雲

在大規模流量的高併發系統中, 需要一些比連結串列結構效能更好的,基於陣列 + CAS 操作實現的無鎖安全佇列,在行業中被廣受認可的 王者佇列,是以下兩個高效能無鎖佇列:

  • Disruptor 環形佇列

  • JCTools MSPC 佇列

Disruptor 環形佇列已經在老的文章中,進行過非常詳細的介紹。接下來,從架構、原始碼的角度,介紹一下 JCTools MSPC 佇列。

這裡解釋一下 MSPC 的含義,如下:

  • M:Multiple,多個的。
  • S:Single,單個的。
  • P:Producer,生產者。
  • C:Consumer,消費者。

因此MpscQueue適用於多生產者,單消費者的高效能無鎖佇列。 多生產者單消費者的使用場景,最佳的案例有:

  • Netty Reactor 執行緒中任務佇列 taskQueue 必須滿足多個生產者可以同時提交任務。

  • Caffeine中多個生產者業務執行緒,去進行快取寫入操作, 而只有單個的維護執行緒,基於W-TinyLRU進行訪問記錄的維護

所以 JCTools 提供的 Mpsc Queue 非常適合:

  • Netty 非同步任務場景,
  • Caffeine的寫入操作訪問記錄維護場景.

在介紹 Mpsc Queue 之前,我們先回顧下 JDK 原生佇列的工作原理。

JDK 內建併發佇列

JDK 內建併發佇列按照實現方式可以分為阻塞佇列和非阻塞佇列兩種型別,阻塞佇列是基於鎖實現的,非阻塞佇列是基於 CAS 操作實現的。

JDK 中包含多種阻塞和非阻塞的佇列實現,如下圖所示。

佇列是一種 FIFO(先進先出)的資料結構,JDK 中定義了 java.util.Queue 的佇列介面,與 List、Set 介面類似,java.util.Queue 也繼承於 Collection 集合介面。

此外,JDK 還提供了一種雙端佇列介面 java.util.Deque,我們最常用的 LinkedList 就是實現了 Deque 介面。

下面我們簡單說說上圖中的每個佇列的特點,並給出一些對比和總結。

阻塞佇列

阻塞佇列在佇列為空或者佇列滿時,都會發生阻塞。阻塞佇列自身是執行緒安全的,使用者無需關心執行緒安全問題,降低了多執行緒開發難度。

阻塞佇列主要分為以下幾種:

  • ArrayBlockingQueue

    最基礎且開發中最常用的阻塞佇列,底層採用陣列實現的有界佇列,初始化需要指定佇列的容量。ArrayBlockingQueue 是如何保證執行緒安全的呢?

    它內部是使用了一個重入鎖 ReentrantLock,並搭配 notEmpty、notFull 兩個條件變數 Condition 來控制併發訪問。

    從佇列讀取資料時,如果佇列為空,那麼會阻塞等待,直到佇列有資料了才會被喚醒。

    如果佇列已經滿了,也同樣會進入阻塞狀態,直到佇列有空閒才會被喚醒。

  • LinkedBlockingQueue

    內部採用的資料結構是連結串列,佇列的長度可以是有界或者無界的,初始化不需要指定佇列長度,預設是 Integer.MAX_VALUE。

    LinkedBlockingQueue 內部使用了 takeLock、putLock兩個重入鎖 ReentrantLock,以及 notEmpty、notFull 兩個條件變數 Condition 來控制併發訪問。

    採用讀鎖和寫鎖的好處是可以避免讀寫時相互競爭鎖的現象,所以相比於 ArrayBlockingQueue,LinkedBlockingQueue 的效能要更好。

  • PriorityBlockingQueue

    採用最小堆實現的優先順序佇列,佇列中的元素按照優先順序進行排列,每次出隊都是返回優先順序最高的元素。PriorityBlockingQueue 內部是使用了一個 ReentrantLock 以及一個條件變數 Condition notEmpty 來控制併發訪問,

    因為 PriorityBlockingQueue 是無界佇列,所以不需要 notFull ,每次 put 都不會發生阻塞。PriorityBlockingQueue 底層的最小堆是採用陣列實現的,當元素個數大於等於最大容量時會觸發擴容,

    在擴容時會先釋放鎖,保證其他元素可以正常出隊,然後使用 CAS 操作確保只有一個執行緒可以執行擴容邏輯。

  • DelayQueue

    一種支援延遲獲取元素的阻塞佇列,常用於快取、定時任務排程等場景。

    DelayQueue 內部是採用優先順序佇列 PriorityQueue 儲存物件。

    DelayQueue 中的每個物件都必須實現 Delayed 介面,並重寫 compareTo 和 getDelay 方法。向佇列中存放元素的時候必須指定延遲時間,只有延遲時間已滿的元素才能從佇列中取出。

  • SynchronizedQueue

    又稱無緩衝佇列。

    比較特別的是 SynchronizedQueue 內部不會儲存元素。與 ArrayBlockingQueue、LinkedBlockingQueue 不同,SynchronizedQueue 直接使用 CAS 操作控制執行緒的安全訪問。

    其中 put 和 take 操作都是阻塞的,每一個 put 操作都必須阻塞等待一個 take 操作,反之亦然。

    所以 SynchronizedQueue 可以理解為生產者和消費者配對的場景,雙方必須互相等待,直至配對成功。

    在 JDK 的執行緒池 Executors.newCachedThreadPool 中就存在 SynchronousQueue 的運用,對於新提交的任務,如果有空閒執行緒,將重複利用空閒執行緒處理任務,否則將新建執行緒進行處理。

  • LinkedTransferQueue

    一種特殊的無界阻塞佇列,可以看作 LinkedBlockingQueues、SynchronousQueue(公平模式)、ConcurrentLinkedQueue 的合體。

    與 SynchronousQueue 不同的是,LinkedTransferQueue 內部可以儲存實際的資料,當執行 put 操作時,如果有等待執行緒,那麼直接將資料交給對方,否則放入佇列中。與 LinkedBlockingQueues 相比,LinkedTransferQueue 使用 CAS 無鎖操作進一步提升了效能。

非阻塞佇列

說完阻塞佇列,我們再來看下非阻塞佇列。

非阻塞佇列不需要通過加鎖的方式對執行緒阻塞,併發效能更好。

JDK 中常用的非阻塞佇列有以下幾種:

  • ConcurrentLinkedQueue

    它是一個採用雙向連結串列實現的無界併發非阻塞佇列,它屬於 LinkedQueue 的安全版本。ConcurrentLinkedQueue 內部採用 CAS 操作保證執行緒安全,這是非阻塞佇列實現的基礎,

    相比 ArrayBlockingQueue、LinkedBlockingQueue 具備較高的效能。

  • ConcurrentLinkedDeque

    也是一種採用雙向連結串列結構的無界併發非阻塞佇列。

    與 ConcurrentLinkedQueue 不同的是,ConcurrentLinkedDeque 屬於雙端佇列,

    它同時支援 FIFO 和 FILO 兩種模式,可以從佇列的頭部插入和刪除資料,也可以從佇列尾部插入和刪除資料,適用於多生產者和多消費者的場景。

BlockingQueue阻塞佇列超級介面

至此,常見的佇列型別我們已經介紹完了。我們在平時開發中使用頻率最高的是 BlockingQueue。

實現一個阻塞佇列需要具備哪些基本功能呢?

下面看 BlockingQueue 的介面繼承關係,如下圖所示。

下面看 BlockingQueue 的介面的各種實現子類,如下圖所示。

BlockingQueue 實現子類太多,圖裡放不下,還請大家通過IDEA工具,自行檢視。

下面看 BlockingQueue 的介面的 抽象方法,如下圖所示。

我們可以通過下面一張表格,對上述 BlockingQueue 介面的具體行為進行歸類。

以下表格,來自於《Java高併發核心程式設計 卷2 加強版》

Java內建佇列的問題

我們先來看一看常用的執行緒安全的內建佇列有什麼問題。Java的內建佇列如下表所示。

佇列 有界性 資料結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

佇列的底層一般分成三種:陣列、連結串列和堆。

其中,堆一般情況下是為了實現帶有優先順序特性的佇列,暫時不做介紹,後面結合堆的結構,做專題介紹,這個也是一個o(logn)的效能比較高的結構。

從陣列和連結串列兩種資料結構來看,兩類結構如下:

  • 基於陣列執行緒安全的佇列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證執行緒安全;
  • 基於連結串列的執行緒安全佇列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現執行緒安全,而後者是無鎖結構,通過原子變數compare and swap(以下簡稱“CAS”)這種無鎖方式來實現的。

LinkedTransferQueue和ConcurrentLinkedQueue一樣,也是無鎖結構,通過原子變數compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的,效能還是比較高的。

那麼LinkedTransferQueue和ConcurrentLinkedQueue 的問題是啥呢?

連結串列結構的核心的問題是:

連結的節點,在高併發的場景下,存在者大量的節點建立/GC回收的壓力,這會導致STW的產生,容易出現業務卡頓。

另外,LinkedTransferQueue 對和ConcurrentLinkedQueue 對 volatile型別的變數進行 CAS 操作,存在偽共享問題,

總之,在高併發的場景下,連結串列結構沒有陣列結構,效能那麼優越。

而JDK內建的基於資料結構的ArrayBlockingQueue佇列,有一個核心問題:

有鎖結構,高併發場景,效能沒有無鎖結構,那麼高

另外,最好是基於陣列結構的環形佇列模式, 能最大的限度的複用記憶體空間,減少記憶體分配和GC的壓力。

高效能的陣列 + CAS無鎖佇列解決訪問

在大規模流量的高併發系統中,需要一個數組 + CAS 操作實現的無鎖安全佇列,有沒有成熟的解決方案呢?

有兩個高效能無鎖佇列:

  • Disruptor

  • JCTools

Disruptor 採用的是環形結構, 進行無鎖佇列的設計

JCTools採用的是 陣列+連結串列的複合結構, 進行無鎖佇列的設計

Disruptor如何實現高效能?

使用Disruptor,主要用於對效能要求高、延遲低的場景,它通過“榨乾”機器的效能來換取處理的高效能。

Disruptor實現高效能主要體現了去掉了鎖,採用CAS演算法,同時內部通過環形佇列實現有界佇列。

  • 環形資料結構
    陣列元素不會被回收,避免頻繁的GC,所以,為了避免垃圾回收,採用陣列而非連結串列。

    同時,陣列對處理器的快取機制更加友好。

    預分配陣列空間

  • 無鎖設計

    採用CAS無鎖方式,保證執行緒的安全性

    每個生產者或者消費者執行緒,會先申請可以操作的元素在陣列中的位置,申請到之後,直接在該位置寫入或者讀取資料。

    整個過程通過原子變數CAS,保證操作的執行緒安全。

為啥cas效能高,請參見《java高併發核心程式設計卷2》

  • 元素位置屬性進行cacheline填充:

    通過新增額外的無用資訊,避免偽共享問題

    效能提升5倍多

  • 元素位置屬性進行volitale變數延遲寫入(有序寫入):

    效能提升10倍多

  • 位運算定位元素在陣列當中的位置

陣列長度2^n,通過位運算,加快定位的速度。

下標採取遞增的形式。

不用擔心index溢位的問題。index是long型別,即使100萬QPS的處理速度,也需要30萬年才能用完。

Disruptor 已經開源,詳細可查閱 Github 地址 https://github.com/LMAX-Exchange/disruptor。

JCTools如何實現高效能?

JCTools 也是一個開源專案,Github 地址為 https://github.com/JCTools/JCTools。

JCTools 是適用於 JVM 併發開發的工具,主要提供了一些 JDK 缺失的併發資料結構,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞佇列可以分為四種類型,可以根據不同的場景選擇使用。

  • Spsc 單生產者單消費者;
  • Mpsc 多生產者單消費者;
  • Spmc 單生產者多消費者;
  • Mpmc 多生產者多消費者。

Netty 中直接引入了 JCTools 的 Mpsc Queue,Caffeine 中引入了 JCTools 的 Mpsc Queue,複製了其中的程式碼,然後簡單改了下。

相比於 JDK 原生的併發佇列,Mpsc Queue 又有什麼過人之處呢?

Mpsc Queue 基礎知識

Mpsc 的全稱是 Multi Producer Single Consumer,多生產者單消費者。

Mpsc Queue 可以保證多個生產者同時訪問佇列是執行緒安全的,而且同一時刻只允許一個消費者從佇列中讀取資料。

多生產者單消費者的使用場景,最佳的案例有:

  • Netty Reactor 執行緒中任務佇列 taskQueue 必須滿足多個生產者可以同時提交任務。

  • Caffeine中多個生產者業務執行緒,去進行快取寫入操作, 而只有單個的維護執行緒,基於W-TinyLRU進行訪問記錄的維護

所以 JCTools 提供的 Mpsc Queue 非常適合:

  • Netty 非同步任務場景,
  • Caffeine的寫入操作訪問記錄維護場景.

Mpsc Queue 有多種的實現類,例如 MpscArrayQueue、MpscUnboundedArrayQueue、MpscChunkedArrayQueue 等。

首先我們看下 MpscArrayQueue 的繼承關係

除了頂層 JDK 原生的 AbstractCollection、AbstractQueue介面之外,MpscArrayQueue 還繼承了很多類似於 MpscXxxPad 以及 MpscXxxField 的類。

高效能元件的Cache Line Padding快取行填充 技術

我們可以發現一個很有意思的規律,JCtool 每個有包含屬性的類後面都會被 MpscXxxPad 類隔開。

MpscXxxPad 到底起到什麼作用呢?

首先,回顧一下 Disruptor RingBuffer 的快取行填充

Disruptor RingBuffer 的快取行填充

Disruptor RingBuffer(環形緩衝區)定義了RingBufferFields類,裡面有indexMask和其他幾個變數存放RingBuffer的內部狀態資訊。

Disruptor利用了快取行填充,在 RingBufferFields裡面定義的變數的前後,分別定義了7個long型別的變數:

  • 前面7個來自繼承的 RingBufferPad 類

  • 後面7個直接定義在 RingBuffer 類

這14個變數無任何實際用途。我們既不讀他們,也不寫他們。幫助RingBufferFields 進行快取行填充Cache Line Padding

所以,一旦 RingBufferFields 被載入到CPU Cache後,只要被頻繁讀取訪問,就不會再被換出Cache。

這意味著,對於RingBufferFields 的讀取速度,會一直是CPU Cache的訪問速度,而非記憶體的訪問速度。

MPSC的RingBuffer 的快取行填充

我們自頂向下,將所有類的欄位合併在一起,看下 MpscArrayQueue 的整體結構。

除了頂層 JDK 原生的 AbstractCollection、AbstractQueue,MpscArrayQueue 還繼承了很多類似於 MpscXxxPad 以及 MpscXxxField 的類。

我們可以發現一個很有意思的規律:

每個有包含屬性的類後面都會被 MpscXxxPad 類隔開。

MpscXxxPad 到底起到什麼作用呢?

我們自頂向下,將所有類的欄位合併在一起,看下 MpscArrayQueue 的整體結構。

  • 填充類 BaseMpscLinkedArrayQueuePad1
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue {
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}
  • 包含屬性的類 BaseMpscLinkedArrayQueueProducerFields

    包含生產相關的屬性


// $gen:ordered-fields
abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E> {
    private final static long P_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex");

    private volatile long producerIndex;

    // 獲取生產者索引
    // lv = lazy value
    @Override
    public final long lvProducerIndex() {
        return producerIndex;
    }

    //  so = set ordered / lazy set
    final void soProducerIndex(long newValue) {
        UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue);
    }

    final boolean casProducerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }
}

  • 填充類 BaseMpscLinkedArrayQueuePad2

abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueProducerFields<E> {
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}
  • 包含屬性的類 BaseMpscLinkedArrayQueueConsumerFields

    包含消費者相關的屬性


// $gen:ordered-fields
abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {
    private final static long C_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueConsumerFields.class, "consumerIndex");

    private volatile long consumerIndex;
    protected long consumerMask;
    protected E[] consumerBuffer;

    // load volatile 消費者索引
    @Override
    public final long lvConsumerIndex() {
        return consumerIndex;
    }

    // load pain
    final long lpConsumerIndex() {
        return UNSAFE.getLong(this, C_INDEX_OFFSET);
    }

    //  store  ordered 消費者索引
    final void soConsumerIndex(long newValue) {
        UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
    }
}

可以看出,MpscXxxPad 類中使用了大量 long 型別的變數,其命名沒有什麼特殊的含義,只是起到填充的作用。

和Disruptor 的原始碼,Mpsc Queue 和 Disruptor 之所以填充這些無意義的變數,是為了解決偽共享(false sharing)問題。

什麼是偽共享呢?

請參考第 23章:100w級別qps日誌平臺 ,

那裡介紹的極致詳細,還有效能對比實操測試。已經是史詩級的詳細了,這裡不做介紹。

MpscGrowableArrayQueue 原始碼分析

MpscArrayQueue 基本用法與 ArrayBlockingQueue 都是類似的:入隊 offer()和出隊 poll()。

MpscArrayQueue 如何使用,示例程式碼如下:

package org.jctools.demo;

import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.MpscGrowableArrayQueue;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MpscArrayQueueTest {
    public static final MpscGrowableArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscGrowableArrayQueue<>(4);

    public static final ExecutorService pool= Executors.newFixedThreadPool(4);
    public static void main(String[] args) {
        for (int i = 1; i <= 4; i++) {
            int index = i;
            pool.submit(() -> MPSC_ARRAY_QUEUE.offer("瘋狂創客圈 java 卷王 " + index) ); // 入隊操作
        }
        try {
            Thread.sleep(1000L);
            MPSC_ARRAY_QUEUE.add("瘋狂創客圈 java 卷王  5"); // 入隊操作,滿則跑出異常
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("佇列大小:" + MPSC_ARRAY_QUEUE.size() + ", 佇列容量:" + MPSC_ARRAY_QUEUE.capacity());
        System.out.println("出隊:" + MPSC_ARRAY_QUEUE.remove()); // 出隊操作,佇列為空則丟擲異常
        System.out.println("出隊:" + MPSC_ARRAY_QUEUE.poll()); // 出隊操作,佇列為空則返回 NULL
    }
}

程式輸出結果如下:

MpscUnboundedArrayQueue的巨集觀架構

Caffeine和Netty,所使用的並不是原始的MpscArrayQueue ,而是 其高效能的子類MpscGrowableArrayQueue,來看看其巨集觀架構。

巨集觀上,MpscGrowableArrayQueue 基於陣列+連結串列的複合結構。

基於陣列+連結串列的複合結構的優勢:

1 不會像連結串列那樣分配過多的Node,吞吐量比傳統的連結串列高。

2 擴容的時候,也不存在陣列複製,擴容的速度,也比傳統的陣列快

MpscUnboundedArrayQueue的重要屬性

MpscUnboundedArrayQueue基本的資料結構由「陣列+連結串列」組成,它有兩個指標:

  • producerBuffer 指向生產者生產的陣列

  • consumerBuffer,指向消費者消費的陣列

它還有兩個索引指標:

  • producerIndex 指向生產者生產的索引

  • consumerIndex,指向消費者消費的索引

這兩個索引會以2為步長不斷遞增。

另外對於生產者,它還有一個producerLimit指標,它代表生產者生產訊息的上限,

達到該producerLimit上限,Queue就要擴容了,

擴容的方式是建立一個長度一樣的新陣列,然後舊陣列的最後一個元素指向新陣列,形成單向連結串列。

這些屬性,為了高效能訪問,都進行了填充,分佈在不同的基類中

  • BaseMpscLinkedArrayQueueColdProducerFields 生產者欄位
abstract class BaseMpscLinkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueuePad3<E> {
    private final static long P_LIMIT_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueColdProducerFields.class, "producerLimit");

    private volatile long producerLimit; //它代表生產者生產訊息的上限
    protected long producerMask; // 計算生產者 陣列下標的掩碼
    protected E[] producerBuffer; // 計算生產者 資料的 陣列

.....
  • BaseMpscLinkedArrayQueueConsumerFields 消費者欄位
abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {
    private final static long C_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueConsumerFields.class, "consumerIndex");

    private volatile long consumerIndex; // 消費者索引
    protected long consumerMask;   // 計算消費 陣列下標的掩碼
    protected E[] consumerBuffer; // 計算消費 資料的 陣列
.....

看到 mask 變數,是用於計算陣列下標的掩碼。

相當於取模的操作,只是這裡使用位運算取模,所以,佇列中陣列的容量大小肯定是 2 的次冪。

因為 Mpsc 是多生產者單消費者佇列,所以 producerIndex、producerLimit 都是用 volatile 進行修飾的,其中一個生產者執行緒的修改需要對其他生產者執行緒可見。

還有一些其他的屬性

abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
        implements MessagePassingQueue<E>, QueueProgressIndicators {

    // 陣列被生產者填滿後,會填充一個JUMP,代表隊列擴容了,消費者遇到JUMP會消費下一個陣列。

    // No post padding here, subclasses must add
    private static final Object JUMP = new Object();

    // 消費者消費完一個完整的陣列後,會將最後一個元素設為BUFFER_CONSUMED。
    private static final Object BUFFER_CONSUMED = new Object();
    .....

MpscUnboundedArrayQueue建構函式,

需要給定一個chunkSize,指定塊大小,

    public static final MpscUnboundedArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscUnboundedArrayQueue<>(4);

MpscQueue由一系列陣列構成,chunkSize就是陣列的大小,它必須是一個2的冪次方數。

public class MpscUnboundedArrayQueue<E> extends BaseMpscLinkedArrayQueue<E>
{
    long p0, p1, p2, p3, p4, p5, p6, p7;
    long p10, p11, p12, p13, p14, p15, p16, p17;

    public MpscUnboundedArrayQueue(int chunkSize)
    {
        super(chunkSize);
    }
....

父類的建構函式

    public BaseMpscLinkedArrayQueue(final int initialCapacity) {

        // initialCapacity必須大於等於2
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");

        // 容量確保是2的冪次方數,找到initialCapacity下一個2的冪次方數
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        // index以2為步長遞增,預留一個元素存JUMP,所以limit為:(capacity-1)*2
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        // 需要一個額外元素來連結下一個陣列
        E[] buffer = allocateRefArray(p2capacity + 1);
        // 生產者和消費者Buffer指向同一個陣列
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        // 設定producerLimit = mask
        soProducerLimit(mask); // we know it's all empty to start with
    }

在父類的建構函式中,計算了mask,初始化了一個數組,

並將producerBuffer和consumerBuffer都指向了同一個陣列,然後根據mask設定producerLimit。

假設initialCapacity為4,陣列的長度就是5,

因為最後一個元素會用來存放擴容陣列的地址,形成連結串列。

每個陣列還會預留一個槽位存放JUMP元素,代表隊列擴容了,消費者遇到JUMP元素就會通過最後一個元素找到擴容後的陣列繼續消費,因此一個數組最多保留3個元素。

入隊 、擴容原始碼分析

圖解:入隊的核心流程

這個結構比較複雜,大家慢慢看吧

新佇列的屬性值

通過offer插入一個元素

第一次produceIndex到達 producerLimit 開始擴容

擴容之後,插入一個新的元素,之後的屬性值

第二次produceIndex到達 producerLimit

擴容之後,插入一個新的元素,之後的屬性值

再插入兩個元素

入隊 offer 的原始碼分析

offer(e)會將元素e新增到佇列中,即生產資料。

在MpscQueue中,執行緒通過CAS的方式以步長為2遞增producerIndex,CAS會保證只有一個執行緒操作成功,CAS成功就代表執行緒搶到了陣列中的槽位,它可以將元素e新增到陣列的指定槽位。

CAS失敗代表併發失敗了,會自旋重試。

如果producerIndex達到producerLimit,代表生產達到上限,佇列可能需要擴容了。

offerSlowPath()方法會判斷佇列是否需要擴容,如果需要擴容,也只會交給一個執行緒去擴容,這裡又是一個CAS操作,執行緒以1為步長遞增producerIndex,只有CAS成功的執行緒才會去執行擴容邏輯。

因此,在offer(e)的邏輯中,還會判斷producerIndex是否是奇數,如果為奇數就代表隊列正在擴容。

因為MpscQueue的擴容非常快速,它不需要遷移元素,只需要建立一個新陣列,再和舊陣列建立連線就可以了,

所以沒有必要讓其他執行緒掛起,執行緒發現佇列在擴容時,會進行自旋重試,直到擴容完成。

    /**
     * ,生產資料
     * 向佇列中新增一個元素e
     *
     * @param e not {@code null}, will throw NPE if it is
     * @return
     */
    @Override
    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer; //生產者指向的陣列
        long pIndex; //生產索引

        while (true) {
            long producerLimit = lvProducerLimit(); // 獲取生產者索引最大限制


            pIndex = lvProducerIndex();        // 獲取生產者索引

            // 生產索引以2為步長遞增,
            // 第0位標識為resize,所以非擴容場景,不會是奇數,
            // 擴容的時候,會在offerSlowPath()中擴容執行緒會將其設為奇數
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            if ((pIndex & 1) == 1) {
                // 奇數代表正在擴容,自旋,等待擴容完成
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
            // pIndex 是偶數, 實際的索引值 需要 除以2

            // mask/buffer may get changed by resizing -> only use for array access after successful CAS.

            mask = this.producerMask;
            buffer = this.producerBuffer;

            // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)
            // assumption behind this optimization is that queue is almost always empty or near empty
            // 閾值 producerLimit 小於等於生產者指標位置 pIndex
            // 需要擴容,
            if (producerLimit <= pIndex) {
                // 通過offerSlowPath返回狀態值,來檢視怎麼來處理這個待新增的元素
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    // producerLimit雖然達到了limit,
                    // 但是當前陣列已經被消費了部分資料,暫時不會擴容,會使用已被消費的槽位。
                    case CONTINUE_TO_P_INDEX_CAS:
                        break;
                    // 可能由於併發原因導致CAS失敗,那麼則再次重新嘗試新增元素
                    case RETRY:
                        continue;
                        // 佇列已滿,直接返回false操作
                    case QUEUE_FULL:
                        return false;
                    // 佇列需要擴容操作
                    case QUEUE_RESIZE:
                        // 對佇列進行直接擴容操作
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            // 閾值 producerLimit 大於 生產者指標位置 pIndex
            // 直接通過CAS操作對pIndex做加2處理
            if (casProducerIndex(pIndex, pIndex + 2)) {
                break;
            }
        }
        // INDEX visible before ELEMENT
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        // 將buffer陣列的指定位置替換為e,
        // 不是根據下標來設定的,是根據槽位的地址偏移量offset,UNSAFE操作。
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

offerSlowPath()會告訴執行緒佇列是滿了,還是需要擴容,還是需要自旋重試。

總之,這個一條慢路徑,所以叫做 slow path。

雖然producerIndex達到了producerLimit,但不代表隊列就非得擴容,

如果消費者已經消費了部分生產者指向的陣列元素,就意味這當前陣列還是有槽位可以繼續用的,暫時不用擴容。

  /**
     * @param mask
     * @param pIndex        生產者索引
     * @param producerLimit 生產者limit
     * @return
     */
    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
        // 消費者索引
        final long cIndex = lvConsumerIndex();
        // 陣列緩衝的容量,(長度-1) * 2
        long bufferCapacity = getCurrentBufferCapacity(mask);
        // 消費索引  + 當前陣列的容量 > 生產索引,代表當前陣列已有部分元素被消費了,
        // 不會擴容,會使用已被消費的槽位。
        // cIndex + bufferCapacity =》 producerLimit
        if (cIndex + bufferCapacity > pIndex) {
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
                // CAS失敗,自旋重試
                // retry from top
                return RETRY;
            } else {
                // continue to pIndex CAS
                // 重試  CAS修改 生產索引
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        // full and cannot grow
        // 根據生產者和消費者索引判斷Queue是否已滿,無界佇列永不會滿
        else if (availableInQueue(pIndex, cIndex) <= 0) {
            // offer should return false;
            return QUEUE_FULL;
        }
        // grab index for resize -> set lower bit
        // CAS的方式將producerIndex加1,奇數代表正在resize
        else if (casProducerIndex(pIndex, pIndex + 1)) {
            // trigger a resize
            return QUEUE_RESIZE;
        } else {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

如果需要擴容,執行緒會CAS操作將producerIndex改為奇數,讓其它執行緒能感知到佇列正在擴容,要生產資料的執行緒先自旋,等待擴容完成再繼續操作。

offer()過程中,通過CAS搶槽位,CAS失敗的執行緒自旋重試。

如果遇到佇列需要擴容,則將producerIndex設為奇數,其他執行緒自旋等待,一直等到擴容完成,擴容後再設為偶數,通知其它執行緒繼續生產。

入隊擴容原始碼分析

resize()是擴容的核心方法,

它首先會建立一個相同長度的新陣列,將producerBuffer指向新陣列,然後將元素e放到新陣列中,

舊元素的最後一個元素指向新陣列,形成連結串列。

還會將舊元素的槽位填充JUMP元素,代表隊列擴容了。

 // 擴容:
// 新建一個E[],將oldBuffer和newBuffer建立連線。
    //將producerBuffer指向新陣列,然後將元素e放到新陣列中,
// 舊元素的最後一個元素指向新陣列,形成連結串列。
// 還會將舊元素的槽位填充JUMP元素,代表隊列擴容了。
    private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
        assert (e != null && s == null) || (e == null || s != null);

        // 下一個Buffer的長度,MpscQueue會構建一個相同長度的Buffer
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try {
            // 建立一個新的E[]

            newBuffer = allocateRefArray(newBufferLength);
        } catch (OutOfMemoryError oom) {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }

        // 生產者Buffer指向新的E[]
        producerBuffer = newBuffer;
        // 計算新的Mask,Buffer長度不變的情況下,Mask也不變
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;

        // 根據該偏移量設定oldBuffer的JUMP元素,會遞增然後重置,不斷迴圈
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        // Mask不變的情況下,oldBuffer的JUMP對應的位置,就是newBuffer中要消費的位置.
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

        // 元素e放到新陣列中
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        // 舊陣列和新陣列建立連線,舊陣列的最後一個元素儲存新陣列的地址。
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");

        // Invalidate racing CASs
        // We never set the limit beyond the bounds of a buffer
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);

        // INDEX visible before ELEMENT, consistent with consumer expectation

        // make resize visible to consumer
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

出隊 poll的核心流程

圖解:poll的核心流程

這個結構比較複雜,大家慢慢看吧

queue.poll() 消費後會將槽位設null

一個Buffer消費完畢,消費遇到JUMP節點

一個Buffer消費完畢,消費遇到JUMP節點,


通過nextbuffer找下一個陣列,消費相同索引位的元素

整個陣列消費完了,將最後一位設為BUFFER CONSUMED,從連結串列中剔除。

出隊 poll的原始碼分析

poll() 方法核心思路是獲取消費者索引 consumerIndex,然後根據 consumerIndex 計算得出陣列對應的偏移量,然後將陣列對應位置的元素取出並返回,最後將 consumerIndex 移動到環形陣列下一個位置。

如果元素為null,並不代表隊列是空的,還要比較consumerIndex和producerIndex,如果兩者索引不同,那麼producerIndex肯定是大於consumerIndex的,說明生產者已經在生產了,移動了producerIndex,只是還沒來得及將元素填充到陣列而已。

因為生產者是先CAS遞增producerIndex,再將元素填充到陣列的,兩步之間存在一個非常短的時間差,如果消費者恰好在這個時間差內去消費資料,那麼就自旋等待一下,等待生產者填充元素到陣列。

如果元素為JUMP,說明佇列擴容了,消費者需要根據陣列的最後一個元素找到擴容後的新陣列,消費新陣列的元素。

   // poll()沒有做併發控制,MpscQueue是多生產單消費者的Queue,只有一個消費者,這個也是netty 換成 mspcqueue的主要原因
    @Override
    public E poll() {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;

        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null) {
            if (index != lvProducerIndex()) {
                // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
                // check the producer index. If the queue is indeed not empty we spin until element is
                // visible.

                /*
			    offer()時生產者先CAS改producerIndex,再設定元素。	中間會有一個時間差,此時會自旋,等待元素設定完成。
			 */
                do {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            } else {

                //元素已經消費完
                return null;
            }
        }

        // 佇列擴容了
        if (e == JUMP) {
            //獲取下一個陣列
            final E[] nextBuffer = nextBuffer(buffer, mask);
            //從下一個陣列中消費
            return newBufferPoll(nextBuffer, index);
        }

        // 取出元素後,將原來的槽位設為null
        soRefElement(buffer, offset, null); // release element null
        // 遞增consumerIndex
        soConsumerIndex(index + 2); // release cIndex
        return (E) e;
    }

如果佇列擴容了,nextBuffer()會找到擴容後的新陣列,同時它還會將舊陣列的最後一個元素設為BUFFER_CONSUMED,代表當前陣列已經被消費完了,也就從連結串列中剔除了。

    @SuppressWarnings("unchecked")
    private E[] nextBuffer(final E[] buffer, final long mask) {

           /*
		     通過當前陣列的最後一個元素,獲取下一個待消費的陣列,
		     將當前陣列最後一個元素設為BUFFER_CONSUMED,代表當前陣列已經消費完畢。
		 */
        final long offset = nextArrayOffset(mask);
        final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
        consumerBuffer = nextBuffer;
        consumerMask = (length(nextBuffer) - 2) << 1;
        soRefElement(buffer, offset, BUFFER_CONSUMED);
        return nextBuffer;
    }

佇列中的有序寫入、有序讀取

有序寫入陣列元素

offer寫入的時候,jctool根據 pIndex 進行位運算計算得到陣列對應的下標,然後通過 soRefElement方法將資料寫入到陣列中,原始碼如下所示。

      }
        // INDEX visible before ELEMENT
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        // 將buffer陣列的指定位置替換為e,
        // 不是根據下標來設定的,是根據槽位的地址偏移量offset,UNSAFE操作。
        soRefElement(buffer, offset, e); // An ordered store of an element to a given offset
        return true;

   /**
     * An ordered store of an element to a given offset
     *
     * @param buffer this.buffer
     * @param offset computed via {@link UnsafeRefArrayAccess#calcCircularRefElementOffset}
     * @param e      an orderly kitty
     */
    public static <E> void soRefElement(E[] buffer, long offset, E e)
    {
        UNSAFE.putOrderedObject(buffer, offset, e);
    }

putOrderedObject() 和 putObject() 都可以用於更新物件的值,但是 putOrderedObject() 並不會立刻將資料更新到記憶體中,同時,也不會去保障不同CPU核心的 Cache Line 資料強一致。

這部分原理比較複雜,具體請參見 《Java高併發核心程式設計卷2》。

putOrderedObject() 使用的是 LazySet 延遲更新機制,所以效能方面 putOrderedObject() 要比 putObject() 高很多。

Java 中有四種類型的記憶體屏障,分別為 LoadLoad、StoreStore、LoadStore 和 StoreLoad。

putOrderedObject() 使用了 StoreStore Barrier,對於 Store1,StoreStore,Store2 這樣的操作序列,在 Store2 進行寫入之前,會保證 Store1 的寫操作對其他處理器可見。

有序寫入的優勢,就是效能高,寫操作結果有納秒級的延遲。

但是,有序寫入是有代價的,不會立刻被其他執行緒以及自身執行緒可見。

因為在 Mpsc Queue 的使用場景中,多個生產者只負責寫入資料,並沒有寫入之後立刻讀取的需求,所以使用 LazySet 機制是沒有問題的,只要 StoreStore Barrier 保證多執行緒寫入的順序即可。

雖然 有序寫入陣列元素,但是,對生產者的pIndex 的寫入,用的是cas 寫入, 這個是有強一致性的。 通過StoreLoad Barrier保證有序性和可見性。

因為生產者有多個執行緒,所以 MpscArrayQueue 採用了 UNSAFE.getLongVolatile() 方法保證獲取pIndex 索引,保證其可見性、準確性。

getLongVolatile() 使用了 StoreLoad Barrier,對於 Store1,StoreLoad,Load2 的操作序列,在 Load2 以及後續的讀取操作之前,都會保證 Store1 的寫入操作對其他處理器可見。

StoreLoad 是四種記憶體屏障開銷最大的,

JCTool中的寬鬆讀寫命名規約

排他讀寫:這裡把對volatile 變數的可見寫入和可見讀取,叫做排他讀寫,需要保證 強一致,效能低

寬鬆讀寫:這裡把對volatile 變數的有序寫入和有序讀取,叫做寬鬆讀寫,不需要保證 強一致,但是納秒級操作

JCTool中的寬鬆讀寫命名規約 ,如下:

具體請參考 第25章視訊 《第25章講義:徹底穿透“快取之王”Caffeine底層原理、架構、原始碼 》

MpscArrayQueue 的優點總結:

MpscQueue由一系列陣列構成,陣列的最後一個元素指向下一個陣列,形成單向連結串列。

MpscQueue全程無鎖化,非阻塞,相較於JDK提供的同步阻塞佇列,效能有很好的提升,這也是Netty後來的版本將任務佇列替換為JCtools的重要原因。

在生產過程中,它用到了大量的CAS操作,對於需要做併發控制的地方,確保只有一個執行緒會執行成功,其他CAS失敗的執行緒會自旋重試,全程都是無鎖非阻塞的。

不管是擴容,還是等待元素被填充到陣列,這些過程都是會極快完成的,因此短暫的自旋會比執行緒掛起再喚醒效率更高。陣列擴容後會在原槽位填充JUMP元素,消費者遇到該元素就知道要尋找新陣列繼續消費了。

  • 通過cacheline padding 解決核心屬性的偽共享問題。
  • 陣列的容量設定為 2 的次冪,可以通過位運算快速定位到陣列對應下標。
  • 入隊操作通過CAS無鎖程式設計實現,並且通過鏈式陣列結構,和陣列節點的動態增加,解決擴容時的元素複製的問題,完成陣列的快速擴容,減少CAS空自旋。
  • 在消費者poll過程中,因為只有一個消費者執行緒,所以整個 poll() 的過程沒有 CAS 操作。
  • 通過有序的寫入元素,去掉volatile 的StoreLoad 屏障,實現奈米級別的寫入。 當然, 讀取的時候,如果間隔太短,需要進行短時間的自旋。

這個是非常高效能的。 這也是Netty、Caffeine 使用mpsc 佇列的原因

參考文獻

  1. 瘋狂創客圈 JAVA 高併發 總目錄

    ThreadLocal(史上最全)

  2. 3000頁《尼恩 Java 面試寶典 》的 35個面試專題

  3. 價值10W的架構師知識圖譜

4、尼恩 架構師哲學

5、尼恩 3高架構知識宇宙

https://blog.csdn.net/bz120413/article/details/122107790

https://blog.csdn.net/Javaesandyou/article/details/123918852

https://blog.csdn.net/Javaesandyou/article/details/123918852

https://blog.csdn.net/FreeeLinux/article/details/54897192

https://blog.csdn.net/weixin_41605937/article/details/121972371

推薦閱讀: