使用Disruptor完成多執行緒下併發、等待、先後等操作
Java完成多執行緒間的等待功能:
場景1:一個執行緒等待其他多個執行緒都完成後,再進行下一步操作(如裁判員計分功能,需要等待所有運動員都跑完後,才去統計分數。裁判員和每個運動員都是一個執行緒)。
場景2:多個執行緒都等待至某個狀態後,再同時執行(模擬併發操作,啟動100個執行緒 ,先啟動完的需要等待其他未啟動的,然後100個全部啟動完畢後,再一起做某個操作)。
以上兩個場景都較為常見,Java已經為上面的場景1和2分別提供了CountDownLatch和CyclicBarrier兩個實現類來完成,參考另一篇文章:https://blog.csdn.net/tianyaleixiaowu/article/details/75234600
而對於更復雜的場景,如
譬如希望1執行完後才執行2,3執行完後才執行4,1和3並行執行,2和4都執行完後才執行last。
還有其他的更奇怪的執行順序等等。當然這些也可以通過組合多個CountDownLatch或者CyclicBarrier、甚至使用wait、Lock等組合來實現。不可避免的是,都需要使用大量的鎖,直接導致效能的急劇下降和多執行緒死鎖等問題發生。那麼有沒有高效能的無鎖的方式來完成這種複雜的需求實現呢?
那就是Disruptor!
Disruptor可以非常簡單的完成這種複雜的多執行緒併發、等待、先後執行等。
至於Disruptor是什麼就不說了,直接來看使用:
直接新增依賴包,別的什麼都不需要。
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.1</version>
</dependency>
我只帖關鍵程式碼,別的上傳到壓縮包了。https://download.csdn.net/download/tianyaleixiaowu/10322342
package b; import a.FirstEventHandler; import a.LongEvent; import a.LongEventFactory; import a.LongEventProducer; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.nio.ByteBuffer; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** * @author wuweifeng wrote on 2018/3/29. */ public class Simple { public static void main(String[] args) { ThreadFactory producerFactory = Executors.defaultThreadFactory(); LongEventFactory eventFactory = new LongEventFactory(); int bufferSize = 8; Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, producerFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); FirstEventHandler firstEventHandler = new FirstEventHandler(); SecondEventHandler secondHandler = new SecondEventHandler(); ThirdEventHandler thirdEventHandler = new ThirdEventHandler(); FourthEventHandler fourthEventHandler = new FourthEventHandler(); LastEventHandler lastEventHandler = new LastEventHandler(); //1,2,last順序執行 //disruptor.handleEventsWith(new LongEventHandler()).handleEventsWith(new SecondEventHandler()) // .handleEventsWith(new LastEventHandler()); //也是1,2,last順序執行 //disruptor.handleEventsWith(firstEventHandler); //disruptor.after(firstEventHandler).handleEventsWith(secondHandler).then(lastEventHandler); //1,2併發執行,之後才是last //disruptor.handleEventsWith(firstEventHandler, secondHandler); //disruptor.after(firstEventHandler, secondHandler).handleEventsWith(lastEventHandler); //1後2,3後4,1和3併發,2和4都結束後last disruptor.handleEventsWith(firstEventHandler, thirdEventHandler); disruptor.after(firstEventHandler).handleEventsWith(secondHandler); disruptor.after(thirdEventHandler).handleEventsWith(fourthEventHandler); disruptor.after(secondHandler, fourthEventHandler).handleEventsWith(lastEventHandler); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer longEventProducer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 10L; i++) { bb.putLong(0, i); longEventProducer.onData(bb); } disruptor.shutdown(); } }
主要就是這個類。我註釋掉的部分分別為順序執行、和12併發然後執行last。
上面那個圖對應的程式碼主要就是after的使用。
執行結果 :
可以看到,由於buffer為8,所以在一個週期內,最大value=7.順序就是1-3,2-4,1、3是併發的。對於同一個Event,2和4執行完後才執行last。多執行幾次看看,就能看明白。
這裡用的producer只打印了10個,可以調大,結果就會隨機性更好。