1. 程式人生 > >Disruptor3.0的實現細節

Disruptor3.0的實現細節

port 暴露 lean start size 代碼段 處理 set nano

本文旨在介紹Disruptor3.0的實現細節,首先從整體上描述了Disruptor3.0的核心類圖,Disruptor3.0 DSL(領域專用語言)的實現類圖,並以Disruptor官方列舉的幾大特性作為行文思路,看看Disruptor3.0是如何實現這些特性的:內存預加載、消除‘偽共享’、序號柵欄和序號配合使用來消除鎖和CAS、批處理效應的具體實現等。

核心類圖

技術分享

  • RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;
  • Sequencer——序號管理器,負責消費者/生產者各自序號、序號柵欄的管理和協調;
  • Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費情況;
  • SequenceBarrier——序號柵欄,管理和協調生產者的遊標序號和各個消費者的序號,確保生產者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理;
  • EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好。
  • EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;代表著消費者。
  • Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。

DSL類圖

以下是Disruptor3.0 DSL(domain specific language 特定領域語言)的類圖,可以大致知道第三方如何繼承Disruptor3.0實現具體業務邏輯。

技術分享

  • Disruptor——對外暴露的門面類,提供start(),stop(),消費者事件註冊,生產者事件發布等api;
  • RingBuffer——對生產者提供下一序號獲取、entry元素獲取、entry數據更改等api;
  • EventHandler——消費者的接口定義,提供onEvent()方法,負責具體業務邏輯實現;
  • EventHandlerGroup——業務處理器分組,管理多個業務處理器的依賴關系,提供then()、before()、after()等api。

  以下給出代碼demo闡述第三方如何簡單繼承Disruptor3.0:

    public static void main(String[] args) throws Exception
    {
        // The ThreadFactory for create producer thread.
        ThreadFactory producerFactory = new ProducerFactory();

        // The factory for the event
        LongEventFactory eventFactory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 8;

        // Construct the Disruptor,創建Disruptor組件
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                eventFactory, 
                bufferSize, 
                producerFactory, 
                ProducerType.SINGLE, 
                new BlockingWaitStrategy()
            );

        // Connect the handler,綁定消費者事件,可以是多個
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LogEventHandler());

        // Start the Disruptor, starts all threads running,啟動Disruptor,啟動所有線程,主要是消費者對應的EventProcessor偵聽線程,消費者事件處理器開始偵聽RingBuffer中的消息
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
       //生產者向RingBuffer中寫入消息 producer.onData(bb); Thread.sleep(
10); } }

關鍵時序圖

下圖展示了Disruptor3.0整個運行過程的時序圖,包括:始化、啟動、處理過程。

技術分享

內存預分配

RingBuffer使用數組Object[] entries作為存儲元素,如下圖所示,初始化RingBuffer時,會將所有的entries的每個元素指定為特定的Event,這時候event中的detail屬性是null;後面生產者向RingBuffer中寫入消息時,RingBuffer不是直接將enties[7]指向其他的event對象,而是先獲取event對象,然後更改event對象的detail屬性;消費者在消費時,也是從RingBuffer中讀取出event,然後取出其detail屬性。可以看出,生產/消費過程中,RingBuffer的entities[7]元素並未發生任何變化,未產生臨時對象,entities及其元素對象一直存活,知道RingBuffer消亡。故而可以最小化GC的頻率,提升性能。

技術分享註:圖中對象Entry寫錯,應當為Event。

以下是RingBuffer.java類中初始化enties數組的源碼:

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance(); //使用工廠方法初始化enties元素
        }
    }

消費者寫入數據到entry中:

//消費者實現EventHandler接口
public
class LongEventHandler implements EventHandler<LongEvent> {
  //event為從RingBuffer entry中讀取的事件內容,消費者從event中讀取數據,並完成業務邏輯處理
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(Thread.currentThread().getName() + " say : process LONG Event: " + event); } }

生產者從entry中讀取數據:

public class LongEventProducer
{
  //生產者持有RingBuffer實例,可以直接向RingBuffer實例中的entry寫入數據
private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try {
       //從ringBuffer實例中獲取entry LongEvent event
= ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence
       //生產者將數據寫入entry
event.set(bb.getLong(0)); // Fill with data } finally {
       //生產者向ringBuffer提交數據變更 ringBuffer.publish(sequence); } } }

可以看出:生產者未更改ringBuffer實例中entry對象,只是更改了entry中的數據,避免了過多創建臨時entry對象帶來的GC,進而降低了性能損耗。

消除‘偽共享’

如果兩個不同的並發變量位於同一個緩存行,則在並發情況下,會互相影響到彼此的緩存有效性,進而影響到性能,這叫著‘偽共享’。為了避開‘偽共享’,Disruptor3.0在Sequence.java中使用多個long變量填充,從而確保一個序號獨占一個緩存行。關於緩存行和‘偽共享’請參考:偽共享(False Sharing)。

具體實現代碼如下:

//在序號實際value變量(long型)左邊填充7個long變量
class
LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; }
//在序號實際value變量(long型)右邊填充7個long變量
class RhsPadding extends Value {
  
protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
  static final long INITIAL_VALUE = -1L;
  public Sequence() {
    
this(INITIAL_VALUE);
  }
  ......
}
Sequence實際value變量的左右均被填充了7個long型變量,其自身也是long型變量,一個long型變量占據8個字節,所以序號與他上一個/下一個序號之間的最小內存分布距離為:7*8=56byte,加上自身的8個byte,可以確保序號變量獨占長度為64byte(通常的一個緩存行長度)緩存行。

序號柵欄和序號配合使用來消除鎖和CAS

Disruptor3.0中,序號柵欄(SequenceBarrier)和序號(Sequence)搭配使用,協調和管理消費者與生產者的工作節奏,避免了鎖和CAS的使用。在Disruptor3.0中,各個消費者和生產者持有自己的序號,這些序號的變化必須滿足如下基本條件:

