1. 程式人生 > >網際網路架構(6):併發程式設計--Disruptor併發框架

網際網路架構(6):併發程式設計--Disruptor併發框架

6 Disruptor併發框架簡介

Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行緒裡每秒處理6百萬訂單。業務邏輯處理器完全是執行在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。

Disruptor它是一個開源的併發框架,並獲得2011 Duke’s 程式框架創新獎,能夠在無鎖的情況下實現網路的Queue併發操作。

Disruptor是一個高效能的非同步處理框架,或者可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本效能更加的優秀,提供更多的API使用方式。

下載disruptor-3.3.2.jar引入我們的專案既可以開始disruptor之旅。

在使用之前,首先說明disruptor主要功能加以說明,你可以理解為他是一種高效的”生產者-消費者”模型。也就效能遠遠高於傳統的BlockingQueue容器。

(1)使用Disruptor
  • 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,所以再Disruptor中是以事件繫結資料進行傳遞的
  • 第二:建立一個工廠Event類,用於建立Event類例項物件
  • 第三:需要有一個監聽事件類,用於處理資料(Event類)
  • 第四:我們需要進行測試程式碼編寫。例項化Disruptor例項,配置一系列引數。然後我們對Disruptor例項繫結監聽事件類,接受並處理資料。
  • 第五:在Disruptor中,真正儲存資料的核心叫做RingBuffer,我們通過Disruptor例項拿到它,然後把資料生產出來,把資料加入到RingBuffer的例項物件中即可。

    例項化一個Disruptor物件:
    //建立Disruptor
    //1 eventFactory 為
    //2 ringBufferSize為RingBuffer緩衝區大小,最好是2的指數倍 
    //3 執行緒池,進行Disruptor內部的資料接收處理呼叫
    //4 第四個引數ProducerType.SINGLE和ProducerType.MULTI,用來指定資料生成者有一個還是多個
    //5 第五個引數是一種策略:WaitStrategy
    /**
     * 建立Disruptor
     * @param eventFactory 工廠類物件,用於建立一個個的LongEvent, LongEvent是實際的消費資料,初始化啟動Disruptor的時候,Disruptor會呼叫該工廠方法建立一個個的消費資料例項存放到RingBuffer緩衝區裡面去,建立的物件個數為ringBufferSize指定的
     * @param ringBufferSize RingBuffer緩衝區大小
     * @param executor 執行緒池,Disruptor內部的對資料進行接收處理時呼叫
     * @param producerType 用來指定資料生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI
     * @param waitStrategy 一種策略,用來均衡資料生產者和消費者之間的處理效率,預設提供了3個實現類
     */
    com.lmax.disruptor.dsl.Disruptor.Disruptor<V>(EventFactory<V> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy)
    
    //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    
    //SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    
    //YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會迴圈等待sequence增加到合適的值。迴圈中呼叫Thread.yield()允許其他準備好的執行緒執行。如果需要高效能而且事件消費者執行緒比邏輯核心少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超執行緒的時候。
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    
    //BusySpinWaitStrategy是效能最高的等待策略,同時也是對部署環境要求最高的策略。這個效能最好用在事件處理執行緒比物理核心數目還要小的時候。例如:在禁用超執行緒技術的時候。
    WaitStrategy BusySpin_WAIT = new BusySpinWaitStrategy();
    
    //連線消費事件方法,其中EventHandler的是為消費者消費訊息的實現類
    disruptor.handleEventsWith(? extends EventHandler<V>);
    
    //通過例項化的Disruptor物件獲取到RingBuffer緩衝區,然後往緩衝區裡面新增資料並且釋出,消費者就可以消費這個資料了
    RingBuffer<V> ringBuffer = disruptor.getRingBuffer();//獲取資料緩衝區
    
    long sequence = ringBuffer.next();//從資料緩衝區中獲取下一個可用事件槽的Id
    
    V event = ringBuffer.get(sequence); //從事件槽中獲取一個數據物件(初始化的時候,槽就會生成對應的物件V放到RingBuffer裡面,就是eventFactory返回的物件)
    
    event.setValue(bbf.getLong(0));//呼叫Event的方法,設定資料,注意Event完全由使用者實現
    
    ringBuffer.publish(sequence);//釋出事件,釋出的是RingBuffer的事件槽的Id,消費者也是根據這個Id去RingBuffer中獲取對應的事件資料的,另外ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
    
