1. 程式人生 > >併發框架Disruptor快速入門

併發框架Disruptor快速入門

轉自https://blog.csdn.net/qq_19558705/article/details/77116949

1. 什麼是Disruptor

Disruptor它是一個高效能的非同步處理的開源併發框架,能夠在無鎖的情況下實現網路的Queue併發操作。可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

2. HelloWorld程式碼

在生產者-消費者設計模型的中,採用有界佇列BlockingQueue作為儲存任務的容器,生產者將任務分配給容器,再由容器分配給消費者。Disruptor框架和該模型很相似。區別在於它用一個環形的RingBuffer作為儲存任務的容器。消費者是主動從RingBuffer中獲取資料而不是等待分配。從某種角度來說,Disruptor框架是升級版的生產者-消費者模型。

這裡用一段程式碼學習 Disruptor框架,業務邏輯是把10000以內的資料全部打印出來
1) 首先是建立傳遞資料的Event(Event是從生產者到消費者過程中所處理的資料單元)類,該類是由使用者定義。

/**
 * 第一步:建立一個數據單元Event
 * Event:從生產者到消費者過程中所處理的資料單元
 *
 */
public class MyDataEvent {
 
	private Long value;
 
	public Long getValue() {
		return value;
	}
 
	public void setValue(Long value) {
		this.value = value;
	}
 
}

2)建立一個例項化Event的工廠類

import com.lmax.disruptor.EventFactory;
 
/**
 * 第二步建立工廠類例項化Event
 * EventFactory 工廠,用於例項化Event類
 */
public class MyDataEventFactory implements EventFactory<MyDataEvent>{
 
	@Override
	public MyDataEvent newInstance() {
		return new MyDataEvent();
	}
 
}

3)建立一個事件處理器,也就是消費者,這裡只做資料列印的事件。

import com.lmax.disruptor.EventHandler;
 
/**
 * 第三步:消費端
 * EventHandler:消費者,也可以理解為事件處理器
 */
public class MyDataEventHandler implements EventHandler<MyDataEvent>{
 
	@Override
	public void onEvent(MyDataEvent myDataEvent, long arg1, boolean arg2)
			throws Exception {
		// 處理事件 ....
		System.out.println("處理事件,列印資料: " + myDataEvent.getValue());
	}
 
}

4)生產者釋出事件

import com.lmax.disruptor.RingBuffer;
 
/**
 * 第四步:生產端
 * 生產者
 */
public class MyDataEventProducer {
	
	private final RingBuffer<MyDataEvent> ringBuffer; // 敲黑板! 很重要的知識點
 
	public MyDataEventProducer(RingBuffer<MyDataEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}
	
	/**
	 * 釋出事件,每呼叫一次就釋出一次事件
     * 它的引數會通過事件傳遞給消費者
	 * @param byteBuffer 用 byteBuffer傳參 是考慮到 Disruptor 是訊息框架,而ByteBuffer又是讀取時通道 (SocketChannel)最常用的緩衝區
	 */
	public void publishData(ByteBuffer byteBuffer){
		// RingBuffer 是一個圓環,.next() 方法是獲取下一個索引值
		long sequence = ringBuffer.next();
		try {
			// 通過索引值獲取其物件
			MyDataEvent myDataEvent = ringBuffer.get(sequence);
			// 給資料單元賦值
			myDataEvent.setValue(byteBuffer.getLong(0)); // byteBuffer 的一個方法,文章中有連結
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 釋出事件,其實就是釋出索引 ,釋出方法必須放在finally 中,避免出現阻塞情況。
			ringBuffer.publish(sequence);
		}
	}
 
}

注意:
釋出事件是兩個步驟,第一步:先要從RingBuffer獲取下一個事件槽(可以理解為索引),第二步再是傳送事件。需要注意的是:獲取的事件槽,就要釋出該事件槽對應的事件。不然會出現混亂的情況。所以釋出事件的程式碼要放在finally中。 java8的寫法,文章底部有連結。
5)執行的Main方法,列印10000以內的資料

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
 
public class MyDataEventMain {
 
