1. 程式人生 > 實用技巧 >Unity熱更新04-XLua呼叫C#-07-Lua呼叫C# 委託

Unity熱更新04-XLua呼叫C#-07-Lua呼叫C# 委託

Disruptor是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題。與Kafka、RabbitMQ用於服務間的訊息佇列不同,disruptor一般用於執行緒間訊息的傳遞。基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注。2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關於disruptor的背景就不在此多言,可以自己google。

Disruptor效能
 disruptor是用於一個JVM中多個執行緒之間的訊息佇列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、效能都遠好於ArrayBlockingQueue,當多個執行緒之間傳遞大量資料或對效能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
 官方也對disruptor和ArrayBlockingQueue的效能在不同的應用場景下做了對比,本文列出其中一組資料,資料中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue,目測效能只有有5~10倍左右的提升。

image
 完整的官方效能測試資料在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,效能測試的程式碼已經包含在disruptor的程式碼中,有興趣的可以直接過去看看。

Disruptor原理介紹
Disruptor組成元素圖
Sequence
 Sequence是Disruptor最核心的元件,上面已經提到過了。生產者對RingBuffer的互斥訪問,生產者與消費者之間的協調以及消費者之間的協調,都是通過Sequence實現。幾乎每一個重要的元件都包含Sequence。那麼Sequence是什麼呢?首先Sequence是一個遞增的序號,說白了就是計數器;其次,由於需要線上程間共享,所以Sequence是引用傳遞,並且是執行緒安全的;再次,Sequence支援CAS操作;最後,為了提高效率,Sequence通過padding來避免偽共享。

RingBuffer
 RingBuffer是儲存訊息的地方,通過一個名為cursor的Sequence物件指示佇列的頭,協調多個生產者向RingBuffer中新增訊息,並用於在消費者端判斷RingBuffer是否為空。巧妙的是,表示佇列尾的Sequence並沒有在RingBuffer中,而是由消費者維護。這樣的好處是多個消費者處理訊息的方式更加靈活,可以在一個RingBuffer上實現訊息的單播,多播,流水線以及它們的組合。其缺點是在生產者端判斷RingBuffer是否已滿是需要跟蹤更多的資訊,為此,在RingBuffer中維護了一個名為gatingSequences的Sequence陣列來跟蹤相關Seqence。

SequenceBarrier
 SequenceBarrier用來在消費者之間以及消費者和RingBuffer之間建立依賴關係。在Disruptor中,依賴關係實際上指的是Sequence的大小關係,消費者A依賴於消費者B指的是消費者A的Sequence一定要小於等於消費者B的Sequence,這種大小關係決定了處理某個訊息的先後順序。因為所有消費者都依賴於RingBuffer,所以消費者的Sequence一定小於等於RingBuffer中名為cursor的Sequence,即訊息一定是先被生產者放到Ringbuffer中,然後才能被消費者處理。

SequenceBarrier在初始化的時候會收集需要依賴的元件的Sequence,RingBuffer的cursor會被自動的加入其中。需要依賴其他消費者和/或RingBuffer的消費者在消費下一個訊息時,會先等待在SequenceBarrier上,直到所有被依賴的消費者和RingBuffer的Sequence大於等於這個消費者的Sequence。當被依賴的消費者或RingBuffer的Sequence有變化時,會通知SequenceBarrier喚醒等待在它上面的消費者。

WaitStrategy
 當消費者等待在SequenceBarrier上時,有許多可選的等待策略,不同的等待策略在延遲和CPU資源的佔用上有所不同,可以視應用場景選擇:

BusySpinWaitStrategy : 自旋等待,類似Linux Kernel使用的自旋鎖。低延遲但同時對CPU資源的佔用也多。
BlockingWaitStrategy : 使用鎖和條件變數。CPU資源的佔用少,延遲大。
SleepingWaitStrategy : 在多次迴圈嘗試不成功後,選擇讓出CPU,等待下次排程,多次排程後仍不成功,嘗試前睡眠一個納秒級別的時間再嘗試。這種策略平衡了延遲和CPU資源佔用,但延遲不均勻。
YieldingWaitStrategy : 在多次迴圈嘗試不成功後,選擇讓出CPU,等待下次調。平衡了延遲和CPU資源佔用,但延遲也比較均勻。
PhasedBackoffWaitStrategy : 上面多種策略的綜合,CPU資源的佔用少,延遲大。

為什麼RingBuffer這麼快,首先丟擲兩個原因:
首先是CPU false sharing的解決,Disruptor通過將基本物件填充冗餘基本型別變數來填充滿整個快取行,減少false sharing的概率,這部分沒怎麼看懂,Disruptor通過填充失效這個效果。

無鎖佇列的實現,對於傳統併發佇列,至少要維護兩個指標,一個頭指標和一個尾指標。在併發訪問修改時,頭指標和尾指標的維護不可避免的應用了鎖。Disruptor由於是環狀佇列,對於Producer而言只有頭指標而且鎖是樂觀鎖,在標準Disruptor應用中,只有一個生產者,避免了頭指標鎖的爭用。所以我們可以理解Disruptor為無鎖佇列。