(2)Disruptor術語
  • RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責儲存和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。

  • Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序號。和Disruptor一樣,每一個消費者(EventProcessor)都維持著一個Sequence。大部分的併發程式碼依賴這些Sequence值得運轉,因此Sequence支援多種當前為AtomicLong類的特性。

  • Sequencer:這是Disruptor真正的核心。實現了這個介面的兩種生產者(單生產者和多生產者)均實現了所有的併發演算法,為了在生產者和消費者之間進行準確快速的資料傳遞。

  • SequenceBarrier:由Sequencer生成,並且包含了已經發布的Sequence的引用,這些Sequence源於Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面獲取事件時的處理策略。(例如:當生產者太慢,消費者太快,會導致消費者獲取不到新的事件會根據該策略進行處理,預設會堵塞)

  • WaitStrategy:決定一個消費者將如何等待生產者將Event置入Disruptor的策略。用來權衡當生產者無法將新的事件放進RingBuffer時的處理策略。(例如:當生產者太快,消費者太慢,會導致生成者獲取不到新的事件槽來插入新事件,則會根據該策略進行處理,預設會堵塞)

  • Event:從生產者到消費者過程中所處理的資料單元。Disruptor中沒有程式碼表示Event,因為它完全是由使用者定義的。

  • EventProcessor:主要事件迴圈,處理Disruptor中的Event,並且擁有消費者的Sequence。它有一個實現類是BatchEventProcessor,包含了event loop有效的實現,並且將回調到一個EventHandler介面的實現物件。

  • EventHandler:由使用者實現並且代表了Disruptor中的一個消費者的介面。

  • Producer:由使用者實現,它呼叫RingBuffer來插入事件(Event),在Disruptor中沒有相應的實現程式碼,由使用者實現。

  • WorkProcessor:確保每個sequence只被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。

  • WorkerPool:一個WorkProcessor池,其中WorkProcessor將消費Sequence,所以任務可以在實現WorkHandler介面的worker之間移交

  • LifecycleAware:當BatchEventProcessor啟動和停止時,於實現這個介面用於接收通知。

(3)理解RingBuffer
  • ringbuffer到底是什麼?

    答:嗯,正如名字所說的一樣,它是一個環(首尾相接的環),你可以把它用做在不同上下文(執行緒)間傳遞資料的buffer。

  • 基本來說,ringbuffer擁有一個序號,這個序號指向陣列中下一個可用元素。

Disruptor說的是生產者和消費者的故事. 有一個數組.生產者往裡面扔芝麻.消費者從裡面撿芝麻. 但是扔芝麻和撿芝麻也要考慮速度的問題. 1 消費者撿的比扔的快 那麼消費者要停下來.生產者扔了新的芝麻,然後消費者繼續. 2 陣列的長度是有限的,生產者到末尾的時候會再從陣列的開始位置繼續.這個時候可能會追上消費者,消費者還沒從那個地方撿走芝麻,這個時候生產者要等待消費者撿走芝麻,然後繼續.

  • 隨著你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。

  • 要找到陣列中當前序號指向的元素,可以通過mod操作:Sequence mod Array.length = index in Array(取模操作)假如當前的Sequence為12,RingBuffer的長度為10,那麼下一個事件槽的ID就為(java的mod語法):12 % 10 = 2。很簡單吧。由於是取模操作,所以如果槽的個數是2的N次方那麼將更有利於基於二進位制的計算機進行計算。

(4)RingBuffer的特點
  • 如果你看了維基百科裡面的關於環形buffer的詞條,你就會發現,我們的實現方式,與其最大的區別在於:沒有尾指標。我們只維護了一個指向下一個可用位置的序號。這種實現是經過深思熟慮的—我們選擇用環形buffer的最初原因就是想要提供可靠的訊息傳遞。
  • 我們實現的ring buffer和大家常用的佇列之間的區別是,我們不刪除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆蓋他們。這就是和維基百科版本相比,我們不需要尾指標的原因。ringbuffer本身並不控制是否需要重疊。
  • 因為它是陣列,所以要比連結串列快,而且有一個容易預測的訪問模式。
  • 這是對CPU快取友好的,也就是說在硬體級別,陣列中的元素是會被預載入的,因此在ringbuffer當中,cpu無需時不時去主存載入陣列中的下一個元素。
  • 其次,你可以為陣列預先分配記憶體,使得陣列物件一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。此外,不像連結串列那樣,需要為每一個新增到其上面的物件創造節點物件—對應的,當刪除節點時,需要執行相應的記憶體清理操作。
