disruptor 高效能之道
disruptor是一個高效能的執行緒間非同步通訊的框架,即在同一個JVM程序中的多執行緒間訊息傳遞。應用disruptor知名專案有如下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有很多不少的應用,或者說有一些借鑑了它的設計機制。 下面就跟著筆者一起去領略下disruptor高效能之道吧~
disruptor是一款開源的高效能佇列框架,github地址為 https://github.com/LMAX-Exchange/disruptor。
分析disruptor,只要把event的生產和消費流程弄懂,基本上didsruptor的七寸就已經抓住了。話不多說,趕緊上車,筆者以下面程式碼為例講解disruptor:
public static void main(String[] args) { Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024, new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI, new BlockingWaitStrategy()); // 註冊consumer並啟動 disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> { System.out.println(Util.threadName()+ "onEvent " + event); }); disruptor.start(); // publisher邏輯 Executor executor = Executors.newFixedThreadPool(2, new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0))); while (true) { for (int i = 0; i < 2; i++) { executor.execute(() -> { Util.sleep(1); disruptor.publishEvent((event, sequence, arg0) -> { event.setValue(arg0 + " " + sequence); }, "hello world"); }); } Util.sleep(1000); } }
event生產流程
event的生產是從 RingBuffer.publishEvent 開始的,event生產流程步驟如下:- 獲取待插入(到ringBuffer的)位置,相當於先佔個位
- 往該位置上設定event
- 設定sequence對應event的標誌,通知consumer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { // 獲取當前要設定的sequence序號,然後進行設定並通知消費者 final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); } // 獲取下一個sequence,直到獲取到位置才返回 public long next(int n) { long current; long next; do { // 獲取當前ringBuffer的可寫入sequence current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { // 如果當前沒有空位置寫入,獲取多個consumer中消費進度最小的那個的消費進度 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { // 阻塞1ns,然後continue LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } // cas設定ringBuffer的sequence else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { // 設定event translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } public void publish(final long sequence) { // 1. 設定availableBuffer,表示對應的event是否設定完成,consumer執行緒中會用到 // - 注意,到這裡時,event已經設定完成,但是consumer還不知道該sequence對應的event是否設定完成, // - 所以需要設定availableBuffer中sequence對應event的sequence number // 2. 通知consumer setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); }
從translateAndPublish中看,如果使用者的設定event方法丟擲異常,這時event物件是不完整的,那麼publish到consumer端,consumer消費的不是完整的資料怎麼辦呢?在translateAndPublish中需不需要在異常情況下reset event物件呢?關於這個問題筆者之前是有疑問的,關於這個問題筆者提了一個issue,可點選 https://github.com/LMAX-Exchange/disruptor/issues/244 進行檢視。
筆者建議在consumer消費完event之後,進行reset event操作,這樣避免下次設定event異常consumer時取到不完整的資料,比如log4j2中的AsyncLogger中處理完log4jEvent之後就會呼叫clear方法進行重置event。
event消費流程
event消費流程入口是BatchEventProcessor.processEvents,event消費流程步驟:- 獲取當前consumer執行緒消費的offset,即nextSequence
- 從ringBuffer獲取可用的sequence,沒有新的event時,會根據consmer阻塞策略進行執行某些動作
- 獲取event,然後執行event回撥
- 設定當前consumer執行緒的消費進度
private void processEvents() { T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { // 獲取可用的sequence,預設直到有可用sequence時才返回 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } // 執行消費回撥動作,注意,這裡獲取到一個批次event,可能有多個,個數為availableSequence-nextSequence + 1 // nextSequence == availableSequence表示該批次只有一個event while (nextSequence <= availableSequence) { // 獲取nextSequence位置上的event event = dataProvider.get(nextSequence); // 使用者自定義的event 回撥 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } // 設定當前consumer執行緒的消費進度sequence sequence.set(availableSequence); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 獲取ringBuffer中可安全讀的最大的sequence number,該資訊存在availableBuffer中的sequence // 在MultiProducerSequencer.publish方法中會設定 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } // 預設consumer阻塞策略 BlockingWaitStrategy public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; if (cursorSequence.get() < sequence) { // 當前ringBuffer的sequence小於sequence,阻塞等待 // event生產之後會喚醒 synchronized (mutex) { while (cursorSequence.get() < sequence) { barrier.checkAlert(); mutex.wait(); } } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; }
從上面的event消費流程來看,消費執行緒會讀取ringBuffer的sequence,然後更新本消費執行緒內的offset(消費進度sequence),如果有多個event的話,那麼就是廣播消費模式了(單consumer執行緒內還是順序消費),如果不想讓event被廣播消費(重複消費),可使用如下方法新增consumer執行緒(WorkHandler是叢集消費,EventHandler是廣播消費):
disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> { System.out.println(Util.threadName() + "onEvent " + event); });
disruptor高效能之道
棄用鎖機制改用CASevent生產流程中獲取並自增sequence時用的就是CAS,獲取之後該sequence對應位置的操作只會在單執行緒,沒有了併發問題。
叢集消費模式下獲取sequence之後也會使用CAS設定為sequence新值,設定本地消費進度,然後再執行獲取event並執行回撥邏輯。
注意,disruptor中較多地方使用了CAS,但並不代表完全沒有了鎖機制,比如預設consumer阻塞策略 BlockingWaitStrategy發揮作用時,consumer消費執行緒就會阻塞,只不過這隻會出現在event生產能力不足是才會存在。如果consumer消費不足,大量event生產導致ringBuffer爆滿,這時event生產執行緒就會輪詢呼叫LockSupport.parkNanos(1),這裡的成本也不容小覷(涉及到執行緒切換損耗)。
避免偽共享引入緩衝行填充偽共享講的是多個CPU時的123級快取的問題,通常,快取是以快取行的方式讀取資料,如果A、B兩個變數被緩衝在同一行之內,那麼對於其中一個的更新會導致另一個緩衝無效,需要從記憶體中讀取,這種無法充分利用快取行的問題就是偽共享。disruptor相關程式碼如下:
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; }使用RingBuffer作為資料儲存容器
ringBuffer是一個環形佇列,本質是一個數組,size為2的冪次方(方便做&操作),資料位置sequence值會和size做&操作得出陣列下標,然後進行資料的讀寫操作(只在同一個執行緒內,無併發問題)。
小結disruptor初衷是為了解決記憶體佇列的延遲問題,作為一個高效能佇列,包括Apache Storm、Camel、Log4j 2在內的很多知名專案都在使用。disruptor的重要機制就是CAS和RingBuffer,藉助於它們兩個實現資料高效的生產和消費。
disruptor多生產者多消費者模式下,因為RingBuffer資料的寫入是分為2步的(先獲取到個sequence,然後寫入資料),如果獲取到sequence之後,生產者寫入RingBuffer較慢,consumer消費較快,那麼生產者最終會拖慢consumer消費進度,這一點需注意(如果已經消費到生產者佔位的前一個數據了,那麼consumer會執行對應的阻塞策略)。在實際使用過程中,如果consumer消費邏輯耗時較長,可以封裝成任務交給執行緒池來處理,避免consumer端拖慢生成者的寫入速度。
disruptor的設計對於開發者來說有哪些借鑑的呢?儘量減少競爭,避免多執行緒對同一資料做操作,比如disruptor使用CAS獲取只會在一個執行緒內進行讀寫的event物件,這種思想其實已經在JDK的thread本地記憶體中有所體現;儘量複用物件,避免大量的記憶體申請釋放,增加GC損耗,disruptor通過複用event物件來保證讀寫時不會產生物件GC問題;選擇合適資料結構,disruptor使用ringBuffer,環形陣列來實現資料高效讀寫。
參考資料: