Disruptor併發框架-2
阿新 • • 發佈:2018-12-27
import java.util.concurrent.atomic.AtomicInteger; public class Trade { private String id;//ID private String name; private double price;//金額 private AtomicInteger count = new AtomicInteger(0); public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }
import java.util.UUID; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //這裡做具體的消費邏輯 event.setId(UUID.randomUUID().toString());//簡單生成下ID System.out.println(event.getId()); } }
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer建立一個單生產者的RingBuffer, * 第一個引數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生資料填充RingBuffer的區塊。 * 第二個引數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 * 第三個引數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //建立執行緒池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //建立SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //建立訊息處理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //這一步的目的就是把消費者的位置資訊引用注入到生產者 如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把訊息處理器提交到執行緒池 executors.submit(transProcessor); //如果存在多個消費者 那重複執行上面3行程式碼 把TradeHandler換成其它消費者類 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//佔個坑 --ringBuffer一個可用區塊 ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見 } return null; } }); future.get();//等待生產者結束 Thread.sleep(1000);//等上1秒,等消費都處理完成 transProcessor.halt();//通知事件(或者說訊息)處理器 可以結束了(並不是馬上結束!!!) executors.shutdown();//終止執行緒 } }
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
public class Main2 {
public static void main(String[] args) throws InterruptedException {
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
EventFactory<Trade> eventFactory = new EventFactory<Trade>() {
public Trade newInstance() {
return new Trade();
}
};
RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);
WorkHandler<Trade> handler = new TradeHandler();
WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);
workerPool.start(executor);
//下面這個生產8個數據
for(int i=0;i<8;i++){
long seq=ringBuffer.next();
ringBuffer.get(seq).setPrice(Math.random()*9999);
ringBuffer.publish(seq);
}
Thread.sleep(1000);
workerPool.halt();
executor.shutdown();
}
}