(5)Disruptor應用

Disruptor實際上是對RingBuffer的封裝,所以我們也可以直接使用RingBuffer類

  • API提供的生產者介面 EventTranslator<V>與EventTranslatorOneArg<V v, Object data>,前者不能動態傳參,後者可以動態傳遞一個引數data,V為需要建立的資料物件,data為實際資料,實現translateTo(V v, long sequeue, Object data)方法,其中v就是下一個可用事件槽裡面的物件,data為傳進來的真實資料,呼叫ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);來發布資料到RingBuffer中

    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;
    
    /**
     * Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,
     * 所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件
     */
    public class LongEventProducerWithTranslator {
    
        //一個translator可以看做一個事件初始化器,publicEvent方法會呼叫它
        //填充Event
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                    @Override
                    public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
                        event.setValue(buffer.getLong(0));
                    }
        };
    
        private final RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(ByteBuffer buffer){
            ringBuffer.publishEvent(TRANSLATOR, buffer);
        }
    
    }
    
  • API提供的消費者介面

    • WorkerPool : WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)其中RingBuffer為資料緩衝區,sequenceBarrier是消費者與生產者之間的協調策略,API預設提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);來獲取,exceptionHandler為異常處理函式,當handler發生異常時回撥該函式,workHandlers為實現了EventHandler介面的訊息業務處理類,可以有多個。
      WorkerPool啟動的方法WorkerPool.start(Executor executor)

    • BatchEventProcessor : BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler) 其中RingBuffer為資料緩衝區,sequenceBarrier是消費者與生產者之間的協調策略,API預設提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);來獲取,eventHandler為實現了EventHandler介面的訊息業務處理類。
      BatchEventProcessor啟動的方法Executor.submit(BatchEventProcessor batchEventProcessor)

