1. 程式人生 > >從構建分散式秒殺系統聊聊Disruptor高效能佇列

從構建分散式秒殺系統聊聊Disruptor高效能佇列

前言

秒殺架構持續優化中,基於自身認知不足之處在所難免,也請大家指正,共同進步。文章標題來自碼友的建議,希望可以把阻塞佇列ArrayBlockingQueue這個佇列替換成Disruptor,由於之前曾接觸過這個東西,聽說很不錯,正好藉此機會整合進來。

簡介

LMAX Disruptor是一個高效能的執行緒間訊息庫。它源於LMAX對併發性,效能和非阻塞演算法的研究,如今構成了Exchange基礎架構的核心部分。

Disruptor它是一個開源的併發框架,並獲得2011 Duke’s 程式框架創新獎,能夠在無鎖的情況下實現網路的Queue併發操作。

Disruptor是一個高效能的非同步處理框架,或者可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

在這裡你可以跟BlockingQueue佇列作比對,簡單的理解為它是一種高效的"生產者-消費者"模型,先了解後深入底層原理。

核心

寫程式碼案例之前,大家最好先了解 Disruptor 的核心概念,至少知道它是如何運作的。

Ring Buffer

如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的物件,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的資料(事件)進行儲存和更新。在一些更高階的應用場景中,Ring Buffer 可以由使用者的自定義實現來完全替代。

Sequence Disruptor

通過順序遞增的序號來編號管理通過其進行交換的資料(事件),對資料(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用於標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU快取偽共享(Flase Sharing)問題。

Sequencer 

Sequencer 是 Disruptor 的真正核心。此介面有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞資料的併發演算法。

Sequence Barrier

用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

Wait Strategy

定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的效能表現)

Event

在 Disruptor 的語義中,生產者和消費者之間進行交換的資料被稱為事件(Event)。它不是一個被 Disruptor 定義的特定型別,而是由 Disruptor 的使用者定義並指定。

EventProcessor

EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於呼叫事件處理實現的事件迴圈(Event Loop)。

EventHandler

Disruptor 定義的事件處理介面,由使用者實現,用於處理事件,是 Consumer 的真正實現。

Producer

即生產者,只是泛指呼叫 Disruptor 釋出事件的使用者程式碼,Disruptor 沒有定義特定介面或型別。

優點

剖析Disruptor:為什麼會這麼快?(一)鎖的缺點

剖析Disruptor:為什麼會這麼快?(二)神奇的快取行填充

剖析Disruptor:為什麼會這麼快?(三)偽共享

剖析Disruptor:為什麼會這麼快?(四)揭祕記憶體屏障

使用案例

這裡以我們系統中的秒殺作為案例,後面有相對複雜的場景介紹。

定義秒殺事件物件:

/**

* 事件物件(秒殺事件)

* 建立者 科幫網

*/publicclassSeckillEventimplementsSerializable{privatestaticfinallongserialVersionUID =1L;privatelongseckillId;privatelonguserId;publicSeckillEvent(){    }publiclonggetSeckillId(){returnseckillId;    }publicvoidsetSeckillId(longseckillId){this.seckillId = seckillId;    }publiclonggetUserId(){returnuserId;    }publicvoidsetUserId(longuserId){this.userId = userId;    }}

為了讓Disruptor為我們預先分配這些事件,我們需要一個將執行構造的EventFactory:

/**

* 事件生成工廠(用來初始化預分配事件物件)

* 建立者 科幫網

*/publicclassSeckillEventFactoryimplementsEventFactory{publicSeckillEventnewInstance(){returnnewSeckillEvent();    }}

然後,我們需要建立一個處理這些事件的消費者:

/**

* 消費者(秒殺處理器)

* 建立者 科幫網

*/publicclassSeckillEventConsumerimplementsEventHandler{//業務處理、這裡是無法注入的,需要手動獲取,見原始碼privateISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");publicvoidonEvent(SeckillEvent seckillEvent,longseq,booleanbool)throwsException{        seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());    }}

既然有消費者,我們將需要這些秒殺事件的來源:

/**

* 使用translator方式生產者

* 建立者 科幫網

*/publicclassSeckillEventProducer{privatefinalstaticEventTranslatorVararg translator =newEventTranslatorVararg() {publicvoidtranslateTo(SeckillEvent seckillEvent,longseq, Object... objs){            seckillEvent.setSeckillId((Long) objs[0]);            seckillEvent.setUserId((Long) objs[1]);        }    };privatefinalRingBuffer ringBuffer;publicSeckillEventProducer(RingBuffer ringBuffer){this.ringBuffer = ringBuffer;    }publicvoidseckill(longseckillId,longuserId){this.ringBuffer.publishEvent(translator, seckillId, userId);    }}

最後,我們來寫一個測試類,執行一下(跑不通,需要修改消費者):

/**

* 測試類

* 建立者 科幫網

*/publicclassSeckillEventMain{publicstaticvoidmain(String[] args){        producerWithTranslator();    }publicstaticvoidproducerWithTranslator(){        SeckillEventFactory factory =newSeckillEventFactory();intringBufferSize =1024;        ThreadFactory threadFactory =newThreadFactory() {publicThreadnewThread(Runnable runnable){returnnewThread(runnable);            }        };//建立disruptorDisruptor disruptor =newDisruptor(factory, ringBufferSize, threadFactory);//連線消費事件方法disruptor.handleEventsWith(newSeckillEventConsumer());//啟動disruptor.start();        RingBuffer ringBuffer = disruptor.getRingBuffer();        SeckillEventProducer producer =newSeckillEventProducer(ringBuffer);for(longi =0; i<10; i++){            producer.seckill(i, i);        }        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;}}

使用場景

PCP (生產者-消費者問題)

網上搜了下國內實戰案例並不多,大廠可能有在使用

這裡舉一個大家日常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車資訊(B)。同時也會發送訊息到其他系統處理相關業務(C),最後傳送簡訊通知車主收費開始(D)。

一個生產者A與三個消費者B、C、D,D的事件處理需要B與C先完成。則該模型結構如下:

在這個結構下,每個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。

1、具有1-5工作經驗的,面對目前流行的技術不知從何下手,

需要突破技術瓶頸的可以加。

2、在公司待久了,過得很安逸,

但跳槽時面試碰壁。

需要在短時間內進修、跳槽拿高薪的可以加。

3、如果沒有工作經驗,但基礎非常紮實,對java工作機制,

常用設計思想,常用java開發框架掌握熟練的,可以加。

4、覺得自己很牛B,一般需求都能搞定。

但是所學的知識點沒有系統化,很難在技術領域繼續突破的可以加。

5. 群號:高階架構群 Java進階群:180705916.備註好資訊!送架構視訊。

6.阿里Java高階大牛直播講解知識點,分享知識,

多年工作經驗的梳理和總結,帶著大家全面、