1. 程式人生 > >RocketMQ高併發讀寫

RocketMQ高併發讀寫

RocketMQ的併發讀寫能力扛住了2016年雙十一,每秒17.5萬筆訂單的建立(單筆訂單衍生出N條訊息,實際tps是17.5*n 萬),下面對其高併發讀寫原理進行探討。主要體現在兩方面:客戶端收發訊息,伺服器接收訊息並持久化(重點)。

客戶端(RocketMQ-client)

1,客戶端傳送訊息有負載均衡,客戶端記憶體中儲存著當前所有的伺服器列表,每次傳送都切換一臺伺服器傳送訊息,使得每臺伺服器接收的訊息量儘量均衡,避免熱點問題。
2,傳送程式碼為執行緒安全,當Producer例項就緒之後,完全可以死迴圈傳送訊息。一般業務方都會有N個數據源例項,所以從資料來源方面就保證高併發寫能力。

3,消費者端負載均衡叢集消費模式下,同一個ID的所有消費者例項平均消費該Topic的所有佇列。

伺服器端(Broker)

服務端的高併發讀寫主要利用Linux作業系統的PageCache特性,通過Java的MappedByteBuffer直接操作PageCache。MappedByteBuffer能直接將檔案直接對映到記憶體,其實就是Map把檔案的內容被映像到計算機虛擬記憶體的一塊區域,這樣就可以直接操作記憶體當中的資料而無需操作的時候每次都通過I/O去物理硬碟寫檔案的。

這裡先介紹RocketMQ的訊息儲存結構:由commitLogconsume queue 兩部分組成。

commitLog

1,commitLog是儲存訊息元資料的地方,所有訊息到達Broker後都會儲存到commitLog檔案。
這裡需要強調的是所有topic的訊息都會統一儲存在commitLog中,舉個例子:當前叢集有TopicA, TopicB,這兩個Toipc的訊息會按照訊息到達的先後順序儲存到同一個commitLog中,而不是每個Topic有自己獨立的commitLog。
2,每個commitLog大小上限為1G,滿1G之後會自動新建CommitLog檔案做儲存資料用。
3,CommitLog的清理機制:

  • 按時間清理,rocketmq預設會清理3天前的commitLog檔案;
  • 按磁碟水位清理:當磁碟使用量到達磁碟容量75%,開始清理最老的commitLog檔案。

4,檔案地址:${user.home}/store/${commitlog}/${fileName}

ConsumerQueue:

1,ConsumerQueue相當於CommitLog的索引檔案,消費者消費時會先從ConsumerQueue中查詢訊息的在commitLog中的offset,再去CommitLog中找元資料。如果某個訊息只在CommitLog中有資料,沒在ConsumerQueue中, 則消費者無法消費,Rocktet的事務訊息就是這個原理。
2,consumequeue的資料結構包含3部分:

  • 訊息在commitLog檔案實際偏移量(commitLogOffset)
  • 訊息大小
  • 訊息tag的雜湊值

3,檔案地址:${user.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

得益於以上的資料結構,MQ在寫資料過程是順序寫盤,讀資料過程是跳躍讀盤(儘量命中PageCache)。

訊息順序寫

在單臺伺服器上,MQ元資料都落在單個檔案上(即commitLog),大量資料IO都在順序寫同一個commitLog,滿1G了再寫新的,真正意義上的順序寫盤,再加上MQ預設是累計4K才強制從PageCache中刷到磁碟(快取),所以高併發寫效能突出。

訊息跳躍讀

MQ讀取訊息依賴系統PageCache,PageCache命中率越高,讀效能越高,Linux平時也會盡量預讀資料,使得應用直接訪問磁碟的概率降低。

當客戶端向Broker拉取訊息時,Broker上系統讀檔案過程如下:

1,檢查要讀的資料是否在上次預讀的cache中;
2,若不在cache,作業系統從磁碟中讀取對應的資料頁,並且系統還會將該資料頁之後的連續幾頁(一般三頁)也一併讀入到cache中,再將應用需要的資料返回給應用。此情況作業系統認為是跳躍讀取,屬於同步預讀。
3,若命中cache,相當於上次快取的內容有效,作業系統認為順序讀盤,則繼續擴大快取的資料範圍,將之前快取的資料頁往後的N頁資料再讀取到cache中,屬於非同步預讀。

系統給cache的定義了一個數據結構,命名為window,window由 當前要讀取的內容 + 預讀取的內容(group)組成。

下面結合下圖舉例說明:

  • a狀態:作業系統等待應用讀請求時的快取狀態。
  • b狀態:客戶端發起讀操作,broker發現所讀資料不在Cache中,即不在前次預讀的group中,則表明檔案訪問不是順序訪問(場景有可能是不消費中間的某部分訊息,直接消費最新的訊息),系統採用同步預讀,直接從磁碟中讀取頁面+快取頁到記憶體。
  • c狀態:客戶端繼續發起讀操作,系統發現所讀資料在Cache中,則表明前次預讀命中,作業系統把預讀group擴大一倍,並讓底層檔案系統讀入group中剩下尚不在Cache中的檔案資料塊,非同步預讀。

所以Broker的機器需要大記憶體,儘量快取足夠多的commitLog,讓Broker讀寫訊息基本在PageCache中操作。在執行時,如果資料量非常大,可以看到broker的程序佔用記憶體比較多,其實大部分是被快取住的commitlog。

快取清理機制(PageCache)

Linux會快取儘量多的訊息資料到記憶體中,提高讀資料緩衝命中率。當記憶體不夠時,還是要清理沒用的資料,將清理的空間用以快取新的內容,這整個過程,Linux用一個雙向連結串列來管理,如下圖:

inactive_list代表訪問冷資料,active_list代表訪問熱資料,新分配的資料頁先鏈入到inactive_list頭部,當其被引用時再將其移到active_list的頭部。

當記憶體不足時,系統首先從尾部開始反向掃描 active_list並將狀態不是referenced的項鍊入到inactive_list的頭部,然後系統反向掃描inactive_list,如果所掃描的項的處於合適的狀態就回收該項,直到回收了足夠數目的Cache項,這就是系統回收記憶體的過程。

這裡需要注意一點,如果記憶體回收速度比應用寫快取的速度慢,會導致寫快取的執行緒一直等待,體現到RocketMQ上就是寫訊息RT很高,這就是 “毛刺問題”。這時就需要結合GC引數和系統核心引數進行調整,此處不對此展開說明了。

demo演示:
git clone https://github.com/apache/rocketmq.git
建立配置檔案conf.properties
rocketmqHome=/Users/javahongxi/github/rocketmq/distribution
namesrvAddr=127.0.0.1:9876
mapedFileSizeCommitLog=52428800
mapedFileSizeConsumeQueue=30000

-c conf.properties
依次啟動NamesrvStartup,BrokerStartup,Consumer,Producer

rocketmq擴充套件:https://github.com/javahongxi/incubator-rocketmq-externals.git

rocketmq擴充套件:https://github.com/javahongxi/incubator-rocketmq-externals.git