**注意**SequenceBarrier是用來協調消費者和生成者之間效率的策略類,所以要想Barrier生效,必須要將消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:ringBuffer.addGatingSequences(BatchEventProcessor.getSequence()); 多消費者時使用BatchEventProcessor.getWorkerSequences()(這兩個方法WorkerPool同樣適用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行呼叫,在呼叫handleEventsWith註冊消費者方法時會自動新增該處理。

-Trade.java

    import java.util.concurrent.atomic.AtomicInteger;

    public class Trade {  

        private String id;//ID  
        private String name;
        private double price;//金額  
        private AtomicInteger count = new AtomicInteger(0);

        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
        public AtomicInteger getCount() {
            return count;
        }
        public void setCount(AtomicInteger count) {
            this.count = count;
        } 


    }  
  • TradeHandler.java

    import java.util.UUID;
    
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    
    /**
     * 實現EventHandler是為了作為BatchEventProcessor的事件處理器,
     * 實現WorkHandler是為了作為WorkerPool的事件處理器
     * @author jliu10
     *
     */
    public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
    
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
    
        @Override  
        public void onEvent(Trade event) throws Exception {  
            //這裡做具體的消費邏輯  
            event.setId(UUID.randomUUID().toString());//簡單生成下ID  
            System.out.println(event.getId());  
        }  
    }  
    
  • Main1.java

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.lmax.disruptor.BatchEventProcessor;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.YieldingWaitStrategy;
    
    public class Main1 {  
    
        public static void main(String[] args) throws Exception {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
            /* 
             * createSingleProducer建立一個單生產者的RingBuffer, 
             * 第一個引數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生資料填充RingBuffer的區塊。 
             * 第二個引數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 
             * 第三個引數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 
             */  
            final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, BUFFER_SIZE, new YieldingWaitStrategy());  
    
            //建立執行緒池  
            ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    
            //建立SequenceBarrier 用來權衡消費者是否可以從ringbuffer裡面獲取事件
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    
            //建立訊息處理器  
            BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                    ringBuffer, sequenceBarrier, new TradeHandler());  
    
            //這一步的目的就是把消費者的位置資訊引用注入到生產者    如果只有一個消費者的情況可以省略 
            ringBuffer.addGatingSequences(transProcessor.getSequence());  
    
            //把訊息處理器提交到執行緒池  
            executors.submit(transProcessor);  
    
            //如果存在多個消費者 那重複執行上面3行程式碼 把TradeHandler換成其它消費者類  
    
            Future<?> future= executors.submit(new Callable<Void>() {  
                @Override  
                public Void call() throws Exception {  
                    long seq;  
                    for(int i=0;i<10;i++){  
                        seq = ringBuffer.next();//佔個坑 --ringBuffer一個可用區塊  
                        ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 
                        ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見  
                    }  
                    return null;  
                }  
            }); 
    
            future.get();//等待生產者結束  
            Thread.sleep(1000);//等上1秒,等消費都處理完成  
            transProcessor.halt();//通知事件(或者說訊息)處理器 可以結束了(並不是馬上結束!!!)  
            executors.shutdown();//終止執行緒  
        }  
    }  
    
  • Main2.java

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.IgnoreExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WorkHandler;
    import com.lmax.disruptor.WorkerPool;
    
    public class Main2 {  
        public static void main(String[] args) throws InterruptedException {  
            int BUFFER_SIZE=1024;  
            int THREAD_NUMBERS=4;  
    
            EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            };  
    
            RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
    
            SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
    
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
    
            WorkHandler<Trade> handler = new TradeHandler();  
    
            WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
    
            //這一步的目的就是把消費者的位置資訊引用注入到生產者    如果只有一個消費者的情況可以省略 
            ringBuffer.addGatingSequences(workerPool.getWorkerSequences());  
    
            workerPool.start(executor);  
    
            //下面這個生產8個數據
            for(int i=0;i<8;i++){  
                long seq=ringBuffer.next();  
                ringBuffer.get(seq).setPrice(Math.random()*9999);  
                ringBuffer.publish(seq);  
            }  
    
            Thread.sleep(1000);  
            workerPool.halt();  
            executor.shutdown();  
        }  
    }  
    
  • Disruptor註冊消費者的方法是:Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)

    Disruptor提供了一些複雜的並行執行方式。

    • 1、生產者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的訊息處理類必須要實現EventHandler介面)

      EventHandlerGroup<Trade> handlerGroup = 
              disruptor.handleEventsWith(A, B);
      //宣告在C1,C2完事之後執行JMS訊息傳送操作 也就是流程走到C3 
      handlerGroup.then(C);
      
    • 2、生產者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是並行執行的。

      disruptor.handleEventsWith(B1, C1);
      disruptor.after(B1).handleEventsWith(B2);
      disruptor.after(C1).handleEventsWith(C2);
      disruptor.after(B2, C2).handleEventsWith(h3);
      
    • 3、生產者A生成的資料依次被A,B,C三個消費者消費

      disruptor.handleEventsWith(A).handleEventsWith(B).handleEventsWith(C);
      
  • Main.java

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.BusySpinWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;
    import com.test.sync13.generate1.Trade;
    
    public class Main {  
        public static void main(String[] args) throws InterruptedException {  
    
            long beginTime=System.currentTimeMillis();  
            int bufferSize=8;  
            ExecutorService executor=Executors.newFixedThreadPool(8);  
    
            Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
                @Override  
                public Trade newInstance() {  
                    return new Trade();  
                }  
            }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
    
            //菱形操作
            /**
            //使用disruptor建立消費者組C1,C2  
            EventHandlerGroup<Trade> handlerGroup = 
                    disruptor.handleEventsWith(new Handler1(), new Handler2());
            //宣告在C1,C2完事之後執行JMS訊息傳送操作 也就是流程走到C3 
            handlerGroup.then(new Handler3());
            */
    
            //順序操作
            /** */
            disruptor.handleEventsWith(new Handler1()).
                handleEventsWith(new Handler2()).
                handleEventsWith(new Handler3());
    
    
            //六邊形操作. 
            /** 
            Handler1 h1 = new Handler1();
            Handler2 h2 = new Handler2();
            Handler3 h3 = new Handler3();
            Handler4 h4 = new Handler4();
            Handler5 h5 = new Handler5();
            disruptor.handleEventsWith(h1, h2);
            disruptor.after(h1).handleEventsWith(h4);
            disruptor.after(h2).handleEventsWith(h5);
            disruptor.after(h4, h5).handleEventsWith(h3);
           */
    
    
    
            disruptor.start();//啟動  
            CountDownLatch latch=new CountDownLatch(1);  
            //生產者準備  
            executor.submit(new TradePublisher(latch, disruptor));
    
            latch.await();//等待生產者完事. 
    
            disruptor.shutdown();  
            executor.shutdown();  
            System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));  
        }  
    }  
    
  • Handler*.java

    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.WorkHandler;
    import com.test.sync13.generate1.Trade;
    
    public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
    
        @Override  
        public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
            this.onEvent(event);  
        }  
    
        @Override  
        public void onEvent(Trade event) throws Exception {  
            System.out.println("handler1: set name");
            event.setName("h1");
            Thread.sleep(500);
        }  
    }  
    
  • TradePublisher.java

    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    
    import com.lmax.disruptor.EventTranslator;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.test.sync13.generate1.Trade;
    
    public class TradePublisher implements Runnable {  
    
        Disruptor<Trade> disruptor;  
        private CountDownLatch latch;  
    
        private static int LOOP=10;//模擬百萬次交易的發生  
    
        public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
            this.disruptor=disruptor;  
            this.latch=latch;  
        }  
    
        @Override  
        public void run() {  
            TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
            for(int i=0;i<LOOP;i++){  
                disruptor.publishEvent(tradeTransloator);  
            }  
            //採用CountDownLatch來保證10個執行緒能夠同時啟動
            latch.countDown();  
        }  
    
    }  
    
    class TradeEventTranslator implements EventTranslator<Trade>{  
    
        private Random random=new Random();  
    
        @Override  
        public void translateTo(Trade event, long sequence) {  
            this.generateTrade(event);  
        }  
    
        private Trade generateTrade(Trade trade){  
            trade.setPrice(random.nextDouble()*9999);  
            return trade;  
        }  
    
    }  
    
  • Trade.java

    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Trade {  
    
        private String id;//ID  
        private String name;
        private double price;//金額  
        private AtomicInteger count = new AtomicInteger(0);
    
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public double getPrice() {
            return price;
        }
        public void setPrice(double price) {
            this.price = price;
        }
        public AtomicInteger getCount() {
            return count;
        }
        public void setCount(AtomicInteger count) {
            this.count = count;
        } 
    
    
    }  
    