RingBuffer整個工作流程
工作流程圖
每個RingBuffer是一個環狀佇列,佇列中每個元素可以理解為一個槽。在初始化時,RingBuffer規定了總大小,就是這個環最多可以容納多少槽。這裡Disruptor規定了,RingBuffer大小必須是2的n次方。這裡用了一個小技巧,就是將取模轉變為取與運算。在記憶體管理中,我們常用的就是取餘定位操作。如果我們想在Ringbuffer定位,一般會用到某個數字對Ringbuffer的大小取餘。如果是對2的n次方取餘,則可以簡化成m % 2^n = m & ( 2^n - 1 )

Producer會向這個RingBuffer中填充元素,填充元素的流程是首先從RingBuffer讀取下一個Sequence,之後在這個Sequence位置的槽填充資料,之後釋出。

Consumer消費RingBuffer中的資料,通過SequenceBarrier來協調不同的Consumer的消費先後順序,以及獲取下一個消費位置Sequence。

Producer在RingBuffer寫滿時,會從頭開始繼續寫替換掉以前的資料。但是如果有SequenceBarrier指向下一個位置,則不會覆蓋這個位置,阻塞到這個位置被消費完成。Consumer同理,在所有Barrier被消費完之後,會阻塞到有新的資料進來。

Disruptor例子
如何使用 Disruptor ,Disruptor 的 API 十分簡單,主要有以下幾個步驟:

1、定義事件:事件(Event)就是通過 Disruptor 進行交換的資料型別。

2、定義事件工廠:事件工廠(Event Factory)定義瞭如何例項化前面第1步中定義的事件(Event),需要實現介面 com.lmax.disruptor.EventFactory。Disruptor 通過 EventFactory 在 RingBuffer 中預建立 Event 的例項。一個 Event 例項實際上被用作一個“資料槽”,釋出者釋出前,先從 RingBuffer 獲得一個 Event 的例項,然後往 Event 例項中填充資料,之後再發布到 RingBuffer 中,之後由 Consumer 獲得該 Event 例項並從中讀取資料。

3、定義事件處理的具體實現:通過實現介面 com.lmax.disruptor.EventHandler 定義事件處理的具體實現。

4、定義用於事件處理的執行緒池:Disruptor 通過 java.util.concurrent.ExecutorService 提供的執行緒來觸發 Consumer 的事件處理。

5、指定等待策略:Disruptor 定義了 com.lmax.disruptor.WaitStrategy 介面用於抽象 Consumer 如何等待新事件,這是策略模式的應用。
Disruptor 提供了多個 WaitStrategy 的實現,每種策略都具有不同效能和優缺點,根據實際執行環境的 CPU 的硬體特點選擇恰當的策略,並配合特定的 JVM 的配置引數,能夠實現不同的效能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現;SleepingWaitStrategy 的效能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景;YieldingWaitStrategy 的效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性。

6、啟動 Disruptor。

7、釋出事件:Disruptor 的事件釋出過程是一個兩階段提交的過程:
  第一步:先從 RingBuffer 獲取下一個可以寫入的事件的序號;
  第二步:獲取對應的事件物件,將資料寫入事件物件;
  第三部:將事件提交到 RingBuffer;
事件只有在提交之後才會通知 EventProcessor 進行處理

8、關閉 Disruptor。

Disruptor使用例子原始碼
// step_1 定義事件
public class LongEvent
{
private long value;

public void set(long value)
{
    this.value = value;
}

}

// step_2 定義事件工廠
import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory
{
public LongEvent newInstance()
{
return new LongEvent();
}
}

// step_3 定義事件處理的具體實現
import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}

// step_4 定義用於事件處理的執行緒池
ExecutorService executor = Executors.newCachedThreadPool();

// 指定等待策略
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

// step_5 啟動 Disruptor
EventFactory eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;

// step_6 釋出事件
Disruptor disruptor = new Disruptor(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
EventHandler eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start();

// step_7 釋出事件;
RingBuffer ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請求下一個事件序號;

try {
LongEvent event = ringBuffer.get(sequence);//獲取該序號對應的事件物件;
long data = getEventData();//獲取要通過事件傳遞的業務資料;
event.set(data);
} finally{
ringBuffer.publish(sequence);//釋出事件;
}

// step_8 Disruptor 還提供另外一種形式的呼叫來簡化以上操作,並確保 publish 總是得到呼叫。
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
@Override
public void translateTo(LongEvent event, long sequence, Long data) {
event.set(data);
}
}

public static Translator TRANSLATOR = new Translator();

public static void publishEvent2(Disruptor disruptor) {
// 釋出事件;
RingBuffer ringBuffer = disruptor.getRingBuffer();
long data = getEventData();//獲取要通過事件傳遞的業務資料;
ringBuffer.publishEvent(TRANSLATOR, data);
}

// step_9 關閉 Disruptor
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
executor.shutdown();//關閉 disruptor 使用的執行緒池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;

