1. 程式人生 > >Disruptor併發框架-1

Disruptor併發框架-1


//http://ifeve.com/disruptor-getting-started/
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

import com.lmax.disruptor.EventFactory;
// 需要讓disruptor為我們建立事件,我們同時還聲明瞭一個EventFactory來例項化Event物件。
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 
import com.lmax.disruptor.EventHandler;

//我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中儲存的資料列印到終端:
public class LongEventHandler implements EventHandler<LongEvent>  {

	@Override
	public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
		System.out.println(longEvent.getValue()); 		
	}

}

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

	public static void main(String[] args) throws Exception {
		//建立緩衝池
		ExecutorService  executor = Executors.newCachedThreadPool();
		//建立工廠
		LongEventFactory factory = new LongEventFactory();
		//建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
		int ringBufferSize = 1024 * 1024; // 

		/**
		//BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現
		WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
		//SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景
		WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
		//YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性
		WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
		*/
		
		//建立disruptor
		// 1.第一個引數為工廠類物件,用於建立一個個的LongEvent,LongEvent是實際的消費資料
		// 2.第二個引數為緩衝區大小
		// 3.第三個引數執行緒池 進行 Disruptor 內部的資料接收處理排程
		// 4.第四個引數ProducerType.SINGLE 和  ProducerType.MULTI
		// 5.第五個引數是一種策略:WaitStrategy
		Disruptor<LongEvent> disruptor = 
				new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
		// 連線消費事件方法
		disruptor.handleEventsWith(new LongEventHandler());
		
		// 啟動
		disruptor.start();
		
		//Disruptor 的事件釋出過程是一個兩階段提交的過程:
		//釋出事件
		// 使用該方法獲取具體存放資料的容器ringBuffer(環形結構)
		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
		
//		LongEventProducer producer = new LongEventProducer(ringBuffer); 
		LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
		ByteBuffer byteBuffer = ByteBuffer.allocate(8);
		for(long l = 0; l<100; l++){
			byteBuffer.putLong(0, l);
			producer.onData(byteBuffer);
			//Thread.sleep(1000);
		}

		
		disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
		executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;		
		
		
		
		
		
		
		
	}
}

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;
/**
 * 很明顯的是:當用一個簡單佇列來發布事件的時候會牽涉更多的細節,這是因為事件物件還需要預先建立。
 * 釋出事件最少需要兩步:獲取下一個事件槽併發布事件(釋出事件的時候要使用try/finnally保證事件一定會被髮布)。
 * 如果我們使用RingBuffer.next()獲取一個事件槽,那麼一定要釋出對應的事件。
 * 如果不能釋出事件,那麼就會引起Disruptor狀態的混亂。
 * 尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。
 * @since 2015年11月23日
 */
public class LongEventProducer {

	private final RingBuffer<LongEvent> ringBuffer;
	
	public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
		this.ringBuffer = ringBuffer;
	}
	
	/**
	 * onData用來發布事件,每呼叫一次就釋出一次事件
	 * 它的引數會用過事件傳遞給消費者
	 */
	public void onData(ByteBuffer bb){
		//1.可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
		long sequence = ringBuffer.next();
		try {
			//2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件物件)
			LongEvent event = ringBuffer.get(sequence);
			//3.獲取要通過事件傳遞的業務資料
			event.setValue(bb.getLong(0));
		} finally {
			//4.釋出事件
			//注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
			ringBuffer.publish(sequence);
		}
	}
	
	
	
	
	
}

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,
 * 所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件
 * @since 2015年11月23日
 */
public class LongEventProducerWithTranslator {

	//一個translator可以看做一個事件初始化器,publicEvent方法會呼叫它
	//填充Event
	private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = 
			new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
				@Override
				public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
					event.setValue(buffer.getLong(0));
				}
			};
	
	private final RingBuffer<LongEvent> ringBuffer;
	
	public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}
	
	public void onData(ByteBuffer buffer){
		ringBuffer.publishEvent(TRANSLATOR, buffer);
	}
	
	
	
}