相關推薦

網際網路架構6併發程式設計--Disruptor併發框架

6 Disruptor併發框架簡介 Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行

網際網路架構3併發程式設計--執行緒池

3 Executor框架 為了更好的控制多執行緒,JDK提供了一套執行緒框架Executor,幫助開發人員有效的進行執行緒控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,它扮演著執行

網際網路架構8Socket網路通訊程式設計--Netty

三、Socket網路通訊程式設計–Netty Netty是一個提供非同步事件驅動的網路應用框架,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。 換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發網路應用程式,比如客戶端和服務端的協議。Ne

老司機帶你玩轉面試6分散式鎖、併發競爭、雙寫一致

![](https://cdn.geekdigging.com/Interview/mianshi_header_1.jpg) ## 前文回顧 建議前面文章沒看過的同學先看下前面的文章: [「老司機帶你玩轉面試(1):快取中介軟體 Redis 基礎知識以及資料持久化」](https://www.geek

python快速學習系列6面向物件程式設計OOP

一、面向物件程式設計: 1.比設計模式更重要的是設計原則: 1)面向物件設計的目標: ·可擴充套件:新特性很容易新增到現有系統中,基本不影響系統原有功能 ·可修改:當修改某一部分程式碼時,不會影響到其他不相關的部分 ·可替代:用具有相同介面的程式碼去替換系統中某一部分程式碼時,系統不受影

Python併發程式設計網路程式設計之粘包現象

目錄 一、什麼是粘包 須知:只有TCP有粘包現象,UDP永遠不會粘包 粘包不一定會發生 如果發生了:1.可能是在客戶端已經粘了       2.客戶端沒有粘,可能是在服務端粘了 首先需要掌握一個socket收發訊息的原理 應用

linux命令學習6ps命令

bytes 釋放 ice cti width kthread hellip 名稱 pts Linux中的ps命令是Process Status的縮寫。ps命令用來列出系統中當前運行的那些進程。ps命令列出的是當前那些進程的快照,就是執行ps命令的那個時刻的那些進程,如果想要

C++傳智筆記6socket客戶端發送報文接受報文的api接口

內存泄露 rcp 分配內存 strcpy light cpp tac 第三方 _file__ #define _CRT_SECURE_NO_WARNINGS #include "stdio.h" #include "stdlib.h" #include "string.

