Java併發程式設計高階技術、高效能併發框架Disruptor
1.簡介
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行緒裡每秒處理6百萬訂單。業務邏輯處理器完全是執行在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。Disruptor它是一個開源的併發框架,並獲得2011 Duke’s 程式框架創新獎,能夠在無鎖的情況下實現網路的Queue併發操作。Disruptor是一個高效能的非同步處理框架,或者可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。
開源:
2.使用
- 目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本效能更加的優秀,提供更多的API使用方式,直接下載jar包,addTopath就可以使用了。
- 也可以以maven的方式來引入
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor --> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
- 示例
下面以一個例子來使用disruptor框架:
先了解一下步驟:
1. 建立一個Event類,用於建立Event類例項物件
import java.text.SimpleDateFormat;
public class OrderEvent {
private long value; //訂單的價格
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
- 需要有一個監聽事件類,用於處理資料(Event類)
import java.text.SimpleDateFormat;
public class OrderEvent {
private long value; //訂單的價格
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
- 例項化Disruptor例項,配置引數,編寫Disruptor
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) {
// 引數準備工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024*1024;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 建立OrderEvent類
* 1 eventFactory: 訊息(event)工廠物件 實現 implements EventFactory<OrderEvent>介面
* OrderEventHandler implements EventHandler<OrderEvent> 用來接受資料
* 2 ringBufferSize: 容器的長度
* 3 executor: 執行緒池(建議使用自定義執行緒池) RejectedExecutionHandler
* 4 ProducerType: 單生產者 還是 多生產者
* 5 waitStrategy: 等待策略
*/
//1. 例項化disruptor物件
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize, //容器大小
executor, //執行緒池
ProducerType.SINGLE, //SINGLE\MULTI 單生產者模式和 多生產者模式
new BlockingWaitStrategy()); //等待策略 阻塞策略
//2. 新增消費者的監聽 (構建disruptor 與 消費者的一個關聯關係)
disruptor.handleEventsWith(new OrderEventHandler());
//3. 啟動disruptor
disruptor.start();
//4. 獲取實際儲存資料的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//訂單生產者
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long i = 0 ; i < 100; i ++){
bb.putLong(0, i);
producer.sendData(bb);
}
disruptor.shutdown();
executor.shutdown();
}
}
- 編寫生產者元件,向Disruptor容器中去投遞資料
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生產者傳送訊息的時候, 首先 需要從我們的ringBuffer裡面 獲取一個可用的序號
long sequence = ringBuffer.next(); //0
try {
//2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent物件是一個沒有被賦值的"空物件"
OrderEvent event = ringBuffer.get(sequence);
//3 進行實際的賦值處理
event.setValue(data.getLong(0));
} finally {
//4 提交發布操作
ringBuffer.publish(sequence);
}
}
}
3.Disruptor為什麼這麼快
- 環形佇列RingBuffer
一個環形佇列,意味著首尾相連,對列可以迴圈使用,使用陣列來儲存。環形佇列在JVM生命週期中通常是永生的,GC的壓力更小。
我們來解釋一下這個圖:當前有一個consumer,停留在位置12,這時producer假設在位置3,這時producer的下一步是如何處理的呢?producer會嘗試讀取4,發現下一個可以獲取,所以可以安全獲取,並且通知一個阻塞的consumer起來活動。如此一直到下一圈11都是安全的(這裡我們假設生產者比較快),當producer嘗試訪問12時發現不能繼續,於是自旋等待;當consumer消費時,會呼叫barrier的waitFor方法,waitFor看到前面最近的安全節點已經到了下一圈的11,於是consumer可以無鎖的去消費當前12到下一圈11所有資料,可以想象,這種方式比起synchronized要快上很多倍。 - 棄用鎖機制使用CAS
在高度競爭的情況下,鎖的效能將超過原子變數的效能,但是更真實的競爭情況下,原子變數的效能將超過鎖的效能。同時原子變數不會有死鎖等活躍性問題。能不用鎖,就不使用鎖,如果使用,也要將鎖的粒度最小化。
唯一使用鎖的就是消費者的等待策略實現類中,下圖。補充一句,生產者的等到策略就是LockSupport.parkNanos(1),再自旋判斷。
- 等待策略
名稱 | 措施 | 適用場景 |
---|---|---|
BlockingWaitStrategy | 加鎖 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
BusySpinWaitStrategy | 自旋 | 通過不斷重試,減少切換執行緒導致的系統呼叫,而降低延遲。推薦線上程繫結到固定的CPU的場景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定義策略 | 策略 CPU資源緊缺,吞吐量和延遲並不重要的場景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 效能和CPU資源之間有很好的折中。延遲不均勻 |
TimeoutBlockingWaitStrategy | 加鎖,有超時限制 | CPU資源緊缺,吞吐量和延遲並不重要的場景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 效能和CPU資源之間有很好的折中。延遲比較均勻 |
- 解決偽共享,採用快取行填充
從上圖看到,執行緒1在CPU核心1上讀寫變數X,同時執行緒2在CPU核心2上讀寫變數Y,不幸的是變數X和變數Y在同一個快取行上,每一個執行緒為了對快取行進行讀寫,都要競爭並獲得快取行的讀寫許可權,如果執行緒2在CPU核心2上獲得了對快取行進行讀寫的許可權,那麼執行緒1必須重新整理它的快取後才能在核心1上獲得讀寫許可權,這導致這個快取行在不同的執行緒間多次通過L3快取來交換最新的拷貝資料,這極大的影響了多核心CPU的效能。
下面程式碼解決偽共享問題的,就是例項變數前後各加7個long形變數,用空間換時間。
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
//..省略
}
Java中通過填充快取行,來解決偽共享問題的思路,現在可能已經是老生常談,連Java8中都新增了sun.misc.Contended註解來避免偽共享問題。但在Disruptor剛出道那會兒,用快取行來優化Java資料結構,這恐怕還很新潮。
-
還有一些細節性的
1)通過sequence & (bufferSize - 1)定位元素的index比普通的求餘取模(%)要快得多。sequence >>> indexShift 快速計算出sequence/bufferSize的商flag(其實相當於當前sequence在環形跑道上跑了幾圈,在資料生產時要設定好flag。2)合理使用Unsafe,CPU級別指令。實現更加高效地記憶體管理和原子訪問。
至於一些更細節的,下面原始碼搞起來,還是很簡單的。
3. 高效能之道-核心-使用單執行緒寫-系統級別記憶體屏障實現-消除偽共享-序號柵欄機制
- disruptor的RingBuffer,之所以可以做到完全無鎖,也是因為“單執行緒寫”,這是所有“前提的前提”。
- 離開了這個前提條件,沒有任何技術可以做到完全無鎖。
- redis、netty(序列,並行有多執行緒競爭問題,buffer池)等等高效能技術框架的設計都是這個核心思想。
- 要正確的實現無鎖,還需要另外一個關鍵技術:記憶體屏障
- 對應到java語言,就是valotile變數與happens before 語義。
- 記憶體屏障 -linux的smp_wmb()/smp_rmb()。
- 快取系統中以快取行為單位儲存的
- 快取行是2的整數冪個連續位元組,一般為32-256個位元組
- 最常見的快取行大小是64個位元組
- **當多執行緒修改互相獨立的變數時,如果這些變數共享同一個快取行 **
- **就會無意中影響批次的效能,這就是偽共享。(左邊填充七個long快取行、右邊填充7個long快取行,以空間換時間實現偽共享,減少資源競爭) **
- disruptor3.0中,序號柵欄 sequenceBarrier和序號sequence搭配使用
- 協調和管理消費者與生產者的工作節奏,避免了鎖和CAS的使用
- 消費者序號小於生產者序號數值
- 消費者序號數值必須小於前置(依賴關係)消費者的序號數值
- 生產者號數值不能大於消費者中最小的序號數值
- 以避免生產者速度過快,將還未來及消費的訊息覆蓋。
@Override
public long next(int n) //1
{
if (n < 1) //初始值 sequence= -1
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue; //語義級別的: nextvalue 為singleProducerSequencer的變數
long nextSequence = nextValue + n; //0
long wrapPoint = nextSequence - bufferSize; // -10 wrapPoint用於判斷當前的序號有沒有繞過整個ringbuffer容器
//這個cachedGatingSequence 他的目的就是不用每次都去進行獲取消費者的最小序號,用一個快取區進行接收
long cachedGatingSequence = this.cachedValue; //-1 cachedValue我不太清楚 但是語義上說是進行快取優化的
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence; //最小序號
// 如果你的生產者序號大於消費者中最小的序號 那麼你就掛起 進行自旋操作
//生產者號數值不能大於消費者中最小的序號數值
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) //自旋操作 自旋鎖 避免加鎖 與cas操作
// Util.getMinimumSequence(gatingSequences, nextValue) 找到消費者中最小的序號值
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}
4. Disruptor原始碼解讀
- 核心類介面
Disruptor 提供了對RingBuffer的封裝。
RingBuffer 環形佇列,基於陣列實現,記憶體被迴圈使用,減少了記憶體分配、回收擴容等操作。
EventProcessor 事件處理器,實現了Runnable,單執行緒批量處理BatchEventProcessor和多執行緒處理WorkProcessor。
Sequencer 生產者訪問序列的介面,RingBuffer生產者的父介面,其直接實現有SingleProducerSequencer和MultiProducerSequencer。
EventSequencer 空介面,暫時沒用,用於以後擴充套件。
SequenceBarrier 消費者屏障 消費者用於訪問快取的控制器。
WaitStrategy 當沒有可消費的事件時,根據特定的實現進行等待。
SingleProducerSequencer 單生產者釋出實現類
MultiProducerSequencer 多生產者釋出實現類