1. 程式人生 > 其它 >網站架構核心技disruptor

網站架構核心技disruptor

一 序:
本章業務場景:佇列在資料結構中是一種線性表,從一端插入資料,然後從另一端刪除資料。作者舉例的場景有:進行非同步處理、系統解耦、資料同步、流量削峰、緩衝、限流等。

前面的比較淺,總結起來,核心知識點有兩塊:

1. disruptor+redis佇列

2. 基於canal實現資料異構。

下面說的就是整理其中的系統內部的記憶體式佇列,非kafka那種分散式佇列。

*************************************************

二 blockingqueue
jdk常用的佇列有


佇列 有界性 鎖 資料結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap
考慮到記憶體回收,防止無姐佇列記憶體溢位,通常使用 ArrayBlockingQueue
常用於生產者-消費者模式:

demo:


package com.daojia.web.bm.disruptor;

import java.util.concurrent.BlockingQueue;

public class ThreadForConsumer extends Thread {

private BlockingQueue<String> blockingQueue;

public ThreadForConsumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
String msg;
try {
while (true) {
msg = blockingQueue.take();
if(msg==null)
{
System.out.println("nodata");
Thread.sleep(1);
}else{
// 消費
System.out.println(msg+"over");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}


}


public class QueueMain {

public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
// 初始化阻塞佇列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1024);

// 建立消費者執行緒


Thread consumer = new Thread(new ThreadForConsumer(blockingQueue));
consumer.start();
long t1 =System.currentTimeMillis();
System.out.println("begin="+t1);
// 建立資料
for(int i=0;i<=1000000;i++)
{
blockingQueue.put(i+"");
}
System.out.println("over ,user="+(System.currentTimeMillis()-t1));
}

}

簡化了生產者,沒有單獨放到執行緒去寫,常見的是這種:


public class ThreadForProducer extends Thread {

private BlockingQueue<String> blockingQueue;


public ThreadForProducer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
try {

for(int i=0;i<=1000000;i++)
{
blockingQueue.put(i+"");
}
System.out.println("put orver");
} catch (Exception e) {
e.printStackTrace();
}
}
}

這裡有相關知識點:整理一下:
2.1. 底層實現:加鎖與CAS
看下底層的關鍵方法:put放入資料


public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

獲取資料:take

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

可見阻塞佇列使用鎖,當然為提高效率用通知模式。 就是當生產者往滿的佇列裡新增元素時會阻塞住生產者,當消費者消費了一個佇列中的元素後,會通知生產者當前佇列可用。notempty,notfull就是condition。
CAS是CPU的一個指令,由CPU保證原子性。

常見的例子是juc下面的atmoic的類。

通常情況(競爭不那麼高)cas的效能是比鎖好的,當然高併發情況下,鎖比cas要好。

2.2 CPU偽共享
下圖(來自《深入理解計算機系統》),在此層次中,從上至下,容量越來越大,訪問速度越來越慢,但是造價也更便宜


下圖是個簡化的計算機CPU與快取的示意圖




當CPU執行運算的時候,它先去L1查詢所需的資料、再去L2、然後是L3,如果最後這些快取中都沒有,所需的資料就要去主記憶體拿。走得越遠,運算耗費的時間就越長。所以如果你在做一些很頻繁的事,你要儘量確保資料在L1快取中。

另外,執行緒之間共享一份資料的時候,需要一個執行緒把資料寫回主存,而另一個執行緒訪問主存中相應的資料。

下面是從CPU訪問不同層級資料的時間概念:

從CPU到 大約需要的CPU週期 大約需要的時間
主存 約60-80ns
QPI 匯流排傳輸(between sockets, not drawn) 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
暫存器 1 cycle
可見CPU讀取主存中的資料會比從L1中讀取慢了近2個數量級。

今天的CPU不再是按位元組訪問記憶體,而是以64位元組(64位系統)為單位的塊(chunk)拿取,稱為一個快取行(cache line)。當你讀一個特定的記憶體地址,整個快取行將從主存換入快取,並且訪問同一個快取行內的其它值的開銷是很小的。

比如,Java中的long型別是8個位元組,因此在一個緩衝行中可以存8個long型別的變數,也就是說如果訪問一個long型別的陣列,訪問第一個元素的時候,會把另外7個也載入到快取中,可以非常快速的遍歷陣列,這也是陣列比連結串列快的原因。

美團的測試程式碼如下:

public class CacheLineEffect {

//考慮一般快取行大小是64位元組,一個 long 型別佔8位元組
static long[][] arr;

public static void main(String[] args) {
arr = new long[1024 * 1024][];
for (int i = 0; i < 1024 * 1024; i++) {
arr[i] = new long[8];
for (int j = 0; j < 8; j++) {
arr[i][j] = 0L;
}
}
long sum = 0L;
long marked = System.currentTimeMillis();
for (int i = 0; i < 1024 * 1024; i+=1) {
for(int j =0; j< 8;j++){
sum = arr[i][j];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

marked = System.currentTimeMillis();
for (int i = 0; i < 8; i+=1) {
for(int j =0; j< 1024 * 1024;j++){
sum = arr[j][i];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
}
}


執行結果:
Loop times:13ms
Loop times:54ms

快取行利用區域性性的確能提高效率,但是有一個弊端,當我們的資料不相關,只是一個單獨的變數,這兩個資料在一個快取行中,而且他們的訪問頻率都很高,這時候反而會影響效率。如下圖:



比如我們有一個類存放了兩個變數的值data1,data2。當載入data1的時候,data2也被載入到快取中,也就是存在於同一個快取行。當core1改變data1的值的時候,core1快取中的值和記憶體中的值都被改變了,這時候core2也會重新載入這個快取行,因為data1變了,而core2只是想讀取自己快取中的data2,卻任然要等從記憶體中重新載入這個快取行。

這種無法充分使用快取行特性的現象,稱為偽共享。

ArrayBlockingQueue有三個成員變數:

takeIndex:需要被取走的元素下標
putIndex:可被元素插入的位置的下標
count:佇列中元素的數量
這三個變數很容易放到一個快取行中,但是之間修改沒有太多的關聯。所以每次修改,都會使之前快取的資料失效,從而不能完全達到共享的效果。

好了,上面是說了相關缺點,那麼disruptor是怎麼實現的呢?
三 disruptor

Disruptor是由LMAX公司開發的一款高效無鎖記憶體佇列。使用無鎖方式實現了一個環形佇列代替線性佇列。相對於普通的線性佇列,環形佇列不需要維護頭尾兩個指標,只需維護一個當前位置就可以完成出入隊操作。受限於環形結構,佇列的大小隻能初始化時指定,不能動態擴充套件。

如下圖所示,Disruptor的實現為一個迴圈佇列,ringbuffer擁有一個序號(Seq),這個序號指向陣列中下一個可用的元素


相關設計上的知識點:


1)Disruptor要求陣列大小設定為2的N次方。這樣可以通過Seq & (QueueSize - 1) 直接獲取,其效率要比取模快得多。這是因為(Queue - 1)的二進位制為全1等形式。例如,上圖中QueueSize大小為8,Seq為10,則只需要計算二進位制1010 & 0111 = 2,可直接得到index=2位置的元素。

2)在RingBuffer中,生產者向陣列中寫入資料,生產者寫入資料時,使用CAS操作。消費者從中讀取資料時,為防止多個消費者同時處理一個數據,也使用CAS操作進行資料保護。
3)這種固定大小的RingBuffer還有一個好處是,可以記憶體複用。不會有新空間需要分配或者舊的空間回收,當陣列填充滿後,再寫入資料會將資料覆蓋。

4)增加快取行補齊, 提升cache快取命中率

需要去看程式碼。
四 demo:
/**
* 事件物件:
* 只模擬放一條訊息
* @author daojia
*
*/
public class MsgData {

private String msg;

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

/**
* 工廠類:構造緩衝區物件例項
* @author daojia
*
*/
public class MsgDataFactory implements EventFactory<MsgData> {

@Override
public MsgData newInstance() {
// TODO Auto-generated method stub
return new MsgData();
}

}

/**
* 消費者
* @author daojia
*
*/
public class MsgDataHandler implements WorkHandler<MsgData> {

@Override
public void onEvent(MsgData event) throws Exception {
// TODO Auto-generated method stub
String msg = event.getMsg();
//模擬業務呼叫
System.out.println(msg+"over");
Thread.sleep(10);
}
}

/**
* 生產者:
*
* @author daojia
*
*/
public class MsgDataProducer {

private final RingBuffer<MsgData> ringBuffer;

public MsgDataProducer(RingBuffer<MsgData> ringBuffer){
this.ringBuffer = ringBuffer;
}


public void pushData(String msg) {
//可以把ringBuffer看做一個事件佇列,那麼next就是得到下面一個事件槽
long seq = ringBuffer.next();
try {
// 獲取可用位置
MsgData event = ringBuffer.get(seq);
// 填充可用位置
event.setMsg(msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
//釋出事件, 通知消費者
ringBuffer.publish(seq);
}
}
}


public class DisruptorMain {

public static void main(String[] args) {
// TODO Auto-generated method stub

// 工廠
MsgDataFactory factory = new MsgDataFactory();
// 執行緒池
ExecutorService executor = Executors.newCachedThreadPool();
int bufferSize = 1024; // 必須為2的冪指數

// 初始化Disruptor
Disruptor<MsgData> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE,new YieldingWaitStrategy());
// 啟動消費者
disruptor.handleEventsWithWorkerPool(new MsgDataHandler(),new MsgDataHandler(),new MsgDataHandler());
disruptor.start();
//獲取ringBuffer
RingBuffer<MsgData> ringBuffer = disruptor.getRingBuffer();
long t1 =System.currentTimeMillis();
System.out.println("begin="+t1);
// 啟動生產者
MsgDataProducer producer = new MsgDataProducer(ringBuffer);
for(int i=0;i<=100000;i++)
{
//模擬生成資料
producer.pushData(i+"");
}

System.out.println("over ,user="+(System.currentTimeMillis()-t1));


}

}


在單個消費者,單個生產者,設定長度都是1024的 情況下,本地測試的效能跟blockingqueue沒啥區別。
不知道是不是我設定的引數不對,沒體現出設計的優勢。
單純的100W資料迴圈放入,耗時在10s左右。