Windows Phone開發6處理屏幕方向的改變

cati sources mon stack mar ber XML break pac 俺們都知道,智能手機可以通過旋轉手機來改變屏幕的顯示方向,更多的時候,對於屏幕方向的改變,我們要做出相應的處理,例如,當手機屏幕方向從縱向變為橫向時,可能要重新排列頁面上的控件以適應顯

設計模式六大原則6開閉原則

思考 外部 編程人員 恰恰 單一職責 何事 適應 擴展 分享 開閉原則 定義:一個軟件實體如類、模塊和函數應該對擴展開放,對修改關閉。 問題由來:在軟件的生命周期內,因為變化、升級和維護等原因需要對軟件原有代碼進行修改時,可能會給舊代碼中引入錯誤,也可能會使我們不得不對

springBoot6web開發-模板引擎jsp

spring boot 一、新建工程 註意新建的工程下沒有webapp目錄eclipse下會自動創建webapp目錄這裏我們需要自動創建一個webapp目錄並創建WEB-INF。 對ServletInitializer.java進行說明 1、這個類相當於我們以前的web.xml 2、只有3.0以上才

學習用Node.js和Elasticsearch構建搜索引擎6實際項目中常用命令使用記錄

nds 黃色 ati cat htm action last shard open 1、檢測集群是否健康。 curl -XGET ‘localhost:9200/_cat/health?v‘#後面加一個v表示讓輸出內容表格顯示表頭 綠色表示一切正常,黃色表示所有

高性能服務器架構緩沖策略

lin 特點 領域 思路 不能 查表 edi 操作 帶寬 原文鏈接:https://mp.weixin.qq.com/s?__biz=MzA5ODExMTkwMA==&mid=402675187&idx=1&sn=d240f6d1430b86bc00

EasyPR源碼剖析6車牌判斷之LBP特征

extend 順序 位置 feature tput ray bpf range str 一、LBP特征 LBP指局部二值模式,英文全稱:Local Binary Pattern,是一種用來描述圖像局部特征的算子,LBP特征具有灰度不變性和旋轉不變性等顯著優點。 原始的LBP

python函數6內置函數和匿名函數

a20 *args -s 執行 code str 思維導圖 inpu 其他 我們學了這麽多關於函數的知識基本都是自己定義自己使用,那麽我們之前用的一些函數並不是我們自己定義的比如說print(),len(),type()等等,它們是哪來的呢? 一、內置函數 由pytho

Linux自學筆記6Linux文件系統及文件類型

linux文件類型 linux文件系統 Linux的文件系統:跟文件系統(rootfs)root filesystem LSB,FHS:linux發行標準1 bin boot dev etc home lib lib64 media mnt opt proc root run

《Linux學習並不難》Linux常用操作命令6uname命令顯示計算機和系統相關信息

Linux8.6 《Linux學習並不難》Linux常用操作命令(6):uname命令顯示計算機和系統相關信息 使用uname命令可以顯示計算機以及操作系統的相關信息,比如計算機硬件架構、內核發行號、操作系統名稱、計算機主機名等。 命令語法: uname [選項] 命令中各選項的

《Linux學習並不難》用戶管理6刪除Linux用戶賬戶

Linux 用戶 userdel 9.6 《Linux學習並不難》用戶管理(6):刪除Linux用戶賬戶 使用userdel命令可以在Linux系統中刪除用戶賬戶,甚至連用戶的主目錄也一起刪除。命令語法:userdel [選項] [用戶名]命令中各選項的含義如表所示。選項 選項含

《Linux學習並不難》文件/目錄管理6mkdir命令創建目錄

Linux mkdir 目錄 7.6 《Linux學習並不難》文件/目錄管理(6):mkdir命令創建目錄使用mkdir命令可以在Linux系統中創建目錄。 命令語法:mkdir [選項] [目錄]命令中各選項的含義如表所示。選項 選項含義 -m <權限模式>對新創建的目錄設置權限

Exchange 2016與國內版O365混合部署6混合後的操作和驗證

分享圖片 In 報告 mailbox inf 通訊錄 png 完成後 動手 雲端和本地統一的通訊錄: AAD Connect 目錄同步後,登錄O365管理員界面查看用戶,可以看到本地的兩個用戶已經同步上來了,同步類型顯示已與AD同步。 登錄一個雲端用戶的郵箱,在收件人處輸