1. 程式人生 > >RocketMQ 主從同步若干問題答疑

RocketMQ 主從同步若干問題答疑

目錄

  • 1、初識主從同步
  • 2、提出問題
  • 3、原理探究
    • 3.1 RocketMQ主從讀寫分離機制
    • 3.2 訊息消費進度同步機制
  • 4、總結

溫馨提示:建議參考程式碼RocketMQ4.4版本,4.5版本引入了多副本機制,實現了主從自動切換,本文並不關心主從切換功能。

@(本節目錄)

1、初識主從同步

主從同步基本實現過程如下圖所示:

RocketMQ 的主從同步機制如下:
A. 首先啟動Master並在指定埠監聽;
B. 客戶端啟動,主動連線Master,建立TCP連線;
C. 客戶端以每隔5s的間隔時間向服務端拉取訊息,如果是第一次拉取的話,先獲取本地commitlog檔案中最大的偏移量,以該偏移量向服務端拉取訊息;
D. 服務端解析請求,並返回一批資料給客戶端;
E. 客戶端收到一批訊息後,將訊息寫入本地commitlog檔案中,然後向Master彙報拉取進度,並更新下一次待拉取偏移量;
F. 然後重複第3步;

RocketMQ主從同步一個重要的特徵:主從同步不具備主從切換功能,即當主節點宕機後,從不會接管訊息傳送,但可以提供訊息讀取。

溫馨提示:本文並不會詳細分析RocketMQ主從同步的實現細節,如大家對其感興趣,可以查閱筆者所著的《RocketMQ技術內幕》或檢視筆者博文:https://blog.csdn.net/prestigeding/article/details/79600792

2、提出問題

  • 主,從伺服器都在執行過程中,訊息消費者是從主拉取訊息還是從從拉取?
  • RocketMQ主從同步架構中,如果主伺服器宕機,從伺服器會接管訊息消費,此時訊息消費進度如何保持,當主伺服器恢復後,訊息消費者是從主拉取訊息還是從從伺服器拉取,主從伺服器之間的訊息消費進度如何同步?

接下來帶著上述問題,一起來探究其實現原理。

3、原理探究

3.1 RocketMQ主從讀寫分離機制

RocketMQ的主從同步,在預設情況下RocketMQ會優先選擇從主伺服器進行拉取訊息,並不是通常意義的上的讀寫分離,那什麼時候會從拉取呢?

溫馨提示:本節同樣不會詳細整個流程,只會點出其關鍵點,如果想詳細瞭解訊息拉取、訊息消費等核心流程,建議大家查閱筆者所著的《RocketMQ技術內幕》。

在RocketMQ中判斷是從主拉取,還是從從拉取的核心程式碼如下:
DefaultMessageStore#getMessage

long diff = maxOffsetPy - maxPhyOffsetPulling;  // @1
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));  // @2
getResult.setSuggestPullingFromSlave(diff > memory);   // @3

程式碼@1:首先介紹一下幾個區域性變數的含義:

  • maxOffsetPy
    當前最大的物理偏移量。返回的偏移量為已存入到作業系統的PageCache中的內容。
  • maxPhyOffsetPulling
    本次訊息拉取最大物理偏移量,按照訊息順序拉取的基本原則,可以基本預測下次開始拉取的物理偏移量將大於該值,並且就在其附近。
  • diff
    maxOffsetPy與maxPhyOffsetPulling之間的間隔,getMessage通常用於訊息消費時,即這個間隔可以理解為目前未處理的訊息總大小。

程式碼@2:獲取RocketMQ訊息儲存在PageCache中的總大小,如果當RocketMQ容量超過該闊值,將會將被置換出記憶體,如果要訪問不在PageCache中的訊息,則需要從磁碟讀取。

  • StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    返回當前系統的總實體記憶體。引數
  • accessMessageInMemoryMaxRatio
    設定訊息儲存在記憶體中的閥值,預設為40。
    結合程式碼@2這兩個引數的含義,算出RocketMQ訊息能對映到記憶體中最大值為40% * (機器實體記憶體)。

