1. 程式人生 > 實用技巧 >disruptor (史上最全)

disruptor (史上最全)


無鎖程式設計(Lock Free)框架 系列文章:

1 disruptor 是什麼?

Disruptor是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題(在效能測試中發現竟然與I/O操作處於同樣的數量級)。

基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注。2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。

目前,包括Apache Storm、Camel、Log4j 2在內的很多知名專案都應用了Disruptor以獲取高效能。

需要特別指出的是,這裡所說的佇列是系統內部的記憶體佇列,而不是Kafka這樣的分散式佇列。另外,本文所描述的Disruptor特性限於3.3.4。

2 Java內建佇列的問題

介紹Disruptor之前,我們先來看一看常用的執行緒安全的內建佇列有什麼問題。Java的內建佇列如下表所示。

佇列 有界性 資料結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

佇列的底層一般分成三種:陣列、連結串列和堆。其中,堆一般情況下是為了實現帶有優先順序特性的佇列,暫且不考慮。

從陣列和連結串列兩種資料結構來看,基於陣列執行緒安全的佇列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證執行緒安全;基於連結串列的執行緒安全佇列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現執行緒安全,而後者以及上面表格中的LinkedTransferQueue都是通過原子變數compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的。

但是對 volatile型別的變數進行 CAS 操作,存在偽共享問題,具體請參考專門的文章:

偽共享 (圖解)

Disruptor 使用了類似上面的方案,解決了偽共享問題。

3 Disruptor框架是如何解決偽共享問題的?

在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long型別資料value,無論是Disruptor中的基於陣列實現的緩衝區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩衝區中,Sequence標示著寫入進度,例如每次生產者要寫入資料進緩衝區時,都要呼叫RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示著它們的事件序號,來看看Sequence類的原始碼:

  class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	

​```
public Sequence() {
	this(INITIAL_VALUE);
}

public Sequence(final long initialValue) {
	UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
​```

}

從第1到11行可以看到,真正使用到的變數value,它的前後空間都由8個long型的變數填補了,對於一個大小為64位元組的快取行,它剛好被填補滿(一個long型變數value,8個位元組加上前/後個7long型變數填補,7*8=56,56+8=64位元組)。這樣做每次把變數value讀進快取記憶體中時,都能把快取行填充滿(對於大小為64個位元組的快取行來說,如果快取行大小大於64個位元組,那麼還是會出現偽共享問題),保證每次處理資料時都不會與其他變數發生衝突。

Disruptor 的使用場景

Disruptor的最常用的場景就是“生產者-消費者”場景,對場景的就是“一個生產者、多個消費者”的場景,並且要求順序處理。

當前業界開源元件使用Disruptor的包括Log4j2、Apache Storm等,它可以用來作為高效能的有界記憶體佇列,基於生產者消費者模式,實現一個/多個生產者對應多個消費者。它也可以認為是觀察者模式的一種實現,或者釋出訂閱模式。

舉個例子,我們從MySQL的BigLog檔案中順序讀取資料,然後寫入到ElasticSearch(搜尋引擎)中。在這種場景下,BigLog要求一個檔案一個生產者,那個是一個生產者。而寫入到ElasticSearch,則嚴格要求順序,否則會出現問題,所以通常意義上的多消費者執行緒無法解決該問題,如果通過加鎖,則效能大打折扣。

實戰:Disruptor 的 使用例項

我們從一個簡單的例子開始學習Disruptor:生產者傳遞一個long型別的值給消費者,而消費者消費這個資料的方式僅僅是把它打印出來。

定義一個Event

首先定義一個Event來包含需要傳遞的資料:

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

由於需要讓Disruptor為我們建立事件,我們同時還聲明瞭一個EventFactory來例項化Event物件。

public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

定義事件處理器(disruptor會回撥此處理器的方法)

我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中儲存的資料列印到終端:

/** 
 */public class LongEventHandler implements EventHandler<LongEvent> { 
    @Override 
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { 
        System.out.println(longEvent.getValue()); 
    } 
} 

定義事件源: 事件釋出器 釋出事件

事件都會有一個生成事件的源,這個例子中假設事件是由於磁碟IO或者network讀取資料的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的資料,也就是說,事件源會在IO讀取到一部分資料的時候觸發事件(觸發事件不是自動的,程式設計師需要在讀取到資料的時候自己觸發事件併發布):

