1. 程式人生 > >Disruptor 極速體驗

Disruptor 極速體驗

已經不記得最早接觸到 Disruptor 是什麼時候了,只記得發現它的時候它是以具有閃電般的速度被介紹的。於是在腦子裡, Disruptor 和“閃電”一詞關聯了起來,然而卻一直沒有時間去探究一下。

      最近正在進行一項對效能有很高要求的產品專案的研究,自然想起了閃電般的 Disruptor ,這必有它的用武之地,於是進行了一番探查,將成果和體會記錄在案。

一、什麼是 Disruptor 

從功能上來看,Disruptor 是實現了“佇列”的功能,而且是一個有界佇列。那麼它的應用場景自然就是“生產者-消費者”模型的應用場合了。

可以拿 JDK 的 BlockingQueue 做一個簡單對比,以便更好地認識 Disruptor 是什麼。

我們知道 BlockingQueue 是一個 FIFO 佇列,生產者(Producer)往佇列裡釋出(publish)一項事件(或稱之為“訊息”也可以)時,消費者(Consumer)能獲得通知;如果沒有事件時,消費者被堵塞,直到生產者釋出了新的事件。

這些都是 Disruptor 能做到的,與之不同的是,Disruptor 能做更多:

  • 同一個“事件”可以有多個消費者,消費者之間既可以並行處理,也可以相互依賴形成處理的先後次序(形成一個依賴圖);
  • 預分配用於儲存事件內容的記憶體空間;
  • 針對極高的效能目標而實現的極度優化和無鎖的設計;

以上的描述雖然簡單地指出了 Disruptor 是什麼,但對於它“能做什麼”還不是那麼直截了當。一般性地來說,當你需要在兩個獨立的處理過程(兩個執行緒)之間交換資料時,就可以使用 Disruptor 。當然使用佇列(如上面提到的 BlockingQueue)也可以,只不過 Disruptor 做得更好。

拿佇列來作比較的做法弱化了對 Disruptor 有多強大的認識,如果想要對此有更多的瞭解,可以仔細看看 Disruptor 在其東家 LMAX 交易平臺(也是實現者) 是如何作為核心架構來使用的,這方面就不做詳述了,問度娘或谷哥都能找到。

二、Disruptor 的核心概念

先從瞭解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領域物件,也是對映到程式碼實現上的核心物件。

  • Ring Buffer
    如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的物件,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的資料(事件)進行儲存和更新。在一些更高階的應用場景中,Ring Buffer 可以由使用者的自定義實現來完全替代。
  • Sequence  Disruptor
    通過順序遞增的序號來編號管理通過其進行交換的資料(事件),對資料(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用於標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU快取偽共享(Flase Sharing)問題。
    (注:這是 Disruptor 實現高效能的關鍵點之一,網上關於偽共享問題的介紹已經汗牛充棟,在此不再贅述)。
  • Sequencer 
    Sequencer 是 Disruptor 的真正核心。此介面有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞資料的併發演算法。
  • Sequence Barrier
    用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
  • Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的效能表現)
  • Event
    在 Disruptor 的語義中,生產者和消費者之間進行交換的資料被稱為事件(Event)。它不是一個被 Disruptor 定義的特定型別,而是由 Disruptor 的使用者定義並指定。
  • EventProcessor
    EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於呼叫事件處理實現的事件迴圈(Event Loop)。
  • EventHandler
    Disruptor 定義的事件處理介面,由使用者實現,用於處理事件,是 Consumer 的真正實現。
  • Producer
    即生產者,只是泛指呼叫 Disruptor 釋出事件的使用者程式碼,Disruptor 沒有定義特定介面或型別。

三、如何使用 Disruptor 

