1. 程式人生 > >Java併發程式設計高階技術、高效能併發框架Disruptor

Java併發程式設計高階技術、高效能併發框架Disruptor

1.簡介

Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行緒裡每秒處理6百萬訂單。業務邏輯處理器完全是執行在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。Disruptor它是一個開源的併發框架,並獲得2011 Duke’s 程式框架創新獎,能夠在無鎖的情況下實現網路的Queue併發操作。Disruptor是一個高效能的非同步處理框架,或者可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。
開源:

https://github.com/LMAX-Exchange/disruptor

2.使用

  1. 目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本效能更加的優秀,提供更多的API使用方式,直接下載jar包,addTopath就可以使用了。
  2. 也可以以maven的方式來引入
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
  1. 示例
    下面以一個例子來使用disruptor框架:
    先了解一下步驟:
    1. 建立一個Event類,用於建立Event類例項物件
 import java.text.SimpleDateFormat;
public class OrderEvent {

   private long value; //訂單的價格

   public long getValue() {
   	return value;
   }

   public void setValue(long value) {
   	this.value = value;
   }
}
  1. 需要有一個監聽事件類,用於處理資料(Event類)
  import java.text.SimpleDateFormat;
public class OrderEvent {

	private long value; //訂單的價格

	public long getValue() {
		return value;
	}

	public void setValue(long value) {
		this.value = value;
	}
}
  1. 例項化Disruptor例項,配置引數,編寫Disruptor
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

	
	
	public static void main(String[] args) {
		
		
		// 引數準備工作
		OrderEventFactory orderEventFactory = new OrderEventFactory();
		int ringBufferSize = 1024*1024;
		ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		/**
		 * 建立OrderEvent類
		 * 1 eventFactory: 訊息(event)工廠物件  實現   implements EventFactory<OrderEvent>介面
		 * OrderEventHandler implements EventHandler<OrderEvent>  用來接受資料
		 * 2 ringBufferSize: 容器的長度
		 * 3 executor: 執行緒池(建議使用自定義執行緒池) RejectedExecutionHandler
		 * 4 ProducerType: 單生產者 還是 多生產者
		 * 5 waitStrategy: 等待策略
		 */
		//1. 例項化disruptor物件
		Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
				ringBufferSize,   //容器大小
				executor,          //執行緒池
				ProducerType.SINGLE,   //SINGLE\MULTI 單生產者模式和 多生產者模式       
				new BlockingWaitStrategy());   //等待策略  阻塞策略
		
		//2. 新增消費者的監聽 (構建disruptor 與 消費者的一個關聯關係)
		disruptor.handleEventsWith(new OrderEventHandler());
		
		//3. 啟動disruptor
		disruptor.start();
		
		//4. 獲取實際儲存資料的容器: RingBuffer
		RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
		
		
		//訂單生產者
		OrderEventProducer producer = new OrderEventProducer(ringBuffer);
		
		ByteBuffer bb = ByteBuffer.allocate(8);
		
		for(long i = 0 ; i < 100; i ++){
			bb.putLong(0, i);
			producer.sendData(bb);
		}
		
		disruptor.shutdown();
		executor.shutdown();
		
	}
}

  1. 編寫生產者元件,向Disruptor容器中去投遞資料
import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {

	private RingBuffer<OrderEvent> ringBuffer;
	
	public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}
	
	public void sendData(ByteBuffer data) {
		//1 在生產者傳送訊息的時候, 首先 需要從我們的ringBuffer裡面 獲取一個可用的序號
		long sequence = ringBuffer.next();	//0	
		try {
			//2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent物件是一個沒有被賦值的"空物件"
			OrderEvent event = ringBuffer.get(sequence);
			//3 進行實際的賦值處理
			event.setValue(data.getLong(0));			
		} finally {
			//4 提交發布操作
			ringBuffer.publish(sequence);			
		}
	}

	
}

3.Disruptor為什麼這麼快

  1. 環形佇列RingBuffer
    一個環形佇列,意味著首尾相連,對列可以迴圈使用,使用陣列來儲存。環形佇列在JVM生命週期中通常是永生的,GC的壓力更小。
    在這裡插入圖片描述
    我們來解釋一下這個圖:當前有一個consumer,停留在位置12,這時producer假設在位置3,這時producer的下一步是如何處理的呢?producer會嘗試讀取4,發現下一個可以獲取,所以可以安全獲取,並且通知一個阻塞的consumer起來活動。如此一直到下一圈11都是安全的(這裡我們假設生產者比較快),當producer嘗試訪問12時發現不能繼續,於是自旋等待;當consumer消費時,會呼叫barrier的waitFor方法,waitFor看到前面最近的安全節點已經到了下一圈的11,於是consumer可以無鎖的去消費當前12到下一圈11所有資料,可以想象,這種方式比起synchronized要快上很多倍。
  2. 棄用鎖機制使用CAS
    在高度競爭的情況下,鎖的效能將超過原子變數的效能,但是更真實的競爭情況下,原子變數的效能將超過鎖的效能。同時原子變數不會有死鎖等活躍性問題。能不用鎖,就不使用鎖,如果使用,也要將鎖的粒度最小化。

唯一使用鎖的就是消費者的等待策略實現類中,下圖。補充一句,生產者的等到策略就是LockSupport.parkNanos(1),再自旋判斷。

  1. 等待策略