程式碼@3:設定下次拉起是否從從拉取標記,觸發下次從從伺服器拉取的條件為:當前所有可用訊息資料(所有commitlog)檔案的大小已經超過了其闊值,預設為實體記憶體的40%。

那GetResult的suggestPullingFromSlave屬性在哪裡使用呢?

PullMessageProcessor#processRequest

if (getMessageResult.isSuggestPullingFromSlave()) {      // @1
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
       responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {      // @2
       case ASYNC_MASTER:
       case SYNC_MASTER:
               break;
       case SLAVE:
               if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
               }
              break;
 } 
 
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // @3
            // consume too slow ,redirect to another machine
            if (getMessageResult.isSuggestPullingFromSlave()) {
                 responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
           // consume ok
           else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
           }
     } else {
           responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
     }

程式碼@1:如果從commitlog檔案查詢訊息時,發現訊息堆積太多,預設超過實體記憶體的40%後,會建議從從伺服器讀取。

程式碼@2:如果當前伺服器的角色為從伺服器:並且slaveReadEnable=true,則忽略程式碼@1設定的值,下次拉取切換為從主拉取。

程式碼@3:如果slaveReadEnable=true(從允許讀),並且建議從從伺服器讀取,則從訊息消費組建議當訊息消費緩慢時建議的拉取brokerId,由訂閱組配置屬性whichBrokerWhenConsumeSlowly決定;如果訊息消費速度正常,則使用訂閱組建議的brokerId拉取訊息進行消費,預設為主伺服器。如果不允許從可讀,則固定使用從主拉取。

溫馨提示:請注意broker服務引數slaveReadEnable,與訂閱組配置資訊:whichBrokerWhenConsumeSlowly、brokerId的值,在生產環境中,可以通過updateSubGroup命令動態改變訂閱組的配置資訊。

如果訂閱組的配置保持預設值的話,拉取訊息請求傳送到從伺服器後,下一次訊息拉取,無論是否開啟slaveReadEnable,下一次拉取,還是會發往主伺服器。

上面的步驟,在訊息拉取命令的返回欄位中,會將下次建議拉取Broker返回給客戶端,根據其值從指定的broker拉取。

訊息拉取實現PullAPIWrapper在處理拉取結果時會將服務端建議的brokerId更新到broker拉取快取表中。

在發起拉取請求之前,首先根據如下程式碼,選擇待拉取訊息的Broker。

3.2 訊息消費進度同步機制

從上面內容可知,主從同步引入的主要目的就是訊息堆積的內容預設超過實體記憶體的40%,則訊息讀取則由從伺服器來接管,實現訊息的讀寫分離,避免主服務IO抖動嚴重。那問題來了,主伺服器宕機後,從伺服器接管訊息消費後,那訊息消費進度儲存在哪裡?當主伺服器恢復正常後,訊息是從主伺服器拉取還是從從伺服器拉取?主伺服器如何得知最新的訊息消費進度呢?

RocketMQ訊息消費進度管理(叢集模式):
叢集模式下訊息消費進度儲存檔案位於服務端${ROCKETMQ_HOME}/store/config/consumerOffset.json。訊息消費者從伺服器拉取一批訊息後提交到消費組特定的執行緒池中處理訊息,當訊息消費成功後會向Broker傳送ACK訊息,告知消費端已成功消費到哪條訊息,Broker收到訊息消費進度反饋後,首先儲存在記憶體中,然後定時持久化到consumeOffset.json檔案中。備註:關於訊息消費進度管理更多的實現細節,建議查閱筆者所著的《RocketMQ技術內幕》。

我們先看一下客戶端向服務端反饋訊息消費進度時如何選擇Broker。
因為主服務的brokerId為0,預設情況下當主伺服器存活的時候,優先會選擇主伺服器,只有當主伺服器宕機的情況下,才會選擇從伺服器。

