Disruptor3.0的實現細節
本文旨在介紹Disruptor3.0的實現細節,首先從整體上描述了Disruptor3.0的核心類圖,Disruptor3.0 DSL(領域專用語言)的實現類圖,並以Disruptor官方列舉的幾大特性作為行文思路,看看Disruptor3.0是如何實現這些特性的:內存預加載、消除‘偽共享’、序號柵欄和序號配合使用來消除鎖和CAS、批處理效應的具體實現等。
核心類圖
- RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;
- Sequencer——序號管理器,負責消費者/生產者各自序號、序號柵欄的管理和協調;
- Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費情況;
- SequenceBarrier——序號柵欄,管理和協調生產者的遊標序號和各個消費者的序號,確保生產者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理;
- EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好。
- EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;代表著消費者。
- Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。
DSL類圖
以下是Disruptor3.0 DSL(domain specific language 特定領域語言)的類圖,可以大致知道第三方如何繼承Disruptor3.0實現具體業務邏輯。
- Disruptor——對外暴露的門面類,提供start(),stop(),消費者事件註冊,生產者事件發布等api;
- RingBuffer——對生產者提供下一序號獲取、entry元素獲取、entry數據更改等api;
- EventHandler——消費者的接口定義,提供onEvent()方法,負責具體業務邏輯實現;
- EventHandlerGroup——業務處理器分組,管理多個業務處理器的依賴關系,提供then()、before()、after()等api。
以下給出代碼demo闡述第三方如何簡單繼承Disruptor3.0:
public static void main(String[] args) throws Exception { // The ThreadFactory for create producer thread. ThreadFactory producerFactory = new ProducerFactory(); // The factory for the event LongEventFactory eventFactory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 8; // Construct the Disruptor,創建Disruptor組件 Disruptor<LongEvent> disruptor = new Disruptor<>( eventFactory, bufferSize, producerFactory, ProducerType.SINGLE, new BlockingWaitStrategy() ); // Connect the handler,綁定消費者事件,可以是多個 disruptor.handleEventsWith(new LongEventHandler()); disruptor.handleEventsWith(new LogEventHandler()); // Start the Disruptor, starts all threads running,啟動Disruptor,啟動所有線程,主要是消費者對應的EventProcessor偵聽線程,消費者事件處理器開始偵聽RingBuffer中的消息 disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l);
//生產者向RingBuffer中寫入消息 producer.onData(bb); Thread.sleep(10); } }
關鍵時序圖
下圖展示了Disruptor3.0整個運行過程的時序圖,包括:始化、啟動、處理過程。
內存預分配
RingBuffer使用數組Object[] entries作為存儲元素,如下圖所示,初始化RingBuffer時,會將所有的entries的每個元素指定為特定的Event,這時候event中的detail屬性是null;後面生產者向RingBuffer中寫入消息時,RingBuffer不是直接將enties[7]指向其他的event對象,而是先獲取event對象,然後更改event對象的detail屬性;消費者在消費時,也是從RingBuffer中讀取出event,然後取出其detail屬性。可以看出,生產/消費過程中,RingBuffer的entities[7]元素並未發生任何變化,未產生臨時對象,entities及其元素對象一直存活,知道RingBuffer消亡。故而可以最小化GC的頻率,提升性能。
註:圖中對象Entry寫錯,應當為Event。
以下是RingBuffer.java類中初始化enties數組的源碼:
private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); //使用工廠方法初始化enties元素 } }
消費者寫入數據到entry中:
//消費者實現EventHandler接口
public class LongEventHandler implements EventHandler<LongEvent> {
//event為從RingBuffer entry中讀取的事件內容,消費者從event中讀取數據,並完成業務邏輯處理 public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(Thread.currentThread().getName() + " say : process LONG Event: " + event); } }
生產者從entry中讀取數據:
public class LongEventProducer {
//生產者持有RingBuffer實例,可以直接向RingBuffer實例中的entry寫入數據 private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try {
//從ringBuffer實例中獲取entry LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence
//生產者將數據寫入entry event.set(bb.getLong(0)); // Fill with data } finally {
//生產者向ringBuffer提交數據變更 ringBuffer.publish(sequence); } } }
可以看出:生產者未更改ringBuffer實例中entry對象,只是更改了entry中的數據,避免了過多創建臨時entry對象帶來的GC,進而降低了性能損耗。
消除‘偽共享’
如果兩個不同的並發變量位於同一個緩存行,則在並發情況下,會互相影響到彼此的緩存有效性,進而影響到性能,這叫著‘偽共享’。為了避開‘偽共享’,Disruptor3.0在Sequence.java中使用多個long變量填充,從而確保一個序號獨占一個緩存行。關於緩存行和‘偽共享’請參考:偽共享(False Sharing)。
具體實現代碼如下:
//在序號實際value變量(long型)左邊填充7個long變量
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; }
//在序號實際value變量(long型)右邊填充7個long變量
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = -1L;
public Sequence() {
this(INITIAL_VALUE);
}
......
}
Sequence實際value變量的左右均被填充了7個long型變量,其自身也是long型變量,一個long型變量占據8個字節,所以序號與他上一個/下一個序號之間的最小內存分布距離為:7*8=56byte,加上自身的8個byte,可以確保序號變量獨占長度為64byte(通常的一個緩存行長度)緩存行。
序號柵欄和序號配合使用來消除鎖和CAS
Disruptor3.0中,序號柵欄(SequenceBarrier)和序號(Sequence)搭配使用,協調和管理消費者與生產者的工作節奏,避免了鎖和CAS的使用。在Disruptor3.0中,各個消費者和生產者持有自己的序號,這些序號的變化必須滿足如下基本條件:
- 消費者序號數值必須小於生產者序號數值;
- 消費者序號數值必須小於其前置(依賴關系)消費者的序號數值;
- 生產者序號數值不能大於消費者中最小的序號數值,以避免生產者速度過快,將還未來得及消費的消息覆蓋。
上述前兩點是在SequenceBarrier的waitFor()方法中完成的,源碼如下:
public long waitFor(final long sequence) //sequence參數是該消費者期望獲取的下一個序號值 throws AlertException, InterruptedException, TimeoutException { checkAlert(); //根據配置的waitStrategy策略,等待期望的下一序號值變得可用
//這裏並不保證返回值availableSequence一定等於 given sequence,他們的大小關系取決於采用的WaitStrategy。
//eg. 1、YieldingWaitStrategy在自旋100次嘗試後,會直接返回dependentSequence的最小seq,這時並不保證返回值>=given sequence
// 2、BlockingWaitStrategy則會阻塞等待given sequence可用為止,可用並不是說availableSequence == given sequence,而應當是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
//如果當前可用的序號小於期望獲取的下一個序號,則返回availableSequence,這將導致調用者EventProcessor繼續wait if (availableSequence < sequence) { return availableSequence; } //這一句是‘批處理’的精妙所在,放在後面講 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
上面第三點是針對生產者建立的Barrier,邏輯判定發生在生產者從ringBuffer獲取下一個可用的entry時,RingBuffer會將獲取下一個可用的entry委托給Sequencer。我們以最簡單的單生產者SingleProducerSequencer的next()實現來說明。SingleProducerSequencer.next()的源碼如下:
public long next(int n) { if (n < 1) //n表示此次生產者期望獲取多少個序號,通常是1 { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; long nextSequence = nextValue + n; //生產者當前序號值+期望獲取的序號數量後達到的序號值 long wrapPoint = nextSequence - bufferSize; //減掉RingBuffer的總的buffer值,用於判斷是否出現‘覆蓋’ long cachedGatingSequence = this.cachedValue; //從後面代碼分析可得:cachedValue就是緩存的消費者中最小序號值,他不是當前最新的‘消費者中最小序號值’,而是上次程序進入到下面的if判定代碼段是,被賦值的當時的‘消費者中最小序號值’ //這樣做的好處在於:在判定是否出現覆蓋的時候,不用每次都調用getMininumSequence計算‘消費者中的最小序號值’,從而節約開銷。只要確保當生產者的節奏大於了緩存的cachedGateingSequence一個bufferSize時,從新獲取一下 getMinimumSequence()即可。 //(wrapPoint > cachedGatingSequence) : 當生產者已經超過上一次緩存的‘消費者中最小序號值’(cachedGatingSequence)一個‘Ring’大小(bufferSize),需要重新獲取cachedGatingSequence,避免當生產者一直在生產,但是消費者不再消費的情況下,出現‘覆蓋’ //(cachedGatingSequence > nextValue) : 生產者和消費者均為順序遞增的,且生產者的seq“先於”消費者的seq,註意是‘先於’而不是‘大於’。當nextValue>Long.MAXVALUE時,nextValue+1就會變成負數,wrapPoint也會變成負數,這時候必然會是:cachedGatingSequence > nextValue // 這個變化的過程會持續bufferSize個序號,這個區間,由於getMinimumSequence()得到的雖然是名義上的‘消費者中最小序號值’,但是不代表是走在‘最後面’的消費者 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; //生產者停下來,等待消費者消費,知道‘覆蓋’現象清除。 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; }
批處理效應
當生產者節奏快於消費者,消費者可以通過‘批處理效應’快速追趕,即:消費者可以一次性從RingBuffer中獲取多個已經準備好的enties,從而提高效率。代碼實現如下:
SequenceBarrier的waitFor()方法:
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } //獲取消費者可以消費的最大的可用序號,支持批處理效應,提升處理效率。 //當availableSequence > sequence時,需要遍歷 sequence --> availableSequence,找到最前一個準備就緒,可以被消費的event對應的seq。 //最小值為:sequence-1 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
源代碼
LMAX-Exchange源碼github地址:https://github.com/LMAX-Exchange/disruptor
帶中文註釋的源碼github地址:https://github.com/daoqidelv/disruptor
Disruptor3.0的實現細節