RocketMQ原理解析-broker 4.HA & master slave
在broker啟動的時候BrokerController如果是slave,配置了master地址更新,沒有配置所有broker會想namesrv註冊,從namesrv獲取haServerAddr,然後更新到HAClient
當HAClient的MasterAddress不為空的時候(因為broker master和slave都構建了HAClient)會主動連線master獲取SocketChannel
Master監聽Slave請求的埠,預設為服務埠+1
接收slave上傳的offset long型別
int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8) //沒有理解意圖
long readOffset =this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
主從複製從哪裡開始複製:如果請求時0 ,從最後一個檔案開始複製
Slave啟動的時候brokerController開啟定時任務定時拷貝master的配置資訊
SlaveSynchronize類代表slave從master同步資訊(非訊息)
syncTopicConfig 同步topic的配置資訊
syncConsumerOffset 同步消費進度
syncDelayOffset 同步定時進度
syncSubcriptionGroupConfig 同步訂閱組配7F6E
HaService類實現了HA服務,負責同步雙寫,非同步複製功能, 這個類master和slave的broker都會例項化,
Master通過AcceptSocketService監聽slave的連線,每個masterslave連線都會構建一個HAConnection物件搭建他們之間的橋樑,對於一個master多slave部署結構的會有多個HAConnection例項,
Master構建HAConnection時會構建向slave寫入資料服務執行緒物件WriteSocketService物件和讀取Slave反饋服務執行緒物件ReadSocketService
WriteSocketService
向slave同步commitLog資料執行緒,
slaveRequestOffset是每次slave同步完資料都會向master傳送一個ack表示下次同步的資料的offset。
如果slave是第一次啟動的話slaveRequestOffset=0, master會從最近那個commitLog檔案開始同步。(如果要把master上的所有commitLog檔案同步到slave的話, 把masterOffset值賦為minOffset)
向socket寫入同步資料: 傳輸資料協議<Phy Offset> <Body Size> <Body Data>
ReadSocketService:
讀取slave通過HAClient向master返回同步commitLog的物理偏移量phyOffset值
通知前端執行緒,如果是同步複製的話通知是否複製成功
Slave 通過HAClient建立與master的連線,
來定時彙報slave最大物理offset,預設5秒彙報一次也代表了跟master之間的心跳檢測
讀取master向slave寫入commitlog的資料, master向slave寫入資料的格式是
Slave初始化DefaultMessageStore時候會構建ReputMessageService服務執行緒並在啟動儲存服務的start方法中被啟動
ReputMessageService的作用是slave從物理佇列(由commitlog檔案構成的MapedFileQueue)載入資料,並分發到各個邏輯佇列
HA同步複製, 當msg寫入master的commitlog檔案後,判斷maser的角色如果是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果