既然叢集模式下訊息消費進度儲存在Broker端,當主伺服器正常時,訊息消費進度檔案儲存在主伺服器,那提出如下兩個問題:
1)訊息消費端在主伺服器存活的情況下,會優先向主伺服器反饋訊息消費進度,那從伺服器是如何同步訊息消費進度的。
2)當主伺服器宕機後則訊息消費端會向從伺服器反饋訊息消費進度,此時訊息消費進度如何儲存,當主伺服器恢復正常後,主伺服器如何得知最新的訊息消費進度。

為了解開上述兩個疑問,我們優先來看一下Broker伺服器在收到提交訊息消費進度反饋命令後的處理邏輯:

客戶端定時向Broker端傳送更新訊息消費進度的請求,其入口為:RemoteBrokerOffsetStore#updateConsumeOffsetToBroker,該方法中一個非常關鍵的點是:選擇broker的邏輯,如下所示:

如果主伺服器存活,則選擇主伺服器,如果主伺服器宕機,則選擇從伺服器。也就是說,不管訊息是從主伺服器拉取的還是從從伺服器拉取的,提交訊息消費進度請求,優先選擇主伺服器。服務端就是接收其偏移量,更新到服務端的記憶體中,然後定時持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。

經過上面的分析,我們來討論一下這個場景:
訊息消費者首先從主伺服器拉取訊息,並向其提交訊息消費進度,如果當主伺服器宕機後,從伺服器會接管訊息拉取服務,此時訊息消費進度儲存在從伺服器,主從伺服器的訊息消費進度會出現不一致?那當主伺服器恢復正常後,兩者之間的訊息消費進度如何同步?

3.2.1 從服務定時同步主伺服器進度


如果Broker角色為從伺服器,會通過定時任務呼叫syncAll,從主伺服器定時同步topic路由資訊、訊息消費進度、延遲佇列處理進度、消費組訂閱資訊。

那問題來了,如果主伺服器啟動後,從伺服器馬上從主伺服器同步訊息訊息進度,那豈不是又要重新消費?

其實在絕大部分情況下,就算從服務從主伺服器同步了很久之前的消費進度,只要訊息者沒有重新啟動,就不需要重新消費,在這種情況下,RocketMQ提供了兩種機制來確保不丟失訊息消費進度。

第一種,訊息消費者在記憶體中存在最新的訊息消費進度,繼續以該進度去伺服器拉取訊息後,訊息處理完後,會定時向Broker伺服器反饋訊息消費進度,在上面也提到過,在反饋訊息消費進度時,會優先選擇主伺服器,此時主伺服器的訊息消費進度就立馬更新了,從伺服器此時只需定時同步主伺服器的訊息消費進度即可。

第二種是,訊息消費者在向主伺服器拉取訊息時,如果是是主伺服器,在處理訊息拉取時,也會更新訊息消費進度。

3.2.2 主伺服器訊息拉取時更新訊息消費進度

主伺服器在處理訊息拉取命令時,會觸發訊息消費進度的更新,其程式碼入口為:PullMessageProcessor#processRequest

boolean storeOffsetEnable = brokerAllowSuspend;  // @1
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; 
storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;  // @2
if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
 }

程式碼@1:首先介紹幾個區域性變數的含義:

  • brokerAllowSuspend:broker是否允許掛起,在訊息拉取時,該值預設為true。
  • hasCommitOffsetFlag:訊息消費者在記憶體中是否快取了訊息消費進度,如果快取了,該標記設定為true。
    如果Broker的角色為主伺服器,並且上面兩個變數都為true,則首先使用commitOffset更新訊息消費進度。

看到這裡,主從同步訊息消費進度的相關問題,應該就有了答案了。

4、總結

上述實現原理的講解有點枯燥無味,我們先來回答如下幾個問題:

1、主,從伺服器都在執行過程中,訊息消費者是從主拉取訊息還是從從拉取?
答:預設情況下,RocketMQ訊息消費者從主伺服器拉取,當主伺服器積壓的訊息超過了實體記憶體的40%,則建議從從伺服器拉取。但如果slaveReadEnable為false,表示從伺服器不可讀,從伺服器也不會接管訊息拉取。

