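A Wrapper Around the Disruptor High-Performance Concurrency Framework
Disruptor is a high-performance, lock-free concurrency framework for Java. It resembles the JDK's BlockingQueue in some ways, but it is much faster. It is claimed that "a single thread can process 6 million orders per second" (not that my humble machine ever got anywhere near that).
Disruptor is feature-rich: consumers can block while waiting; producers and consumers can be wired one-to-one, one-to-many, many-to-one, or many-to-many; consumers can be arranged into serial or parallel processing chains; and so on.
For the conceptual model, see: https://www.cnblogs.com/haiq/p/4112689.html
Below is the utility I built on top of Disruptor. It uses a fluent coding style to simplify calls into Disruptor.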

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventProducer;
import com.gravel.demo.test.disruptor.base.Publisher;
import com.gravel.demo.test.disruptor.base.PublisherBuilder;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description: Disruptor high-performance asynchronous processing framework
 */
public class DisruptorTest {

    public static void main(String[] args) throws Exception {
        builderTest();
    }

    private static void builderTest() throws Exception {
        // Create several producer instances
        EventProducer<Domain> producer1 = new EventProducer<>("producer1");
        EventProducer<Domain> producer2 = new EventProducer<>("producer2");

        // Create several consumer instances
        DomainConsumer handler1 = new DomainConsumer("handler1");
        DomainConsumer handler2 = new DomainConsumer("handler2");
        DomainConsumer after1 = new DomainConsumer("after1");
        DomainConsumer after2 = new DomainConsumer("after2");
        DomainConsumer after3 = new DomainConsumer("after3");
        DomainConsumer then = new DomainConsumer("then");

        // Create the message publisher
        final Publisher<Domain> publisher = PublisherBuilder.newBuilder()
                // Thread factory
                // .threadFactory(r -> new Thread(r))
                // .threadFactory(new LimitedThreadFactory())
                // .threadFactory(Executors.defaultThreadFactory())
                // Producer type
                // .producerType(ProducerType.SINGLE)
                // .producerType(ProducerType.MULTI)
                // Event factory
                // .eventFactory(new EventFactory())
                // Wait strategy
                // .waitStrategy(new SleepingWaitStrategy())
                // .waitStrategy(new YieldingWaitStrategy())
                // Publish strategy
                // .publishStrategy(PublishStrategy.TRANSLATOR)
                // .publishStrategy(PublishStrategy.NORMAL)
                // ringBuffer size
                // .ringSize(1024 * 8)
                // Exception handler
                .exceptionHandler(new DomainErrorHandler<>())
                // Initialize Disruptor. This must happen before producers and consumers are configured.
                .disruptor()
                // Single producer
                // .producer(producer1)
                // Single consumer
                // .handler(handler1)
                // Multiple producers
                .producer(producer1, producer2)
                // ====== chain of multiple workers or handlers: start =======
                // .worker(handler1)
                .handler(handler1, handler2)
                .after(handler1).handler(after1)
                .after(handler2).handler(after2)
                .after(after1, after2).handler(after3)
                // .then(after3)
                // ====== chain of multiple workers or handlers: end =======
                // Start
                .build();

        long start = System.currentTimeMillis();
        try {
            for (int i = 0; i < 500; i++) {
                publisher
                        // publish calls can be chained
                        //.publish(new Domain(String.valueOf("a" + i), "init"))
                        .publish(new Domain(String.valueOf(i), "init"));
            }
        } finally {
            long sleep = 200;
            Thread.sleep(sleep);
            System.out.println("used time: " + (System.currentTimeMillis() - start - sleep) + "ms");
            // Without shutdown, the consumers keep blocking and waiting
            publisher.shutdown();
        }
    }
}

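As the code above shows, the entry point of the wrapper is Publisher, which is assembled through PublisherBuilder. The builder accepts the parameters Disruptor needs, such as the thread factory (ThreadFactory), event factory (EventFactory), wait strategy (WaitStrategy), message producers (Producer), consumers (Handler/Worker), and so on.
Producers and consumers are the heart of Publisher, so I will cover them in detail later. First, let's look at the PublisherBuilder class.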

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class PublisherBuilder {

    // Default Publisher parameters
    private static final int RING_SIZE = 1024 * 8;
    private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
    private static final WaitStrategy WAIT_STRATEGY = new SleepingWaitStrategy();
    private static final com.lmax.disruptor.EventFactory EVENT_FACTORY = new EventFactory();
    private static final ProducerType PRODUCER_TYPE = ProducerType.SINGLE;
    private static final PublishStrategy PUBLISH_STRATEGY = PublishStrategy.TRANSLATOR;

    private com.lmax.disruptor.EventFactory eventFactory;
    private ThreadFactory threadFactory;
    private WaitStrategy waitStrategy;
    private ExceptionHandler exceptionHandler;
    private ProducerType type;
    private PublishStrategy publishStrategy;
    private EventPublisher publisher;
    private int ringSize;

    public static PublisherBuilder newBuilder() {
        return new PublisherBuilder();
    }

    /**
     * Sets the ringBuffer size. It should be a power of 2 (Disruptor rejects other sizes).
     * Defaults to 1024 * 8.
     */
    public PublisherBuilder ringSize(int ringSize) {
        this.ringSize = ringSize;
        return this;
    }

    /**
     * Sets the eventFactory. Defaults to EventFactory.
     */
    public <T> PublisherBuilder eventFactory(com.lmax.disruptor.EventFactory eventFactory) {
        this.eventFactory = eventFactory;
        return this;
    }

    /**
     * Sets the ThreadFactory. Defaults to Executors.defaultThreadFactory().
     */
    public PublisherBuilder threadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }

    /**
     * Sets the wait strategy. Defaults to SleepingWaitStrategy.
     */
    public PublisherBuilder waitStrategy(WaitStrategy waitStrategy) {
        this.waitStrategy = waitStrategy;
        return this;
    }

    /**
     * Sets the publish strategy. Defaults to PublishStrategy.TRANSLATOR.
     */
    public PublisherBuilder publishStrategy(PublishStrategy publishStrategy) {
        this.publishStrategy = publishStrategy;
        return this;
    }

    /**
     * Initializes the Disruptor, filling in defaults for anything left unset.
     */
    public PublisherBuilder disruptor() {
        this.eventFactory = this.eventFactory == null ? EVENT_FACTORY : this.eventFactory;
        this.threadFactory = this.threadFactory == null ? THREAD_FACTORY : this.threadFactory;
        this.waitStrategy = this.waitStrategy == null ? WAIT_STRATEGY : this.waitStrategy;
        this.ringSize = this.ringSize <= 0 ? RING_SIZE : this.ringSize;
        this.type = this.type == null ? PRODUCER_TYPE : this.type;
        this.publishStrategy = this.publishStrategy == null ? PUBLISH_STRATEGY : this.publishStrategy;
        publisher = new EventPublisher<>(eventFactory, ringSize, threadFactory, waitStrategy,
                exceptionHandler, type, publishStrategy);
        return this;
    }

    /**
     * Configures the producers.
     */
    public <T> PublisherBuilder producer(EventProducer<T>... producers) {
        if (isInit()) {
            this.publisher.producer(producers);
        }
        return this;
    }

    /**
     * eventHandler: every event is processed by all handlers.
     */
    public <T> PublisherBuilder handler(EventHandler<Event<T>>... eventHandlers) {
        if (isInit()) {
            this.publisher.eventHandler(eventHandlers);
        }
        return this;
    }

    /**
     * workHandler: every event is processed by exactly one worker.
     */
    public <T> PublisherBuilder worker(WorkHandler<Event<T>>... workHandlers) {
        if (isInit()) {
            this.publisher.workHandler(workHandlers);
        }
        return this;
    }

    /**
     * Runs the given handlers after the preceding handlers or workers.
     */
    public <T> PublisherBuilder then(EventHandler<Event<T>>... thenEventHandlers) {
        if (isInit()) {
            this.publisher.thenHandler(thenEventHandlers);
        }
        return this;
    }

    /**
     * Orders what follows after the given handlers. Cannot be called directly after worker().
     */
    public <T> PublisherBuilder after(EventHandler<Event<T>>... afterEventHandlers) {
        if (isInit()) {
            this.publisher.afterHandler(afterEventHandlers);
        }
        return this;
    }

    /**
     * Sets the producerType. Defaults to ProducerType.SINGLE.
     */
    public <T> PublisherBuilder producerType(ProducerType type) {
        this.type = type;
        return this;
    }

    /**
     * Sets the exception handler.
     */
    public <T> PublisherBuilder exceptionHandler(ExceptionHandler<T> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    // Guards every method that needs the Disruptor instance to exist already.
    private boolean isInit() {
        if (this.publisher == null) {
            throw new IllegalStateException("execute disruptor() function before set handlers or workers.");
        }
        return true;
    }

    public <T> Publisher<T> build() {
        return this.publisher.start();
    }
}

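One detail in the builder above: ringSize() accepts any int and only falls back to the default for values <= 0, while Disruptor itself, as far as I know, rejects a buffer size that is not a power of two. A small guard could normalize the value before it reaches disruptor(). The sketch below is only an illustration of that idea; RingSizes and its methods are hypothetical helpers, not part of this wrapper or of Disruptor.

```java
// Hypothetical helper; not part of the wrapper or of Disruptor.
final class RingSizes {
    private RingSizes() {}

    /** True when n is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    /** Rounds n up to the next power of two; values <= 1 become 1. */
    static int nextPowerOfTwo(int n) {
        if (n <= 1) {
            return 1;
        }
        if (n > (1 << 30)) {
            throw new IllegalArgumentException("ring size too large: " + n);
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    /** With a power-of-two size, a sequence maps to a slot with a cheap bit mask. */
    static int slotIndex(long sequence, int ringSize) {
        return (int) (sequence & (ringSize - 1));
    }
}

final class RingSizesDemo {
    public static void main(String[] args) {
        System.out.println(RingSizes.isPowerOfTwo(1024 * 8));
        System.out.println(RingSizes.nextPowerOfTwo(1000));
        System.out.println(RingSizes.slotIndex(8193L, 8192));
    }
}
```

The mask in slotIndex is the usual reason ring buffers prefer power-of-two sizes: wrapping around becomes a single AND instead of a modulo.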
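The key piece is the disruptor() method, which is where the Disruptor is instantiated. eventHandlers and workHandlers have to be specified after the Disruptor instance exists and before disruptor.start() is called.
So the utility must be called in this order:
PublisherBuilder.newBuilder()/* ...configuration... */.disruptor()/* specify producers and handlers/workers */.build();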
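To make the call-order rule concrete, here is a stripped-down model of it using only the JDK. MiniBuilder is a hypothetical stand-in written for this illustration; it mirrors the isInit() guard in PublisherBuilder but involves no Disruptor types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the call-order rule; no Disruptor types involved.
final class MiniBuilder {
    private int ringSize;            // 0 means "use the default"
    private boolean initialized;     // set by disruptor()
    private final List<String> handlers = new ArrayList<>();

    MiniBuilder ringSize(int ringSize) {
        this.ringSize = ringSize;
        return this;
    }

    /** Freezes the configuration, applying defaults for anything left unset. */
    MiniBuilder disruptor() {
        if (ringSize <= 0) {
            ringSize = 1024 * 8;
        }
        initialized = true;
        return this;
    }

    /** Handlers may only be registered after disruptor() has run. */
    MiniBuilder handler(String name) {
        if (!initialized) {
            throw new IllegalStateException("call disruptor() before adding handlers");
        }
        handlers.add(name);
        return this;
    }

    String build() {
        return "ringSize=" + ringSize + ", handlers=" + handlers;
    }
}

final class MiniBuilderDemo {
    public static void main(String[] args) {
        System.out.println(new MiniBuilder().disruptor().handler("h1").build());
        try {
            new MiniBuilder().handler("h1");   // wrong order: no disruptor() yet
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with an exception keeps a misordered chain from silently building a publisher with no consumers.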
Next, the EventPublisher class used inside PublisherBuilder. It implements the Publisher interface, shown below:
Publisher.java

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Publisher<T> {

    Publisher<T> start();

    Publisher<T> publish(T t);

    Publisher<T> shutdown();
}

EventPublisher.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventPublisher<T> implements Publisher<T> {

    private RingBuffer<Event<T>> ringBuffer;
    private Disruptor<Event<T>> disruptor;
    private EventHandlerGroup<Event<T>> handlerGroup;
    private PublisherState state;
    private PublishStrategy publishStrategy;
    // This is the EventTranslator class from this package, not com.lmax.disruptor.EventTranslator.
    private EventTranslator<T> translator;
    private Producer<T>[] producers;

    private enum PublisherState {
        START, SHUTDOWN
    }

    public EventPublisher(com.lmax.disruptor.EventFactory<Event<T>> factory,
                          int ringSize,
                          ThreadFactory threadFactory,
                          WaitStrategy waitStrategy,
                          ExceptionHandler<Event<T>> exceptionHandler,
                          ProducerType type,
                          PublishStrategy publishStrategy) {
        this.disruptor = new Disruptor<>(factory, ringSize, threadFactory, type, waitStrategy);
        if (!Objects.isNull(exceptionHandler)) {
            this.disruptor.setDefaultExceptionHandler(exceptionHandler);
        }
        this.ringBuffer = disruptor.getRingBuffer();
        this.publishStrategy = publishStrategy;
        // With NORMAL, translator stays null and producers publish by sequence.
        if (publishStrategy == PublishStrategy.TRANSLATOR) {
            translator = new EventTranslator<>();
        }
        this.state = PublisherState.SHUTDOWN;
    }

    public EventPublisher<T> producer(EventProducer<T>... producers) {
        if (!Objects.isNull(producers) && producers.length > 0) {
            // Every producer publishes into this Disruptor's ringBuffer.
            for (EventProducer<T> producer : producers) {
                producer.setRingBuffer(this.ringBuffer).setTranslator(this.translator);
            }
            this.producers = producers;
        }
        return this;
    }

    public EventPublisher<T> eventHandler(EventHandler<Event<T>>... eventHandlers) {
        if (this.handlerGroup != null) {
            // Chain after the current group. The returned group is not stored,
            // so later calls still attach to the same group.
            this.handlerGroup.handleEventsWith(eventHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWith(eventHandlers);
        }
        return this;
    }

    public EventPublisher<T> workHandler(WorkHandler<Event<T>>... workHandlers) {
        if (this.handlerGroup != null) {
            this.handlerGroup.handleEventsWithWorkerPool(workHandlers);
        } else {
            this.handlerGroup = disruptor.handleEventsWithWorkerPool(workHandlers);
        }
        return this;
    }

    public EventPublisher<T> thenHandler(EventHandler<Event<T>>... thenHandlers) {
        // then() needs an existing group; fail clearly instead of with a NullPointerException.
        if (this.handlerGroup == null) {
            throw new IllegalStateException("specify handlers or workers before then().");
        }
        this.handlerGroup.then(thenHandlers);
        return this;
    }

    public EventPublisher<T> afterHandler(EventHandler<Event<T>>... afterHandlers) {
        this.handlerGroup = this.disruptor.after(afterHandlers);
        return this;
    }

    public Disruptor<Event<T>> getDisruptor() {
        return disruptor;
    }

    @Override
    public EventPublisher<T> start() {
        this.disruptor.start();
        this.state = PublisherState.START;
        return this;
    }

    @Override
    public EventPublisher<T> shutdown() {
        this.disruptor.shutdown();
        this.state = PublisherState.SHUTDOWN;
        return this;
    }

    @Override
    public EventPublisher<T> publish(T t) {
        if (!isStarted()) {
            throw new IllegalStateException("publisher not start..");
        }
        if (producers == null || producers.length <= 0) {
            throw new IllegalStateException("producer must be specify.");
        }
        // Every configured producer publishes the same object t.
        for (Producer<T> producer : producers) {
            producer.produce(t);
        }
        return this;
    }

    private boolean isStarted() {
        return this.state == PublisherState.START;
    }
}

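The most important parts here are the methods that specify the producers and the consumer chain (seriously handy!). The varargs parameters mean producers and consumers can be related one-to-one, one-to-many, many-to-one, or many-to-many.
First, the publisher's smallest unit of data, Event:

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class Event<T> {

    private T data;

    public void set(T data) {
        this.data = data;
    }

    public T get() {
        return data;
    }
}

Now the producer code. EventProducer implements Producer.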
Producer.java

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/9
 * @Description:
 */
public interface Producer<T> {

    void produce(T t);
}

EventProducer.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.RingBuffer;

/**
 * @Author: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventProducer<T> implements Producer<T> {

    private String name;
    private RingBuffer<Event<T>> ringBuffer;
    private EventTranslator<T> translator;

    public EventProducer(String name) {
        this.name = name;
    }

    public EventProducer<T> setRingBuffer(RingBuffer<Event<T>> ringBuffer) {
        this.ringBuffer = ringBuffer;
        return this;
    }

    public EventProducer<T> setTranslator(EventTranslator<T> translator) {
        this.translator = translator;
        return this;
    }

    @Override
    public void produce(T t) {
        System.out.println(String.format("producer message by %s, data: %s", name, t));
        if (translator != null) {
            // Translator style: the ringBuffer claims and publishes the slot itself.
            ringBuffer.publishEvent(translator, t);
        } else {
            // Sequence style: claim a slot, fill it, then publish it.
            long seq = ringBuffer.next();
            try {
                Event<T> event = ringBuffer.get(seq);
                event.set(t);
            } finally {
                // Disruptor requires RingBuffer.publish to be called, which implies
                // it must be called even when an exception occurs.
                // A claimed sequence that is never published blocks later publishes
                // and any other producers.
                ringBuffer.publish(seq);
            }
        }
    }
}

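The ringBuffer must be the one owned by the Disruptor instance, which is why producer() loops over the producers and sets the ringBuffer on each. The translator is likewise a single instance shared by all producers. I was not sure at first whether that could let threads overwrite each other's data, but EventTranslator has no fields and translateTo() only writes to the event it is handed, so sharing it should be safe. If a translator ever did hold state, each Producer should get its own instance. As produce() shows, Disruptor publishes messages through the ringBuffer, and there are two ways to do it: through a translator, or through a sequence. Note the comment in the finally block.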
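The question of whether one shared translator is safe across threads can be checked with a small JDK-only experiment. In this sketch, Slot is a hypothetical stand-in for Event<T> and a BiConsumer plays the role of the translator; nothing here touches Disruptor, so it only demonstrates the general point that a stateless function can be shared freely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-ins: Slot plays the role of Event<T>, the BiConsumer the translator.
final class SharedTranslatorDemo {

    static final class Slot<T> {
        volatile T data;
    }

    /** Fills count distinct slots from several threads through ONE shared translator. */
    static boolean fillConcurrently(int threads, int count) {
        // Stateless: it only touches the slot and the value passed in.
        BiConsumer<Slot<Integer>, Integer> translator = (slot, value) -> slot.data = value;

        @SuppressWarnings("unchecked")
        Slot<Integer>[] slots = new Slot[count];
        for (int i = 0; i < count; i++) {
            slots[i] = new Slot<>();
        }

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int offset = t;
            pool.execute(() -> {
                // Each thread writes its own disjoint set of slots.
                for (int i = offset; i < count; i += threads) {
                    translator.accept(slots[i], i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        // Every slot must hold exactly the value written for its index.
        for (int i = 0; i < count; i++) {
            if (slots[i].data == null || slots[i].data != i) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("all slots intact: " + fillConcurrently(4, 10_000));
    }
}
```

The safety comes from two things together: the translator keeps no state of its own, and each claimed slot is written by only one thread at a time.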
Here is the EventTranslator code. It implements EventTranslatorVararg, and translateTo() fills the instance created by EventFactory with data.

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventTranslatorVararg;

/**
 * @Author: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class EventTranslator<T> implements EventTranslatorVararg<Event<T>> {

    @Override
    @SuppressWarnings("unchecked")
    public void translateTo(Event<T> event, long sequence, Object... args) {
        // args[0] is the payload passed to ringBuffer.publishEvent(translator, t).
        event.set((T) args[0]);
    }
}

Next come the consumers. So that worker and handler results are handled uniformly, every consumer implements both Disruptor's EventHandler and WorkHandler, plus a custom Consumer interface.
Consumer interface

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public interface Consumer<T> {

    // over is the end-of-batch flag for handlers, and null when invoked as a worker.
    void consume(T data, Boolean over) throws Exception;
}

EventConsumer.java

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class EventConsumer<T> implements EventHandler<Event<T>>, Consumer<T>, WorkHandler<Event<T>> {

    String name;

    public EventConsumer(String name) {
        this.name = name;
    }

    // EventHandler callback: used when registered through handler().
    @Override
    public void onEvent(Event<T> event, long seq, boolean over) throws Exception {
        consume(event.get(), over);
    }

    // WorkHandler callback: used when registered through worker().
    @Override
    public void onEvent(Event<T> event) throws Exception {
        consume(event.get(), null);
    }

    protected String getName() {
        return name;
    }
}

The abstract class EventConsumer routes both the worker and the handler onEvent methods to one place. The actual consume logic is left to the user, as in DomainConsumer in the demo.
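The adapter idea can be shown in isolation without Disruptor on the classpath. In the sketch below, BatchCallback and WorkCallback are hypothetical stand-ins for the two callback shapes (they are not Disruptor's interfaces), and UnifiedConsumer and RecordingConsumer are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the two callback shapes; these are NOT Disruptor's interfaces.
interface BatchCallback<E> {
    void onEvent(E event, long sequence, boolean endOfBatch);
}

interface WorkCallback<E> {
    void onEvent(E event);
}

/** Routes both callback shapes into a single consume(data, over) method. */
abstract class UnifiedConsumer<E> implements BatchCallback<E>, WorkCallback<E> {

    @Override
    public void onEvent(E event, long sequence, boolean endOfBatch) {
        consume(event, endOfBatch);   // handler path: the flag is known
    }

    @Override
    public void onEvent(E event) {
        consume(event, null);         // worker path: no batch information
    }

    protected abstract void consume(E data, Boolean over);
}

/** Records what it was given so the two paths can be compared. */
final class RecordingConsumer extends UnifiedConsumer<String> {
    final List<String> log = new ArrayList<>();

    @Override
    protected void consume(String data, Boolean over) {
        log.add(data + ":" + over);
    }
}

final class UnifiedConsumerDemo {
    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        consumer.onEvent("a", 0L, true);   // invoked as a handler
        consumer.onEvent("b");             // invoked as a worker
        System.out.println(consumer.log);
    }
}
```

Using a boxed Boolean rather than a primitive is what lets the worker path signal "no batch information" with null, which is why the demo output later shows "is over?null" for workers.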
At this point the key producer and consumer code is all wrapped up.
Now for the non-core code.
Error handling class: ErrorHandler implements ExceptionHandler

package com.gravel.demo.test.disruptor.base;

import com.lmax.disruptor.ExceptionHandler;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public abstract class ErrorHandler<T> implements ExceptionHandler<Event<T>> {

    @Override
    public void handleEventException(Throwable throwable, long sequence, Event<T> event) {
        // Unwrap the payload so subclasses work with T rather than Event<T>.
        handle(event.get(), throwable);
    }

    @Override
    public void handleOnStartException(Throwable throwable) {
    }

    @Override
    public void handleOnShutdownException(Throwable throwable) {
    }

    protected abstract void handle(T object, Throwable throwable);
}

The EventFactory class, which implements Disruptor's EventFactory

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class EventFactory implements com.lmax.disruptor.EventFactory {

    @Override
    public Event newInstance() {
        return new Event();
    }
}

RingBuffer publish strategy

package com.gravel.demo.test.disruptor.base;

/**
 * @Author: syh
 * @Date: 2020/7/9
 * @Description:
 */
public enum PublishStrategy {
    NORMAL,
    TRANSLATOR
}

That completes the Disruptor wrapper. Here are the demo classes.
The Domain entity

package com.gravel.demo.test.disruptor;

/**
 * @Author: syh
 * @Date: 2020/7/9
 * @Description:
 */
public class Domain {

    private String id;
    private String value;

    public Domain(String id, String value) {
        this.id = id;
        this.value = value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Domain{" +
                "id='" + id + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}

DomainConsumer

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.EventConsumer;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainConsumer extends EventConsumer<Domain> {

    public DomainConsumer() {
        this("FirstDisruptorHandler" + (Math.random() * 100));
    }

    public DomainConsumer(String name) {
        super(name);
    }

    @Override
    public void consume(Domain data, Boolean over) throws Exception {
        // For testing the errorHandler: workers throw, handlers proceed normally
        /*if (Objects.isNull(over)) {
            throw new RuntimeException(getName() + " handle exception.");
        }*/
        System.out.println(String.format("received by %s, data: %s, is over?%s", getName(), data.toString(), over));
        // Stamp the data with this consumer's name so downstream consumers can see who touched it last.
        data.setValue(getName());
    }
}

DomainErrorHandler

package com.gravel.demo.test.disruptor;

import com.gravel.demo.test.disruptor.base.ErrorHandler;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description:
 */
public class DomainErrorHandler<T> extends ErrorHandler<T> {

    @Override
    protected void handle(T object, Throwable throwable) {
        System.err.println(String.format("received an error message: %s, data: %s", throwable.getMessage(), object));
        // If no exception is thrown here, the data continues on to the next handler
        throw new IllegalStateException("interrupted.");
    }
}

LimitedThreadFactory

package com.gravel.demo.test.disruptor;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: syh
 * @Date: 2020/7/8
 * @Description: single thread only
 */
public class LimitedThreadFactory implements ThreadFactory {

    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        // Only the first request succeeds; any later request is refused.
        if (count.compareAndSet(0, 1)) {
            return new Thread(r);
        } else {
            throw new IllegalStateException("Created more than one thread");
        }
    }
}

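A quick way to see what LimitedThreadFactory does is to ask it for several threads and count how many it grants. The sketch below repeats the factory logic so it compiles on its own; LimitedThreadFactoryDemo and threadsGranted are hypothetical names for this illustration. My understanding is that Disruptor asks the factory for one thread per consumer, so this factory presumably only suits a single-consumer setup, but treat that as an assumption rather than a documented guarantee.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Same logic as the LimitedThreadFactory in this post, repeated so the sketch compiles on its own.
final class LimitedThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        if (count.compareAndSet(0, 1)) {
            return new Thread(r);
        }
        throw new IllegalStateException("Created more than one thread");
    }
}

// Hypothetical demo class.
final class LimitedThreadFactoryDemo {

    /** Returns how many threads the factory handed out before refusing. */
    static int threadsGranted(ThreadFactory factory, int requests) {
        int granted = 0;
        for (int i = 0; i < requests; i++) {
            try {
                factory.newThread(() -> { });   // the thread is created but never started
                granted++;
            } catch (IllegalStateException e) {
                break;
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        System.out.println("threads granted: " + threadsGranted(new LimitedThreadFactory(), 3));
    }
}
```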
Demo tests (only one consumed record is shown for each case)
Single producer, single consumer (worker and handler give the same result)

.producer(producer1)
.handler(handler1)

Result

producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
used time: 19ms

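A note on the "used time" figure: the demo sleeps a fixed 200 ms and subtracts it, so the number says nothing about when the consumers actually finished. One alternative is to have the consumer count down a latch and measure until the latch opens. The sketch below shows that pattern with JDK classes only; a single-thread executor stands in for the consumer, and LatchTimingDemo and timeUntilConsumed are hypothetical names. Wiring a latch into DomainConsumer would be a separate change that I have not made here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an executor thread stands in for the Disruptor consumer.
final class LatchTimingDemo {

    /** Hands events to a consumer thread; returns elapsed ms once all are consumed, or -1 on timeout. */
    static long timeUntilConsumed(int events) {
        CountDownLatch done = new CountDownLatch(events);
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        try {
            for (int i = 0; i < events; i++) {
                // Stand-in for "the handler consumed one event".
                consumer.execute(done::countDown);
            }
            // Block until every event has been consumed, with a safety timeout.
            if (!done.await(5, TimeUnit.SECONDS)) {
                return -1;
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("used time: " + timeUntilConsumed(500) + "ms");
    }
}
```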
Single producer, multiple consumers (handler type, wired as a diamond-shaped chain).

.producer(producer1)
.handler(handler1, handler2)
.after(handler1).handler(after1)
.after(handler2).handler(after2)
.after(after1, after2).handler(after3)

Result (each event is consumed by every handler.)

producer message by producer1, data: Domain{id='0', value='init'}
received by handler2, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by after2, data: Domain{id='0', value='handler1'}, is over?true
received by after1, data: Domain{id='0', value='handler1'}, is over?true
received by after3, data: Domain{id='0', value='after1'}, is over?true
used time: 19ms

Single thread, single producer with multiple consumers (worker type; after cannot be set directly on workers).

.producer(producer1)
.worker(handler1, handler2)

Result (compared with the handler case, each event is consumed by only one worker.)

producer message by producer1, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?null
used time: 21ms

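The difference between the two delivery modes can be modeled without Disruptor: in handler style every consumer sees every event, while in worker style the consumers compete and each event goes to exactly one of them. The sketch below is a simplified JDK-only model (a shared queue stands in for the worker pool); DeliveryModesDemo and its methods are hypothetical names, and it does not reproduce how Disruptor implements either mode internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the two delivery modes; no Disruptor types involved.
final class DeliveryModesDemo {

    /** Handler style (broadcast): every consumer sees every event. */
    static int broadcastDeliveries(int events, int consumers) {
        AtomicInteger deliveries = new AtomicInteger();
        for (int c = 0; c < consumers; c++) {
            for (int e = 0; e < events; e++) {
                deliveries.incrementAndGet();
            }
        }
        return deliveries.get();
    }

    /** Worker style (work pool): consumers compete, so each event goes to exactly one. */
    static int workPoolDeliveries(int events, int consumers) {
        Queue<Integer> shared = new ConcurrentLinkedQueue<>();
        for (int e = 0; e < events; e++) {
            shared.add(e);
        }
        AtomicInteger deliveries = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            // poll() hands each queued event to whichever thread asks first.
            Thread t = new Thread(() -> {
                while (shared.poll() != null) {
                    deliveries.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return deliveries.get();
    }

    public static void main(String[] args) {
        System.out.println("broadcast: " + broadcastDeliveries(500, 2));
        System.out.println("work pool: " + workPoolDeliveries(500, 2));
    }
}
```

With 500 events and 2 consumers, broadcast yields 1000 deliveries and the work pool yields 500, which matches the pattern in the demo output: two "received by" lines per event for handlers, one for workers.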
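Multiple producers, single consumer (worker and handler give the same result)

.handler(handler1)
.producer(producer1, producer2)

Result (the data was overwritten: publish() hands the same Domain object to both producers, so the value handler1 wrote while consuming the first event is already visible when it consumes the second)

producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by handler1, data: Domain{id='0', value='init'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 18ms
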
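The overwritten value comes from two ring slots pointing at one mutable object. The JDK-only sketch below reproduces the effect and shows one possible way around it, namely publishing a copy per producer. Payload, its copy() method, and SharedPayloadDemo are hypothetical; Domain in this post has no copy method, so adopting this would mean adding one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the shared-payload effect; no Disruptor types involved.
final class SharedPayloadDemo {

    static final class Payload {
        String value;

        Payload(String value) {
            this.value = value;
        }

        Payload copy() {
            return new Payload(value);
        }
    }

    /** Publishes one payload once per producer; returns the values the consumer saw. */
    static List<String> run(int producers, boolean copyPerPublish) {
        Payload original = new Payload("init");

        // Each producer puts the payload (or a copy of it) into its own slot.
        List<Payload> ring = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            ring.add(copyPerPublish ? original.copy() : original);
        }

        List<String> seen = new ArrayList<>();
        for (Payload event : ring) {
            seen.add(event.value);        // what the handler prints
            event.value = "handler1";     // what the handler then writes
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("shared: " + run(2, false));
        System.out.println("copied: " + run(2, true));
    }
}
```

With a shared object the consumer sees "init" and then "handler1", as in the output above; with a copy per publish it sees "init" both times.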
Multiple producers, multiple consumers (workers and handlers mixed)

.producer(producer1, producer2)
.worker(after1, after2)
.handler(handler1, handler2)

Result

producer message by producer1, data: Domain{id='0', value='init'}
producer message by producer2, data: Domain{id='0', value='init'}
received by after2, data: Domain{id='0', value='init'}, is over?null
received by after1, data: Domain{id='0', value='after2'}, is over?null
received by handler2, data: Domain{id='0', value='after1'}, is over?false
received by handler1, data: Domain{id='0', value='after1'}, is over?false
received by handler2, data: Domain{id='0', value='handler2'}, is over?true
received by handler1, data: Domain{id='0', value='handler1'}, is over?true
used time: 26ms