Disruptor 的 API 十分簡單,主要有以下幾個步驟:

  1. 定義事件
    事件(Event)就是通過 Disruptor 進行交換的資料型別。
  2. public class LongEvent
    {
        private long value;
    
        public void set(long value)
        {
            this.value = value;
        }
    }
  3. 定義事件工廠
    事件工廠(Event Factory)定義瞭如何例項化前面第1步中定義的事件(Event),需要實現介面 com.lmax.disruptor.EventFactory<T>。
    Disruptor 通過 EventFactory 在 RingBuffer 中預建立 Event 的例項。
    一個 Event 例項實際上被用作一個“資料槽”,釋出者釋出前,先從 RingBuffer 獲得一個 Event 的例項,然後往 Event 例項中填充資料,之後再發布到 RingBuffer 中,之後由 Consumer 獲得該 Event 例項並從中讀取資料。
    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent>
    {
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    }
  4. 定義事件處理的具體實現
    通過實現介面 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實現。
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent>
    {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
        {
            System.out.println("Event: " + event);
        }
    }
  5. 定義用於事件處理的執行緒池
    Disruptor 通過 java.util.concurrent.ExecutorService 提供的執行緒來觸發 Consumer 的事件處理。例如:
  6. ExecutorService executor = Executors.newCachedThreadPool();
    
  7. 指定等待策略
    Disruptor 定義了 com.lmax.disruptor.WaitStrategy 介面用於抽象 Consumer 如何等待新事件,這是策略模式的應用。
    Disruptor 提供了多個 WaitStrategy 的實現,每種策略都具有不同效能和優缺點,根據實際執行環境的 CPU 的硬體特點選擇恰當的策略,並配合特定的 JVM 的配置引數,能夠實現不同的效能提升。
    例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
    BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現;
    SleepingWaitStrategy 的效能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景;
    YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性。
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
  8. 啟動 Disruptor
    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;
            
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
            
    EventHandler<LongEvent> eventHandler = new LongEventHandler();
    disruptor.handleEventsWith(eventHandler);
            
    disruptor.start();
  9. 釋出事件
    Disruptor 的事件釋出過程是一個兩階段提交的過程:
      第一步:先從 RingBuffer 獲取下一個可以寫入的事件的序號;
      第二步:獲取對應的事件物件,將資料寫入事件物件;
      第三部:將事件提交到 RingBuffer;
    事件只有在提交之後才會通知 EventProcessor 進行處理;
    // 釋出事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long sequence = ringBuffer.next();//請求下一個事件序號;
        
    try {
        LongEvent event = ringBuffer.get(sequence);//獲取該序號對應的事件物件;
        long data = getEventData();//獲取要通過事件傳遞的業務資料;
        event.set(data);
    } finally{
        ringBuffer.publish(sequence);//釋出事件;
    }

     注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。

    Disruptor 還提供另外一種形式的呼叫來簡化以上操作,並確保 publish 總是得到呼叫。
    static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
        @Override
        public void translateTo(LongEvent event, long sequence, Long data) {
            event.set(data);
        }    
    }
        
    public static Translator TRANSLATOR = new Translator();
        
    public static void publishEvent2(Disruptor<LongEvent> disruptor) {
        // 釋出事件;
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        long data = getEventData();//獲取要通過事件傳遞的業務資料;
        ringBuffer.publishEvent(TRANSLATOR, data);
    }

    此外,Disruptor 要求 RingBuffer.publish 必須得到呼叫的潛臺詞就是,如果發生異常也一樣要呼叫 publish ,那麼,很顯然這個時候需要呼叫者在事件處理的實現上來判斷事件攜帶的資料是否是正確的或者完整的,這是實現者應該要注意的事情。

  10. 關閉 Disruptor
    disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
    executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;

四、效能對比測試

  為了直觀地感受 Disruptor 有多快,設計了一個性能對比測試:Producer 釋出 100 萬次事件,從釋出第一個事件開始計時,捕捉 Consumer 處理完所有事件的耗時。

  測試用例在 Producer 如何將事件通知到 Consumer 的實現方式上,設計了三種不同的實現:

  1. Producer 的事件釋出和 Consumer 的事件處理都在同一個執行緒,Producer 釋出事件後立即觸發 Consumer 的事件處理;
  2. Producer 的事件釋出和 Consumer 的事件處理在不同的執行緒,通過 ArrayBlockingQueue 傳遞給 Consumer 進行處理;
  3. Producer 的事件釋出和 Consumer 的事件處理在不同的執行緒,通過 Disruptor 傳遞給 Consumer 進行處理;

此次測試用例僅做了只有一個 Producer 和一個 Consumer 的情形,測試用例的程式碼如下:

CounterTracer tracer = tracerFactory.newInstance(DATA_COUNT);//計數跟蹤到達指定的數值;
TestHandler handler = new TestHandler(tracer);//Consumer 的事件處理;
        
EventPublisher publisher = publisherFactory.newInstance(new PublisherCreationArgs(DATA_COUNT, handler));//通過工廠物件建立不同的 Producer 的實現;
publisher.start();
tracer.start();
        
//釋出事件;
for (int i = 0; i < DATA_COUNT; i++) {
    publisher.publish(i);
}
        
//等待事件處理完成;
tracer.waitForReached();
        
publisher.stop();
        
//輸出結果;
printResult(tracer);

事件處理的實現只是呼叫一個計數器(CounterTracer)加1,該計數器跟蹤從開始到達到總的事件次數時所耗的時間。

public class TestHandler {
    
    private CounterTracer tracer;
    
    public TestHandler(CounterTracer tracer) {
        this.tracer = tracer;
    }
    
    /**
     * 如果返回 true,則表示處理已經全部完成,不再處理後續事件;
     * 
     * @param event
     * @return
     */
    public boolean process(TestEvent event){
        return tracer.count();
    }
}

針對單一Producer 和單一 Consumer 的測試場景,CounterTracer 的實現如下:

/**
 * 測試結果跟蹤器,計數器不是執行緒安全的,僅在單執行緒的 consumer 測試中使用;
 * 
 * @author haiq
 *
 */
public class SimpleTracer implements CounterTracer {

    private long startTicks;
    private long endTicks;
    private long count = 0;
    private boolean end = false;
    private final long expectedCount;
    private CountDownLatch latch = new CountDownLatch(1);