	public static void main(String[] args) {
		// step1 : 建立緩衝池
		ExecutorService executor = Executors.newCachedThreadPool();
		// step2 : 建立工廠
		MyDataEventFactory factory = new MyDataEventFactory();
		// step3 : 建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
	 	int ringBufferSize = 1024 * 1024;
		
	 	// step4 : 建立disruptor
	 	Disruptor<MyDataEvent> disruptor = new Disruptor<MyDataEvent>(factory, ringBufferSize, executor);
	 	
	 	// step5 : 連線消費事件方法<消費者>
	 	disruptor.handleEventsWith(new MyDataEventHandler());
	 	
	 	// step6 : 啟動
	 	disruptor.start();
	 	
	 	RingBuffer<MyDataEvent> ringBuffer = disruptor.getRingBuffer(); // 獲取 ringBuffer
	 	
	 	// step7 : 生產者釋出事件
	 	MyDataEventProducer producer = new MyDataEventProducer(ringBuffer);
	 	
	 	ByteBuffer byteBuffer = ByteBuffer.allocate(128); // 建立一個容量為128位元組的ByteBuffer
	 	
	 	for (long data = 1; data <= 10000 ; data++) { // 不管是列印100,1000,10000,基本上都是一秒內輸出。
			byteBuffer.putLong(0, data); // 在下標為零的位置儲存值
			producer.publishData(byteBuffer); // 
		}
	 	
	 	disruptor.shutdown(); // 關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
		executor.shutdown(); // 關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
	}
 	
}

3 元件說明
從生產者-消費者的整體:
RingBuffer:環形佇列,是Disruptor最為重要的元件,其作用是儲存和更新Disruptor中流通的資料。
Sequence:遞增序號(AtomicLong),Disruptor使用Sequence標識一個特殊元件處理的序號。每個重要的元件基本都有一個Sequence。
Producer:生產者,泛指通過Disruptor釋出事件的使用者程式碼(實際業務程式碼,而併發框架程式碼)生成Event資料。
Event:事件,從生產者到消費者過程中的資料單元。由使用者定義程式碼。
EventHandler:消費者,代表Disruptor框架中的一個消費者介面,由使用者實現程式碼,負責處理Event資料,進度通過Sequence控制。
(打個比方:餐飲店買奶茶
你去餐飲店買奶茶,先要去櫃檯找服務員點一杯紅豆抹茶,服務員會給你一個55號的排隊號,等到服務員大喊:‘55號,55號’,於是你就屁顛屁顛的去拿紅豆抹茶;
“你去買紅豆抹茶” 就是 Producer
“紅豆抹茶” 就是 Event
“櫃檯” 就是 RingBuffer
“55號” 就是 Sequence
“你去拿紅豆抹茶” 就是 EventHandler)

從Disruptor框架如何處理Event的細節:
Sequecer:Disruptor框架真正的核心,在生產者和消費者直接進行高效準確快速的資料傳輸。通過複雜的演算法去協調生存者和消費者之間的關係。
SequenceBarrier:Sequecer具體的實施者,字面理解是序號屏障,其目的是決定消費者 消費Evnet的邏輯。(生產者釋出事件快於消費,生產者等待。消費速度大於生產者釋出事件速度,消費者監聽)
EventProcessor:可以理解為具體的消費執行緒,最後把結果返回給EventHandler。
WaitStrategy:當消費者等待在SequenceBarrier上時,有許多可選的等待策略
BusySpinWaitStrategy : 自旋等待,類似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的佔用也多。
BlockingWaitStrategy : 使用鎖和條件變數。CPU資源的佔用少,延遲大。
SleepingWaitStrategy : 在多次迴圈嘗試不成功後,選擇讓出CPU,等待下次排程,多次排程後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。
YieldingWaitStrategy : 在多次迴圈嘗試不成功後,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源佔用,但延遲也比較均勻。
PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的佔用少,延遲大。
(打個比方:
櫃檯的服務員通知某位廚師:“55號要一杯紅豆抹茶”,然後廚師準備拿機器做奶茶,發現機器都在使用中,於是廚師就盯著機器看,當有空閒的機器就立馬佔用,做好後就端給客戶。如果等了很久都沒有空閒的機器,廚師會跟客服員說:“55號的紅豆抹茶,可能要多等一會”,然後工作人員就和客戶協調一下說明情況。
“服務員” 就是 Sequecer
“某位廚師” 就是 SequenceBarrier
“用機器做紅豆抹茶” 就是 EventProcessor
“發現沒有空閒機器,廚師監聽” 就是 WaitStrategy

打的比方可能不是很形象。如果不理解的,可以反覆的敲打程式碼,多問問為什麼這樣寫,這樣做有什麼好處。慢慢的就理解了。