public class LongEventProducer { 
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    /** 
     * onData用來發布事件,每呼叫一次就釋出一次事件事件 
     * 它的引數會通過事件傳遞給消費者 
     * 
     * @param bb 
     */public void onData(ByteBuffer bb) { 
            //可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
            long sequence = ringBuffer.next();
            
        try { 
            //用上面的索引取出一個空的事件用於填充 
            LongEvent event = ringBuffer.get(sequence);// for the sequence 
            event.setValue(bb.getLong(0)); 
        } finally { 
            //釋出事件 
            ringBuffer.publish(sequence); 
        } 
    } 
} 

很明顯的是:當用一個簡單佇列來發布事件的時候會牽涉更多的細節,這是因為事件物件還需要預先建立。

釋出事件最少需要兩步:

獲取下一個事件槽,釋出事件(釋出事件的時候要使用try/finnally保證事件一定會被髮布)。

如果我們使用RingBuffer.next()獲取一個事件槽,那麼一定要釋出對應的事件。如果不能釋出事件,那麼就會引起Disruptor狀態的混亂。尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。

Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator(事件轉換器)來發布事件。

Disruptor3.0以後的事件轉換器(填充事件的業務資料)

public class LongEventProducerWithTranslator { 
    //一個translator可以看做一個事件初始化器,publicEvent方法會呼叫它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = 
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() { 
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { 
                    event.setValue(bb.getLong(0)); 
                } 
            };
            
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    public void onData(ByteBuffer bb) { 
        ringBuffer.publishEvent(TRANSLATOR, bb); 
    } 
} 

