1. 程式人生 > 其它 >RocketMQ:(7) 主從同步(HA)機制

RocketMQ:(7) 主從同步(HA)機制

一、RocketMQ主從複製原理

  為了提高訊息消費的高可用性,避免Broker發生單點故障引起儲存在Broker上的訊息無法及時消費,RocketMQ引入了Broker主備機制,即訊息消費到達主伺服器後需要將訊息同步到訊息從伺服器,如果主伺服器Broker宕機後,訊息消費者可以從從伺服器拉取訊息。

HAService 整體工作機制

RocketMQ HA 的實現原理如下 。
1 )主伺服器啟動,並在特定埠上監聽從伺服器的連線。
2 )從伺服器主動連線主伺服器,主伺服器接收客戶端的連線,並建立相關TCP連線。
3 )從伺服器主動向主伺服器傳送待拉取訊息偏移量,主伺服器解析請求並返回訊息給從伺服器 。


4 )從伺服器儲存訊息並繼續傳送新的訊息同步請求 。

RocketMQ HA 7個核心類實現

1 ) HAService: RocketMQ 主從同步核心實現類 。
2 ) HAService$AcceptSocketService:HA Master 端監昕客戶端連線實現類。實現Master 端監聽Slave連線。
3 ) HAService$GroupTransferService:主從同步通知實現類。

  GroupTransferService的職責是負責當主從同步複製結束後通知由於等待 HA 同步結果而阻塞的訊息傳送者執行緒。判斷主從同步是否完成的依據是 Slave 中已成功複製的最大偏移量是否大於等於訊息生產者傳送訊息後訊息服務端返回下一條訊息的起始偏移量,如果是則表示主從同步複製已經完成,喚醒訊息傳送執行緒,否則等待 1s 再次判斷,每一個任務在一批任務中迴圈判斷 5 次。訊息傳送者返回有兩種情況:等待超過5s或GroupTransferService通知主從複製完成 。

4 ) HAService$HAClient: HAClient是主從同步Slave端的核心實現類。

  Step1:Slave 伺服器連線 Master 伺服器。如果 socketChannel 為空, 則嘗試連線Master,建立到Master的TCP連線。在Broker啟動時,如果 Broker角色為SLAVE時將讀取Broker配置檔案中的haMasterAddress屬性並更新 HAClient 的 masterAddrees,如果角色為 SLAVE 並且 haMasterAddress 為空,啟動並不會報錯,但不會執行主從同步複製,該方法最終返回是否成功連線上Master。
  Step2:判斷是否需要向Master反饋當前待拉取偏移量,Master與Slave的 HA 心跳傳送間隔預設為5S。


  Step3:向 Master 伺服器反饋拉取偏移量 。這裡有兩重意義,對於 Slave 端來說,是傳送下次待拉取訊息偏移量,而對於 Master 服務端來說,既可以認為是 Slave 本次請求拉取的訊息偏移量,也可以理解為 Slave 的訊息同步 ACK 確認訊息。
  Step4:進行事件選擇,其執行間隔為 1s 。
  Step5:處理網路讀請求,即處理從 Master 伺服器傳回的訊息資料。

5 ) HAConnection:HA Master 服務端 HA 連線物件的封裝,與 Broker 從伺服器的網路讀寫實現類 。Master 伺服器在收到從伺服器的連線請求後,會將主從伺服器的連線 SocketChannel 封裝成 HAConnection 物件,實現主伺服器與從伺服器的讀寫操作 。

6 ) HAConnection$ReadSocketService:HA Master 網路讀實現類 。
7 ) HAConnection$WriteSocketServicce:HA Master 網路寫實現類 。

二、RocketMQ 讀寫分離機制

  RocketMQ 根據 MessageQueue查詢 Broker 地址的唯一依據是 brokerName,從 RocketMQ 的 Broker 組織結構中得知同一組 Broker ( M-S )伺服器,它們的 brokerName 相同但 brokerId 不同,主伺服器的 brokerId 為 0,從伺服器的 brokerId 大於 0。
  訊息消費拉取執行緒PullMessageService根據PullRequest請求從主伺服器拉取訊息後會返回下一次建議拉取的brokerId,訊息消費者執行緒在收到訊息後,會根據主伺服器的建議拉取brokerId來更新pullFromWhichNodeTable,pullFromWhichNodeTable快取表中儲存該訊息佇列的brokerId。

訊息服務端是根據何種規則來建議哪個訊息消費佇列該從哪臺 Broker 伺服器上拉取訊息呢?

1 ) maxOffsetPy:代表當前主伺服器訊息儲存檔案最大偏移量。
2 ) maxPhyOffsetPulling:此次拉取訊息最大偏移量 。
3 ) diff:對於 PullMessageService 執行緒來說,當前未被拉取到訊息消費端的訊息長度。
4 ) TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ 所在伺服器總記憶體大小 。accessMessagelnMemoryMaxRatio 表示 RocketMQ 所能使用的最大記憶體比例,超過該記憶體,訊息將被置換出記憶體;memory 表示 RocketMQ 訊息常駐記憶體的大小,超過該大小,RocketMQ會將舊的訊息置換回磁碟。
5 )如果 diff 大於 memory,表示當前需要拉取的訊息已經超出了常駐記憶體的大小,表示主伺服器繁忙,此時才建議從從伺服器拉取。
  如果主伺服器繁忙則建議下一次從從伺服器拉取訊息,如果一個 Master 擁有多臺Slave伺服器,參與訊息拉取負載的從伺服器只會是其中一個。

三、總結

1)RocketMQ 的 HA 機制,其核心實現是從伺服器在啟動的時候主動向主伺服器建立 TCP長連線,然後獲取伺服器的 commitlog 最大偏移量,以此偏移量向主伺服器主動拉取訊息,主伺服器根據偏移量,與自身 commitlog 檔案的最大偏移量進行比較,如果大於從伺服器的 commitlog 偏移量,主伺服器將向從伺服器返回一定數量的訊息,該過程迴圈進行,達到主從伺服器資料同步。

2)RocketMQ 讀寫分離與其他中介軟體的實現方式完全不同,RocketMQ 是消費者首先向主伺服器發起拉取訊息請求,然後主伺服器返回一批訊息,然後會根據主伺服器負載壓力與主從同步情況,向訊息消費者建議下次訊息拉取是從主伺服器還是從從伺服器拉取。