    public SimpleTracer(long expectedCount) {
        this.expectedCount = expectedCount;
    }

    @Override
    public void start() {
        startTicks = System.currentTimeMillis();
        end = false;
    }

    @Override
    public long getMilliTimeSpan() {
        return endTicks - startTicks;
    }

    @Override
    public boolean count() {
        if (end) {
            return end;
        }
        count++;
        end = count >= expectedCount;
        if (end) {
            endTicks = System.currentTimeMillis();
            latch.countDown();
        }
        return end;
    }

    @Override
    public void waitForReached() throws InterruptedException {
        latch.await();
    }
}

第一種 Producer 的實現:直接觸發事件處理;

public class DirectingPublisher implements EventPublisher {
    
    private TestHandler handler;    
    private TestEvent event = new TestEvent();
    
    public DirectingPublisher(TestHandler handler) {
        this.handler = handler;
    }

    @Override
    public void publish(int data) throws Exception {
        event.setValue(data);
        handler.process(event);
    }

    //省略其它程式碼;    
}

第二種 Producer 的實現:通過 ArrayBlockinigQueue 實現;

public class BlockingQueuePublisher implements EventPublisher {
    
    private ArrayBlockingQueue<TestEvent> queue ;    
    private TestHandler handler;    
    public BlockingQueuePublisher(int maxEventSize, TestHandler handler) {
        this.queue = new ArrayBlockingQueue<TestEvent>(maxEventSize);
        this.handler = handler;
    }

    public void start(){
        Thread thrd = new Thread(new Runnable() {
            @Override
            public void run() {
                handle();
            }
        });
        thrd.start();
    }
    
    private void handle(){
        try {
            TestEvent evt ;
            while (true) {
                evt = queue.take();
                if (evt != null && handler.process(evt)) {
                    //完成後自動結束處理執行緒;
                    break;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void publish(int data) throws Exception {
        TestEvent evt = new TestEvent();
        evt.setValue(data);
        queue.put(evt);
    }

    //省略其它程式碼;
}

第三種 Producer 的實現:通過 Disruptor 實現;

public class DisruptorPublisher implements EventPublisher {

    private class TestEventHandler implements EventHandler<TestEvent> {

        private TestHandler handler;

        public TestEventHandler(TestHandler handler) {
            this.handler = handler;
        }

        @Override
        public void onEvent(TestEvent event, long sequence, boolean endOfBatch)
                throws Exception {
            handler.process(event);
        }

    }
    
    private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

    private Disruptor<TestEvent> disruptor;
    private TestEventHandler handler;
    private RingBuffer<TestEvent> ringbuffer;    
    private ExecutorService executor;

    public DisruptorPublisher(int bufferSize, TestHandler handler) {
        this.handler = new TestEventHandler(handler);
        executor = Executors.newSingleThreadExecutor();
        disruptor = new Disruptor<TestEvent>(EVENT_FACTORY, bufferSize,
                executor, ProducerType.SINGLE,
                YIELDING_WAIT);
    }

    @SuppressWarnings("unchecked")
    public void start() {
        disruptor.handleEventsWith(handler);
        disruptor.start();
        ringbuffer = disruptor.getRingBuffer();
    }

    @Override
    public void publish(int data) throws Exception {
        long seq = ringbuffer.next();
        try {
            TestEvent evt = ringbuffer.get(seq);
            evt.setValue(data);
        } finally {
            ringbuffer.publish(seq);
        }
    }

    //省略其它程式碼;
}

Producer 第一種實現並沒有執行緒間的交換,實際上就是直接呼叫計數器,因此以此種實現的測試結果作為基準,對比其它的兩種實現的測試結果。

在我的CPU CORE i5 / 4G 記憶體 / Win7 64 位的筆記本上,資料量(DATA_COUNT)取值為 1024 * 1024 時的測試結果如下:

【基準測試】
[1]--每秒吞吐量:--;(1048576/0ms)
[2]--每秒吞吐量:--;(1048576/0ms)
[3]--每秒吞吐量:--;(1048576/0ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:--;(1048576/0ms)
【對比測試1: ArrayBlockingQueue 實現】
[1]--每秒吞吐量:4788018;(1048576/219ms)
[2]--每秒吞吐量:5165399;(1048576/203ms)
[3]--每秒吞吐量:4809981;(1048576/218ms)
[4]--每秒吞吐量:5165399;(1048576/203ms)
[5]--每秒吞吐量:5577531;(1048576/188ms)
【對比測試2: Disruptor實現】
[1]--每秒吞吐量:33825032;(1048576/31ms)
[2]--每秒吞吐量:65536000;(1048576/16ms)
[3]--每秒吞吐量:65536000;(1048576/16ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:33825032;(1048576/31ms)

從測試結果看, Disruptor 的效能比 ArrayBlockingQueue 高出了幾乎一個數量級,操作耗時也只有平均20毫秒左右。

由於篇幅有限,關於 Disruptor 實現高效能的原理,留待以後再做探討。

六、參考資料

  1. Diruptor 頁面:https://github.com/LMAX-Exchange/disruptor