2、當訊息消費者向從伺服器拉取訊息後,會一直從從伺服器拉取?
答:不是的。分如下情況:
1)如果從伺服器的slaveReadEnable設定為false,則下次拉取,從主伺服器拉取。
2)如果從伺服器允許讀取並且從伺服器積壓的訊息未超過其實體記憶體的40%,下次拉取使用的Broker為訂閱組的brokerId指定的Broker伺服器,該值預設為0,代表主伺服器。
3)如果從伺服器允許讀取並且從伺服器積壓的訊息超過了其實體記憶體的40%,下次拉取使用的Broker為訂閱組的whichBrokerWhenConsumeSlowly指定的Broker伺服器,該值預設為1,代表從伺服器。

3、主從服務訊息消費進是如何同步的?
答:訊息消費進度的同步時單向的,從伺服器開啟一個定時任務,定時從主伺服器同步訊息消費進度;無論訊息消費者是從主伺服器拉的訊息還是從從伺服器拉取的訊息,在向Broker反饋訊息消費進度時,優先向主伺服器彙報;訊息消費者向主伺服器拉取訊息時,如果訊息消費者記憶體中存在訊息消費進度時,主會嘗試跟新訊息消費進度。

讀寫分離的正確使用姿勢:
1、主從Broker伺服器的slaveReadEnable設定為true。
2、通過updateSubGroup命令更新訊息組whichBrokerWhenConsumeSlowly、brokerId,特別是其brokerId不要設定為0,不然從從伺服器拉取一次後,下一次拉取就會從主去拉取。


作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社群佈道師,公眾號:中介軟體興趣圈 維護者,目前已陸續發表原始碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等原始碼專欄。

相關推薦

RocketMQ 主從同步若干問題答疑

目錄 1、初識主從同步 2、提出問題 3、原理探究 3.1 RocketMQ主從讀寫分離機制 3.2 訊息消費進度同步機制 4、總結

RocketMQ 主從同步機制

主從同步(HA 高可用)   主從同步原理:   為了保證系統的高可用,訊息到達主伺服器後,需要將訊息同步到從伺服器。如果主伺服器宕機,消費者可用從從伺服器拉取訊息。   大體步驟:     1、主伺服器啟動,監聽從伺服器的連結。     2、從伺服器主動連結主伺服器,建立TCP相關連結

05: 實時增量備份 、 XtraBackup 備份 、 總結和答疑 、 MySQL 主從同步

lte per xtra 使用 pos posit 完全備份 一次 信息 day05 增量備份一、啟用binlog日誌 實現 實時增量備份二、使用第3方軟件提供的命令做增量備份 +++++++++++++++++++++++++++++++++一、啟用binlog日誌 實

配置DNS的正反向解析與主從同步

dns配置DNS的正反向解析與主從同步準備:本實驗基於兩臺centos6.5其內核版本號為2.6.32-431.el6.x86_64配置時間同步# echo "#update system date by jiajie at 20170506" >>/var/spool/cron/root #

mysql主從同步配置

這就是 sla star 失敗 有效 bin roo orm 相同 文件同步 rsync同步http://www.cnblogs.com/itech/archive/2009/08/10/1542945.html 在當前的生產工作中,大多數應用的mysql主從同步都是異步的

MySQL主從同步是怎樣實現的?

基本原理從庫生成兩個線程,一個I/O線程,一個SQL線程; i/o線程去請求主庫 的binlog,並將得到的binlog日誌寫到relay log(中繼日誌) 文件中; 主庫會生成一個 log dump 線程,用來給從庫 i/o線程傳binlog; SQL 線程,會讀取relay log文件中的日誌,

主從同步出現一下錯誤:Slave_IO_Running: Connecting

