Disruptor併發框架-1
阿新 • • 發佈:2018-12-27
//http://ifeve.com/disruptor-getting-started/
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
import com.lmax.disruptor.EventFactory; // 需要讓disruptor為我們建立事件,我們同時還聲明瞭一個EventFactory來例項化Event物件。 public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } }
import com.lmax.disruptor.EventHandler; //我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中儲存的資料列印到終端: public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } }
import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class LongEventMain { public static void main(String[] args) throws Exception { //建立緩衝池 ExecutorService executor = Executors.newCachedThreadPool(); //建立工廠 LongEventFactory factory = new LongEventFactory(); //建立bufferSize ,也就是RingBuffer大小,必須是2的N次方 int ringBufferSize = 1024 * 1024; // /** //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現 WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); //SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景 WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); //YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性 WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); */ //建立disruptor // 1.第一個引數為工廠類物件,用於建立一個個的LongEvent,LongEvent是實際的消費資料 // 2.第二個引數為緩衝區大小 // 3.第三個引數執行緒池 進行 Disruptor 內部的資料接收處理排程 // 4.第四個引數ProducerType.SINGLE 和 ProducerType.MULTI // 5.第五個引數是一種策略:WaitStrategy Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 連線消費事件方法 disruptor.handleEventsWith(new LongEventHandler()); // 啟動 disruptor.start(); //Disruptor 的事件釋出過程是一個兩階段提交的過程: //釋出事件 // 使用該方法獲取具體存放資料的容器ringBuffer(環形結構) RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for(long l = 0; l<100; l++){ byteBuffer.putLong(0, l); producer.onData(byteBuffer); //Thread.sleep(1000); } disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理; executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉; } }
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
/**
* 很明顯的是:當用一個簡單佇列來發布事件的時候會牽涉更多的細節,這是因為事件物件還需要預先建立。
* 釋出事件最少需要兩步:獲取下一個事件槽併發布事件(釋出事件的時候要使用try/finnally保證事件一定會被髮布)。
* 如果我們使用RingBuffer.next()獲取一個事件槽,那麼一定要釋出對應的事件。
* 如果不能釋出事件,那麼就會引起Disruptor狀態的混亂。
* 尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。
* @since 2015年11月23日
*/
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
/**
* onData用來發布事件,每呼叫一次就釋出一次事件
* 它的引數會用過事件傳遞給消費者
*/
public void onData(ByteBuffer bb){
//1.可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件物件)
LongEvent event = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業務資料
event.setValue(bb.getLong(0));
} finally {
//4.釋出事件
//注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
/**
* Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,
* 所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件
* @since 2015年11月23日
*/
public class LongEventProducerWithTranslator {
//一個translator可以看做一個事件初始化器,publicEvent方法會呼叫它
//填充Event
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
event.setValue(buffer.getLong(0));
}
};
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer buffer){
ringBuffer.publishEvent(TRANSLATOR, buffer);
}
}