名稱 措施 適用場景
BlockingWaitStrategy 加鎖 CPU資源緊缺,吞吐量和延遲並不重要的場景
BusySpinWaitStrategy 自旋 通過不斷重試,減少切換執行緒導致的系統呼叫,而降低延遲。推薦線上程繫結到固定的CPU的場景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定義策略 策略 CPU資源緊缺,吞吐量和延遲並不重要的場景
SleepingWaitStrategy 自旋 + yield + sleep 效能和CPU資源之間有很好的折中。延遲不均勻
TimeoutBlockingWaitStrategy 加鎖,有超時限制 CPU資源緊缺,吞吐量和延遲並不重要的場景
YieldingWaitStrategy 自旋 + yield + 自旋 效能和CPU資源之間有很好的折中。延遲比較均勻
  1. 解決偽共享,採用快取行填充
    在這裡插入圖片描述
    從上圖看到,執行緒1在CPU核心1上讀寫變數X,同時執行緒2在CPU核心2上讀寫變數Y,不幸的是變數X和變數Y在同一個快取行上,每一個執行緒為了對快取行進行讀寫,都要競爭並獲得快取行的讀寫許可權,如果執行緒2在CPU核心2上獲得了對快取行進行讀寫的許可權,那麼執行緒1必須重新整理它的快取後才能在核心1上獲得讀寫許可權,這導致這個快取行在不同的執行緒間多次通過L3快取來交換最新的拷貝資料,這極大的影響了多核心CPU的效能。
    下面程式碼解決偽共享問題的,就是例項變數前後各加7個long形變數,用空間換時間。
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;
    //..省略
}

Java中通過填充快取行,來解決偽共享問題的思路,現在可能已經是老生常談,連Java8中都新增了sun.misc.Contended註解來避免偽共享問題。但在Disruptor剛出道那會兒,用快取行來優化Java資料結構,這恐怕還很新潮。

  1. 還有一些細節性的
    1)通過sequence & (bufferSize - 1)定位元素的index比普通的求餘取模(%)要快得多。sequence >>> indexShift 快速計算出sequence/bufferSize的商flag(其實相當於當前sequence在環形跑道上跑了幾圈,在資料生產時要設定好flag。

    2)合理使用Unsafe,CPU級別指令。實現更加高效地記憶體管理和原子訪問。
    至於一些更細節的,下面原始碼搞起來,還是很簡單的。

3. 高效能之道-核心-使用單執行緒寫-系統級別記憶體屏障實現-消除偽共享-序號柵欄機制

  1. disruptor的RingBuffer,之所以可以做到完全無鎖,也是因為“單執行緒寫”,這是所有“前提的前提”。
  2. 離開了這個前提條件,沒有任何技術可以做到完全無鎖。
  3. redis、netty(序列,並行有多執行緒競爭問題,buffer池)等等高效能技術框架的設計都是這個核心思想。
  4. 要正確的實現無鎖,還需要另外一個關鍵技術:記憶體屏障
  5. 對應到java語言,就是valotile變數與happens before 語義。
  6. 記憶體屏障 -linux的smp_wmb()/smp_rmb()。
  7. 快取系統中以快取行為單位儲存的
  8. 快取行是2的整數冪個連續位元組,一般為32-256個位元組
  9. 最常見的快取行大小是64個位元組
  10. **當多執行緒修改互相獨立的變數時,如果這些變數共享同一個快取行 **
  11. **就會無意中影響批次的效能,這就是偽共享。(左邊填充七個long快取行、右邊填充7個long快取行,以空間換時間實現偽共享,減少資源競爭) **
  12. disruptor3.0中,序號柵欄 sequenceBarrier和序號sequence搭配使用
  13. 協調和管理消費者與生產者的工作節奏,避免了鎖和CAS的使用
  14. 消費者序號小於生產者序號數值
  15. 消費者序號數值必須小於前置(依賴關係)消費者的序號數值
  16. 生產者號數值不能大於消費者中最小的序號數值
  17. 以避免生產者速度過快,將還未來及消費的訊息覆蓋。
@Override
    public long next(int n)   //1
    {
        if (n < 1)     //初始值 sequence= -1
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;   //語義級別的: nextvalue 為singleProducerSequencer的變數

        long nextSequence = nextValue + n;   //0
        long wrapPoint = nextSequence - bufferSize;  //  -10 wrapPoint用於判斷當前的序號有沒有繞過整個ringbuffer容器
        //這個cachedGatingSequence 他的目的就是不用每次都去進行獲取消費者的最小序號,用一個快取區進行接收
        long cachedGatingSequence = this.cachedValue;   //-1   cachedValue我不太清楚   但是語義上說是進行快取優化的 

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence;  //最小序號
          // 如果你的生產者序號大於消費者中最小的序號  那麼你就掛起 進行自旋操作
          //生產者號數值不能大於消費者中最小的序號數值
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))   //自旋操作  自旋鎖     避免加鎖  與cas操作
            // Util.getMinimumSequence(gatingSequences, nextValue)  找到消費者中最小的序號值
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

4. Disruptor原始碼解讀

  1. 核心類介面
    Disruptor 提供了對RingBuffer的封裝。
    RingBuffer 環形佇列,基於陣列實現,記憶體被迴圈使用,減少了記憶體分配、回收擴容等操作。
    EventProcessor 事件處理器,實現了Runnable,單執行緒批量處理BatchEventProcessor和多執行緒處理WorkProcessor。
    Sequencer 生產者訪問序列的介面,RingBuffer生產者的父介面,其直接實現有SingleProducerSequencer和MultiProducerSequencer。
    EventSequencer 空介面,暫時沒用,用於以後擴充套件。
    SequenceBarrier 消費者屏障 消費者用於訪問快取的控制器。
    WaitStrategy 當沒有可消費的事件時,根據特定的實現進行等待。
    SingleProducerSequencer 單生產者釋出實現類
    MultiProducerSequencer 多生產者釋出實現類

參考:
https://www.cnblogs.com/lewis09/p/9974995.html