  • 消費者序號數值必須小於生產者序號數值;
  • 消費者序號數值必須小於其前置(依賴關系)消費者的序號數值;
  • 生產者序號數值不能大於消費者中最小的序號數值,以避免生產者速度過快,將還未來得及消費的消息覆蓋。

上述前兩點是在SequenceBarrier的waitFor()方法中完成的,源碼如下:

   public long waitFor(final long sequence) //sequence參數是該消費者期望獲取的下一個序號值
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
     //根據配置的waitStrategy策略,等待期望的下一序號值變得可用
    
//這裏並不保證返回值availableSequence一定等於 given sequence,他們的大小關系取決於采用的WaitStrategy。
     //eg. 1、YieldingWaitStrategy在自旋100次嘗試後,會直接返回dependentSequence的最小seq,這時並不保證返回值>=given sequence
     // 2、BlockingWaitStrategy則會阻塞等待given sequence可用為止,可用並不是說availableSequence == given sequence,而應當是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
//如果當前可用的序號小於期望獲取的下一個序號,則返回availableSequence,這將導致調用者EventProcessor繼續wait
if (availableSequence < sequence) { return availableSequence; }      //這一句是‘批處理’的精妙所在,放在後面講 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }

上面第三點是針對生產者建立的Barrier,邏輯判定發生在生產者從ringBuffer獲取下一個可用的entry時,RingBuffer會將獲取下一個可用的entry委托給Sequencer。我們以最簡單的單生產者SingleProducerSequencer的next()實現來說明。SingleProducerSequencer.next()的源碼如下:

    public long next(int n) 
    {
        if (n < 1) //n表示此次生產者期望獲取多少個序號,通常是1
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;  //生產者當前序號值+期望獲取的序號數量後達到的序號值
        long wrapPoint = nextSequence - bufferSize; //減掉RingBuffer的總的buffer值,用於判斷是否出現‘覆蓋’
        long cachedGatingSequence = this.cachedValue;  //從後面代碼分析可得:cachedValue就是緩存的消費者中最小序號值,他不是當前最新的‘消費者中最小序號值’,而是上次程序進入到下面的if判定代碼段是,被賦值的當時的‘消費者中最小序號值’
                //這樣做的好處在於:在判定是否出現覆蓋的時候,不用每次都調用getMininumSequence計算‘消費者中的最小序號值’,從而節約開銷。只要確保當生產者的節奏大於了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可。
//(wrapPoint > cachedGatingSequence) : 當生產者已經超過上一次緩存的‘消費者中最小序號值’(cachedGatingSequence)一個‘Ring’大小(bufferSize),需要重新獲取cachedGatingSequence,避免當生產者一直在生產,但是消費者不再消費的情況下,出現‘覆蓋’
        //(cachedGatingSequence > nextValue) : 生產者和消費者均為順序遞增的,且生產者的seq“先於”消費者的seq,註意是‘先於’而不是‘大於’。當nextValue>Long.MAXVALUE時,nextValue+1就會變成負數,wrapPoint也會變成負數,這時候必然會是:cachedGatingSequence > nextValue
        //                                     這個變化的過程會持續bufferSize個序號,這個區間,由於getMinimumSequence()得到的雖然是名義上的‘消費者中最小序號值’,但是不代表是走在‘最後面’的消費者
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) 
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            //生產者停下來,等待消費者消費,知道‘覆蓋’現象清除。
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

批處理效應

當生產者節奏快於消費者,消費者可以通過‘批處理效應’快速追趕,即:消費者可以一次性從RingBuffer中獲取多個已經準備好的enties,從而提高效率。代碼實現如下:

SequenceBarrier的waitFor()方法:

    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        //獲取消費者可以消費的最大的可用序號,支持批處理效應,提升處理效率。
        //當availableSequence > sequence時,需要遍歷 sequence --> availableSequence,找到最前一個準備就緒,可以被消費的event對應的seq。
        //最小值為:sequence-1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

源代碼

LMAX-Exchange源碼github地址:https://github.com/LMAX-Exchange/disruptor

帶中文註釋的源碼github地址:https://github.com/daoqidelv/disruptor


Disruptor3.0的實現細節