主從同步出現一下錯誤:slave_io_running: connecting主從同步出現一下錯誤: Slave_IO_Running: Connecting Slave_SQL_Running: Yes 解決方法: 導致lave_IO_Running 為connecting 的原因主要有

MySQL主從同步報錯故障處理記錄

記錄 error start record master 前言在發生故障切換後,經常遇到的問題就是同步報錯,下面是最近收集的報錯信息。記錄刪除失敗在master上刪除一條記錄,而slave上找不到Last_SQL_Error: Could not execute Delete_rows e

mysql主從同步延遲原因及解決方法

解決方案 數據庫 master 朋友 mysql MySQL主從延遲原因以及解決方案:談到MySQL數據庫主從同步延遲原理,得從mysql的數據庫主從復制原理說起,mysql的主從復制都是單線程的操作(mysql5.6版本之前),主庫對所有DDL和DML產生binlog,binlog是順序

監控mysql主從同步狀態是否異常,如果異常,則發生短信或郵寄給管理員

監控mysql主從同步狀態是否異常階段1:開發一個守護進程腳本每30秒實現檢測一次。階段2:如果同步出現如下錯誤號(1158,1159,1008,1007,1062),請跳過錯誤階段3:請使用數組技術實現上述腳本(獲取主從判斷及錯誤號部分)[[email protected]/* */ ~]# m

Centos下高可用主從同步DNS服務部署

高可用 dns 一、背景介紹 在日常工作中,為解決內網域名解析問題,時長會配置DNS服務來提供解析。這時DNS服務就起到了為所有內部服務提供連通的基礎,變得非常重要了。所以在服務啟動後還是應該考慮服務的高可用和數據的完整性。 網友有很多LVS+Keepalived+Bind的負載均衡高可用

MySQL主從同步

bsp active 是否 rec currency sam query stack ace 1.MDB [client]port = 3306socket = /usr/local/mysql/mysql.sock [mysqld]

mysql主從同步監控腳本

mysql主從復制監控腳本 linux mysql shell mysql主從同步監控腳本,利用mysql從庫中的IO和SQL進程以及延遲時間來監控主從同步是否正常,詳細shell腳本如下:#!/bin/bash #author wangning #date 2017-7-17 #qq 119

【故障】MySQL主從同步故障-Slave_SQL_Running: No

ack counter stop usr mysql-bin back data 連接 xid 轉自:http://www.linuxidc.com/Linux/2014-02/96945.htm 故障現象:進入slave服務器,運行:mysql> show slav

ubuntu16配置mysql5.7主從同步

mysqld 設置 update 三臺 host 測試 start mysql sha 測試環境如下:   master: 10.0.0.26   slave01: 10.0.0.27   slave02: 10.0.0.28 一、三臺機均安裝mysql-server5

MySQL 主從同步中斷常見問題

event config update ren ons lec myisam cor rac Error_code: 1032 【現象】   Last_Error: Could not execute Update_rows event on table kebao.t1;

redis配置文件詳解及實現主從同步切換

redis redis主從 redis配置文件詳解及實現主從同步切換redis復制Redis復制很簡單易用,它通過配置允許slave Redis Servers或者Master Servers的復制品。接下來有幾個關於redis復制的非常重要特性:一個Master可以有多個Slaves。Slaves能

mysql 主從同步實驗細解

mysql master slavemysql 主從同步實驗細解一.實驗環境實驗環境 192.168.9.108 為master 192.168.9.109 為slave數據庫版本:version 5.1.73安裝方式:采用的yum 安裝 源為163的源系

劍指架構師系列-MySQL的安裝及主從同步

use 回車 ant arc 使用 ttl stat 解鎖 reload 1、安裝數據庫 wget http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm rpm -ivh mysql-com

+++++++DNS基本工作原理、DNS正反向解析及主從同步

博客 運維 linux dns基礎工作原理bind Berkerley Information Name DomainDNS Domain Name ServerTCP/UDP 53UDP 53 無連接協議,域名解析TCP 53 面向連接協議,區域傳送歷史IANA統一名字,自己hosts中維護(%