上面寫法的另一個好處是,Translator可以分離出來並且更加容易單元測試。Disruptor提供了不同的介面(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產生一個Translator物件。很明顯,Translator中方法的引數是通過RingBuffer來傳遞的。

組裝起來

最後一步就是把所有的程式碼組合起來完成一個完整的事件處理系統。Disruptor在這方面做了簡化,使用了DSL風格的程式碼(其實就是按照直觀的寫法,不太能算得上真正的DSL)。雖然DSL的寫法比較簡單,但是並沒有提供所有的選項。如果依靠DSL已經可以處理大部分情況了。

注意:這裡沒有使用時間轉換器,而是使用簡單的 事件釋出器。

public class LongEventMain { 
    public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // The factory for the event 
        LongEventFactory factory = new LongEventFactory();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
        
        // Connect the handler 
        disruptor.handleEventsWith(new LongEventHandler());
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            
            //釋出事件 
            producer.onData(bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

在Java 8使用Disruptor

Disruptor在自己的接口裡面添加了對於Java 8 Lambda的支援。大部分Disruptor中的介面都符合Functional Interface的要求(也就是在介面中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。

public class LongEventMainJava8 { 
    /** 
     * 用lambda表示式來註冊EventHandler和EventProductor 
     * @param args 
     * @throws InterruptedException 
     */public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;// Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // 可以使用lambda來註冊一個EventHandler 
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

由於在Java 8中方法引用也是一個lambda,因此還可以把上面的程式碼改成下面的程式碼:

public class LongEventWithMethodRef { 
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) 
    { 
        System.out.println(event.getValue()); 
    } 
 
    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) 
    { 
        event.setValue(buffer.getLong(0)); 
    } 
 
    public static void main(String[] args) throws Exception 
    { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // Connect the handler 
        disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent);
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) 
        { 
            bb.putLong(0, l); 
            ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

Disruptor如何實現高效能?

Disruptor實現高效能主要體現了去掉了鎖,採用CAS演算法,同時內部通過環形佇列實現有界佇列。

  • 環形資料結構
    為了避免垃圾回收,採用陣列而非連結串列。同時,陣列對處理器的快取機制更加友好。

  • 元素位置定位
    陣列長度2^n,通過位運算,加快定位的速度。下標採取遞增的形式。不用擔心index溢位的問題。index是long型別,即使100萬QPS的處理速度,也需要30萬年才能用完。

  • 無鎖設計
    每個生產者或者消費者執行緒,會先申請可以操作的元素在陣列中的位置,申請到之後,直接在該位置寫入或者讀取資料。整個過程通過原子變數CAS,保證操作的執行緒安全。

使用Disruptor,主要用於對效能要求高、延遲低的場景,它通過“榨乾”機器的效能來換取處理的高效能。如果你的專案有對效能要求高,對延遲要求低的需求,並且需要一個無鎖的有界佇列,來實現生產者/消費者模式,那麼Disruptor是你的不二選擇。

原理:Disruptor 的內部Ring Buffer環形佇列

RingBuffer是什麼

RingBuffer 是一個環(首尾相連的環),用做在不同上下文(執行緒)間傳遞資料的buffer。
RingBuffer 擁有一個序號,這個序號指向陣列中下一個可用元素。

Disruptor使用環形佇列的優勢:

Disruptor框架就是一個使用CAS操作的記憶體佇列,與普通的佇列不同,Disruptor框架使用的是一個基於陣列實現的環形佇列,無論是生產者向緩衝區裡提交任務,還是消費者從緩衝區裡獲取任務執行,都使用CAS操作。

使用環形佇列的優勢:

第一,簡化了多執行緒同步的複雜度。學資料結構的時候,實現佇列都要兩個指標head和tail來分別指向佇列的頭和尾,對於一般的佇列是這樣,想象下,如果有多個生產者同時往緩衝區佇列中提交任務,某一生產者提交新任務後,tail指標都要做修改的,那麼多個生產者提交任務,頭指標不會做修改,但會對tail指標產生衝突,例如某一生產者P1要做寫入操作,在獲得tail指標指向的物件值V後,執行compareAndSet()方法前,tail指標被另一生產者P2修改了,這時生產者P1執行compareAndSet()方法,發現tail指標指向的值V和期望值E不同,導致衝突。同樣,如果多個消費者不斷從緩衝區中獲取任務,不會修改尾指標,但會造成佇列頭指標head的衝突問題(因為佇列的FIFO特點,出列會從頭指標出開始)。

環形佇列的一個特點就是隻有一個指標,只通過一個指標來實現出列和入列操作。如果使用兩個指標head和tail來管理這個佇列,有可能會出現“偽共享”問題(偽共享問題在下面我會詳細說),因為建立佇列時,head和tail指標變數常常在同一個快取行中,多執行緒修改同一快取行中的變數就容易出現偽共享問題。

第二,由於使用的是環形佇列,那麼佇列建立時大小就被固定了,Disruptor框架中的環形佇列本來也就是基於陣列實現的,使用陣列的話,減少了系統對記憶體空間管理的壓力,因為它不像連結串列,Java會定期回收連結串列中一些不再引用的物件,而陣列不會出現空間的新分配和回收問題。

原理:Disruptor的等待策略

Disruptor預設的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變數來控制執行緒的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高效能。

SleepingWaitStrategy

和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是迴圈等待並且在迴圈中間呼叫LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產執行緒只需要計數,而不執行任何指令。並且沒有條件變數的消耗。但是,事件物件從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件釋出對於生產者的影響比較小的情況下。比如非同步日誌功能。

YieldingWaitStrategy

YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會迴圈等待sequence增加到合適的值。迴圈中呼叫Thread.yield()允許其他準備好的執行緒執行。如果需要高效能而且事件消費者執行緒比邏輯核心少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超執行緒的時候。

BusySpinW4aitStrategy

BusySpinWaitStrategy是效能最高的等待策略,同時也是對部署環境要求最高的策略。這個效能最好用在事件處理執行緒比物理核心數目還要小的時候。例如:在禁用超執行緒技術的時候。

原理:並行模式

單一寫者模式

在併發系統中提高效能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的程式碼中僅僅有一個事件生產者,那麼可以設定為單一生產者模式來提高系統的效能。

public class singleProductorLongEventMain { 
    public static void main(String[] args) throws Exception { 
        //.....// Construct the Disruptor with a SingleProducerSequencer 
 
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                bufferSize, 
                ProducerType.SINGLE, // 單一寫者模式, 
                executor);//..... 
    } 
} 

一次生產,序列消費

比如:現在觸發一個註冊Event,需要有一個Handler來儲存資訊,一個Hanlder來發郵件等等。

/**
  * 序列依次執行
  * <br/>
  * p --> c11 --> c21
  * @param disruptor
  */
 public static void serial(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }

菱形方式執行

 public static void diamond(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }

鏈式平行計算

 public static void chain(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
     disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
     disruptor.start();
 }

相互隔離模式

 public static void parallelWithPool(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
     disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
     disruptor.start();
 }

航道模式

序列依次執行,同時C11,C21分別有2個例項

/**
  * 序列依次執行,同時C11,C21分別有2個例項
   * <br/>
   * p --> c11 --> c21
   * @param disruptor
   */
  public static void serialWithPool(Disruptor<LongEvent> disruptor){
      disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
      disruptor.start();
  }

回到◀瘋狂創客圈

瘋狂創客圈 - Java高併發研習社群,為大家開啟大廠之門