1. 程式人生 > >disruptor 框架使用以及ringbuffer原理解析

disruptor 框架使用以及ringbuffer原理解析

在這裡插入圖片描述

Disruptor

概述

子主題 1

  • 生產者消費組框架

子主題 2

使用

子主題 1

  • 1.建Event類(資料物件)
  • 2.建立一個生產資料的工廠類,EventFactory,用於生產資料;
  • 3.監聽事件類(處理Event資料)
  • 4.例項化Disruptor,配置引數,繫結事件;
  • 5.建存放資料的核心 RingBuffer,生產的資料放入 RungBuffer。
    • ringbuffer
      • 它是一個環(首尾相接的環),你可以把它用做在不同上下文(執行緒)間傳遞資料的buffer。
      • ring buffer和大家常用的佇列之間的區別是,我們不刪除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆蓋他們。
      • why fast
        • 是因為它在可靠訊息傳遞方面有很好的效能
        • 首先,因為它是陣列,所以要比連結串列快,而且有一個容易預測的訪問模式。(譯者注:陣列內元素的記憶體地址的連續性儲存的)。這是對CPU快取友好的—也就是說,在硬體級別,陣列中的元素是會被預載入的,因此在ringbuffer當中,cpu無需時不時去主存載入陣列中的下一個元素。(校對注:因為只要一個元素被載入到快取行,其他相鄰的幾個元素也會被載入進同一個快取行)
        • 其次,你可以為陣列預先分配記憶體,使得陣列物件一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。此外,不像連結串列那樣,需要為每一個新增到其上面的物件創造節點物件—對應的,當刪除節點時,需要執行相應的記憶體清理操作。
      • inner
        • 子主題 1
          • ring buffer維護兩個指標,“next”和“cursor”。
            • 填充資料
              • 假設有一個執行緒負責將字母“D”寫進ring buffer中。將會從ring buffer中獲取一個區塊(slot),這個操作是一個基於CAS的“get-and-increment”操作,將“next”指標進行自增。這樣,當前執行緒(我們可以叫做執行緒D)進行了get-and-increment操作後,

指向了位置4,然後返回3。這樣,執行緒D就獲得了位置3的操作許可權。 * 接著,另一個執行緒E做類似以上的操作 * 提交寫入 * 以上,執行緒D和執行緒E都可以同時執行緒安全的往各自負責的區塊(或位置,slots)寫入資料。但是,我們可以討論一下執行緒E先完成任務的場景…

執行緒E嘗試提交寫入資料。在一個繁忙的迴圈中有若干的CAS提交操作。執行緒E持有位置4,它將會做一個CAS的waiting操作,直到 “cursor”變成3,然後將“cursor”變成4。

再次強調,這是一個原子性的操作。因此,現在的ring buffer中,“cursor”現在是2,執行緒E將會進入長期等待並重試操作,直到 “cursor”變成3。

然後,執行緒D開始提交。執行緒E用CAS操作將“cursor”設定為3(執行緒E持有的區塊位置)當且僅當“cursor”位置是2.“cursor”當前是2,所以CAS操作成功和提交也成功了。

這時候,“cursor”已經更新成3,然後所有和3相關的資料變成可讀。

這是一個關鍵點。知道ring buffer填充了多少 – 即寫了多少資料,那一個序列數寫入最高等等,是遊標的一些簡單的功能。“next”指標是為了保證寫入的事務特性 * 子主題 1 * 最後的疑惑是執行緒E的寫入可見,執行緒E一直重試,嘗試將“cursor”從3更新成4,經過執行緒D操作後已經更新成3,那麼下一次重試就可以成功了。 * https://blog.csdn.net/chen_fly2011/article/details/54929468 * 原理 * 1.Ring buffer是由一個大陣列組成的。 * 2.所有Ring buffer的“指標”(也稱為序列或遊標)是Java long型別的(64位有符號數),指標採用往上計數自增的方式。(不用擔心越界,即使每秒1,000,000條訊息,也要消耗300,000年才可以用完)。 * 3.對Ring buffer中的指標進行按Ring buffer的size取模找出陣列的下標來定位入口(類似於HashMap的entry)。為了提高效能,我們通常將ring buffer的size大小設定成實際使用的2倍,這樣我們可以通過位運算(bit-mask )的方式計算出陣列的下標。

XMind: ZEN - Trial Version 程式碼實現: LongEventMain 主入口:

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

    public static void main(String[] args) throws Exception {
        //建立緩衝池
        ExecutorService  executor = Executors.newCachedThreadPool();
        //建立工廠
        LongEventFactory factory = new LongEventFactory();
        //建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
        int ringBufferSize = 1024 * 1024; // 

        /**
        //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現
        WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
        //SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景
        WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
        //YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        */
        
        //建立disruptor
        Disruptor<LongEvent> disruptor = 
                new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 連線消費事件方法
        disruptor.handleEventsWith(new LongEventHandler());
        
        // 啟動
        disruptor.start();
        
        //Disruptor 的事件釋出過程是一個兩階段提交的過程:
        //釋出事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        org.apache.log4j.Logger logger1 = LogManager.getLogger(LongEventMain.class);
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
        //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for(long l = 0; l<1000; l++){
            byteBuffer.putLong(0, l);
            producer.onData(byteBuffer);
            //Thread.sleep(1000);
        }
        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
        executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;        
        
        
    }
}

-準備工廠類

import com.lmax.disruptor.EventFactory;
//需要讓disruptor為我們建立事件,我們同時還聲明瞭一個EventFactory來例項化Event物件。
public class LongEventFactory implements EventFactory { 

 public Object newInstance() { 
     return new LongEvent(); 
 } 
} 
public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

生產者:

public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    
    /**
     * onData用來發布事件,每呼叫一次就釋出一次事件
     * 它的引數會用過事件傳遞給消費者
     */
    public void onData(ByteBuffer bb){
        //1.可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件物件)
            LongEvent event = ringBuffer.get(sequence);
            //3.獲取要通過事件傳遞的業務資料
            event.setValue(bb.getLong(0));
        } finally {
            //4.釋出事件
            //注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }
    
    
}

消費者

//我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中儲存的資料列印到終端:
public class LongEventHandler implements EventHandler<LongEvent>  {

  public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
      System.out.println(longEvent.getValue()+" haha");         
  }

}