disruptor 框架使用以及ringbuffer原理解析
Disruptor
概述
子主題 1
- 生產者消費組框架
子主題 2
使用
子主題 1
- 1.建Event類(資料物件)
- 2.建立一個生產資料的工廠類,EventFactory,用於生產資料;
- 3.監聽事件類(處理Event資料)
- 4.例項化Disruptor,配置引數,繫結事件;
- 5.建存放資料的核心 RingBuffer,生產的資料放入 RungBuffer。
- ringbuffer
- 它是一個環(首尾相接的環),你可以把它用做在不同上下文(執行緒)間傳遞資料的buffer。
- ring buffer和大家常用的佇列之間的區別是,我們不刪除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆蓋他們。
- why fast
- 是因為它在可靠訊息傳遞方面有很好的效能
- 首先,因為它是陣列,所以要比連結串列快,而且有一個容易預測的訪問模式。(譯者注:陣列內元素的記憶體地址的連續性儲存的)。這是對CPU快取友好的—也就是說,在硬體級別,陣列中的元素是會被預載入的,因此在ringbuffer當中,cpu無需時不時去主存載入陣列中的下一個元素。(校對注:因為只要一個元素被載入到快取行,其他相鄰的幾個元素也會被載入進同一個快取行)
- 其次,你可以為陣列預先分配記憶體,使得陣列物件一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。此外,不像連結串列那樣,需要為每一個新增到其上面的物件創造節點物件—對應的,當刪除節點時,需要執行相應的記憶體清理操作。
- inner
- 子主題 1
- ring buffer維護兩個指標,“next”和“cursor”。
- 填充資料
- 假設有一個執行緒負責將字母“D”寫進ring buffer中。將會從ring buffer中獲取一個區塊(slot),這個操作是一個基於CAS的“get-and-increment”操作,將“next”指標進行自增。這樣,當前執行緒(我們可以叫做執行緒D)進行了get-and-increment操作後,
- 填充資料
- ring buffer維護兩個指標,“next”和“cursor”。
- 子主題 1
- ringbuffer
指向了位置4,然後返回3。這樣,執行緒D就獲得了位置3的操作許可權。 * 接著,另一個執行緒E做類似以上的操作 * 提交寫入 * 以上,執行緒D和執行緒E都可以同時執行緒安全的往各自負責的區塊(或位置,slots)寫入資料。但是,我們可以討論一下執行緒E先完成任務的場景…
執行緒E嘗試提交寫入資料。在一個繁忙的迴圈中有若干的CAS提交操作。執行緒E持有位置4,它將會做一個CAS的waiting操作,直到 “cursor”變成3,然後將“cursor”變成4。
再次強調,這是一個原子性的操作。因此,現在的ring buffer中,“cursor”現在是2,執行緒E將會進入長期等待並重試操作,直到 “cursor”變成3。
然後,執行緒D開始提交。執行緒E用CAS操作將“cursor”設定為3(執行緒E持有的區塊位置)當且僅當“cursor”位置是2.“cursor”當前是2,所以CAS操作成功和提交也成功了。
這時候,“cursor”已經更新成3,然後所有和3相關的資料變成可讀。
這是一個關鍵點。知道ring buffer填充了多少 – 即寫了多少資料,那一個序列數寫入最高等等,是遊標的一些簡單的功能。“next”指標是為了保證寫入的事務特性 * 子主題 1 * 最後的疑惑是執行緒E的寫入可見,執行緒E一直重試,嘗試將“cursor”從3更新成4,經過執行緒D操作後已經更新成3,那麼下一次重試就可以成功了。 * https://blog.csdn.net/chen_fly2011/article/details/54929468 * 原理 * 1.Ring buffer是由一個大陣列組成的。 * 2.所有Ring buffer的“指標”(也稱為序列或遊標)是Java long型別的(64位有符號數),指標採用往上計數自增的方式。(不用擔心越界,即使每秒1,000,000條訊息,也要消耗300,000年才可以用完)。 * 3.對Ring buffer中的指標進行按Ring buffer的size取模找出陣列的下標來定位入口(類似於HashMap的entry)。為了提高效能,我們通常將ring buffer的size大小設定成實際使用的2倍,這樣我們可以通過位運算(bit-mask )的方式計算出陣列的下標。
XMind: ZEN - Trial Version 程式碼實現: LongEventMain 主入口:
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class LongEventMain {
public static void main(String[] args) throws Exception {
//建立緩衝池
ExecutorService executor = Executors.newCachedThreadPool();
//建立工廠
LongEventFactory factory = new LongEventFactory();
//建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
int ringBufferSize = 1024 * 1024; //
/**
//BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
//建立disruptor
Disruptor<LongEvent> disruptor =
new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 連線消費事件方法
disruptor.handleEventsWith(new LongEventHandler());
// 啟動
disruptor.start();
//Disruptor 的事件釋出過程是一個兩階段提交的過程:
//釋出事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
org.apache.log4j.Logger logger1 = LogManager.getLogger(LongEventMain.class);
LongEventProducer producer = new LongEventProducer(ringBuffer);
//LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for(long l = 0; l<1000; l++){
byteBuffer.putLong(0, l);
producer.onData(byteBuffer);
//Thread.sleep(1000);
}
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
}
}
-準備工廠類
import com.lmax.disruptor.EventFactory;
//需要讓disruptor為我們建立事件,我們同時還聲明瞭一個EventFactory來例項化Event物件。
public class LongEventFactory implements EventFactory {
public Object newInstance() {
return new LongEvent();
}
}
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
生產者:
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
/**
* onData用來發布事件,每呼叫一次就釋出一次事件
* 它的引數會用過事件傳遞給消費者
*/
public void onData(ByteBuffer bb){
//1.可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
long sequence = ringBuffer.next();
try {
//2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件物件)
LongEvent event = ringBuffer.get(sequence);
//3.獲取要通過事件傳遞的業務資料
event.setValue(bb.getLong(0));
} finally {
//4.釋出事件
//注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}
消費者
//我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中儲存的資料列印到終端:
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue()+" haha");
}
}