Disruptor核心元件原始碼解析
Disruptor核心元件 - Disruptor元件
RingBuffer ringBuffer用於儲存資料物件
Executor executor用於儲存消費端執行執行緒service
ConsumerRepository儲存消費分組資訊
public class Disruptor
{
//Disruptor核心變數RingBuffer用於儲存變數
private final RingBuffer ringBuffer;
// Disruptor用於執行consumer的ExecutorService物件
private final Executor executor;
// Disruptor儲存的所有消費者資訊
private final ConsumerRepository consumerRepository = new ConsumerRepository();
// 標記是否已經開始
private final AtomicBoolean started = new AtomicBoolean(false);
// 標記異常處理的handler
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper();

@Deprecated
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
}

@Deprecated
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final Executor executor,
    final ProducerType producerType,
    final WaitStrategy waitStrategy)
{
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

}

Disruptor核心元件 - RingBuffer元件
RingBuffer內部通過填充解決記憶體偽共享問題,沒看懂!!
Object[] entries用來儲存資料
int bufferSize環形陣列大小
Sequencer sequencer 分為多生產者和單生產者兩種
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields extends RingBufferPad
{
//用於填充的物件引用,為什麼填充不知道?
private static final int BUFFER_PAD;
//entry儲存位置相對與array起始位置的偏移量,用於UNSAFE記憶體操作時進行定址,注意這個偏移量加上了用於填充的BUFFER_PAD大小
private static final long REF_ARRAY_BASE;
//對應物件引用佔用記憶體大小,計算出來的相對位移數,比如物件引用大小是4byte,那麼REF_ELEMENT_SHIFT=2,因為2的2次方=4;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();

static
{
    final int scale = UNSAFE.arrayIndexScale(Object[].class);
    if (4 == scale)
    {
        REF_ELEMENT_SHIFT = 2;
    }
    else if (8 == scale)
    {
        REF_ELEMENT_SHIFT = 3;
    }
    else
    {
        throw new IllegalStateException("Unknown pointer size");
    }
    BUFFER_PAD = 128 / scale;

    REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}

private final long indexMask;
// 用於儲存Entry的物件陣列,也就是RingBuffer當中儲存資料的資料結構
private final Object[] entries;
// 環形資料庫大小
protected final int bufferSize;

// RingBuffer當中的sequencer,分為SingleProducerSequencer和MultiProducerSequencer兩類
protected final Sequencer sequencer;

RingBufferFields(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
{
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    // 2 * BUFFER_PAD代表頭尾都需要增加BUFFER_PAD的空間,所以訪問空間需要以陣列的起始位置+BUFFER_PAD,當然需要轉化為位元組
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory)
{
    for (int i = 0; i < bufferSize; i++)
    {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

}

public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer, EventSink
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;

RingBuffer(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
{
    super(eventFactory, sequencer);
}

}

Disruptor核心元件 - SingleProducerSequencer
單生產者的Sequenecer物件

SingleProducerSequencer
public abstract class AbstractSequencer implements Sequencer
{
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

// RingBuffer的大小
protected final int bufferSize;

// 等待策略
protected final WaitStrategy waitStrategy;

// 當前RingBuffer對應的油表位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

//各個消費者持有的取數sequence陣列
protected volatile Sequence[] gatingSequences = new Sequence[0];

public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}

}

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;

public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
}

}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}

protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;

}

// 適用於單生產者的場景,由於沒有實現任何柵欄,使用多執行緒的生產者進行操作並不安全。

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;

public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
}

}

Sequencer元件-MultiProducerSequencer
多生產者物件MultiProducerSequencer

MultiProducerSequencer
public abstract class AbstractSequencer implements Sequencer
{
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

// RingBuffer的大小
protected final int bufferSize;

// 等待策略
protected final WaitStrategy waitStrategy;

// 當前RingBuffer對應的油表位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

//各個消費者持有的取數sequence陣列
protected volatile Sequence[] gatingSequences = new Sequence[0];

public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
    if (bufferSize < 1)
    {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1)
    {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.bufferSize = bufferSize;
    this.waitStrategy = waitStrategy;
}

}

public final class MultiProducerSequencer extends AbstractSequencer
{
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;

public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}

Sequencer元件-Sequence
通過unsafe實現的執行緒安全物件
負責實現生產者和消費者消費進度的原子技術

public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;

static
{
    UNSAFE = Util.getUnsafe();
    try
    {
        VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
    }
    catch (final Exception e)
    {
        throw new RuntimeException(e);
    }
}

public Sequence()
{
    this(INITIAL_VALUE);
}

public Sequence(final long initialValue)
{
    UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}

public long get()
{
    return value;
}

public void set(final long value)
{
    UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}

public void setVolatile(final long value)
{
    UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}

public boolean compareAndSet(final long expectedValue, final long newValue)
{
    return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}

public long incrementAndGet()
{
    return addAndGet(1L);
}

public long addAndGet(final long increment)
{
    long currentValue;
    long newValue;

    do
    {
        currentValue = get();
        newValue = currentValue + increment;
    }
    while (!compareAndSet(currentValue, newValue));

    return newValue;
}

@Override
public String toString()
{
    return Long.toString(get());
}

}