1. 程式人生 > >Rocketmq 4.3.2訊息傳送邏輯--------sendDefaultImpl方法研究

Rocketmq 4.3.2訊息傳送邏輯--------sendDefaultImpl方法研究

訊息傳送邏輯

makeSureStateOK():判斷服務(serviceState)是否可用,不可用就直接退出
checkMessage: 判斷訊息是否符合要求:是否為空,topic(還要判斷topic是否符合命名規則),body是否為空,訊息長度是否為0或者大於預設訊息長度。
在這裡插入圖片描述
beginTimestampFirst記錄當前時間戳,表明timeout從此刻開始計時。
tryToFindTopicPublishInfo:獲取topic路由資訊
在這裡插入圖片描述
進入tryToFindTopicPublishInfo()方法:
如果快取中topic不存在或者沒有可用的topic,那就去nameserver中去找topic。

在這裡插入圖片描述
進入updateTopicRouteInfoFromNameServer,嘗試鎖住nameserver,若未獲得鎖,等待3000毫秒後依舊沒有獲得鎖就返回false,獲得鎖後,就開始在nameserver上找topic。
在這裡插入圖片描述
如果滿足傳進來的引數isDefault為true以及defaultMQProducer不為空,那麼就用rocketmq預設的一個topic: TBW102的TopicRouteData變數來構造TopicPublishInfo物件
在這裡插入圖片描述
如果不滿足的話就根據傳進來的topic引數到nameserver裡去查這個topic的資訊。
進入getTopicRouteInfoFromNameServer方法中:設定請求頭中的Topic引數後,傳送獲取Topic路由資訊的request請求給NameServer (invokeSync???對rocketmq的rpc通訊框架還沒了解,這裡就不深入了)假如獲取Topic存在,則成功返回,利使用TopicRouteData進行解碼,且直接返回
在這裡插入圖片描述

返回的TopicRouteData不為空的話,進入topicRouteDataIsChange方法,判斷新獲得的topicRouteData與本地的老的topicRouteData是否發生變化。
在這裡插入圖片描述
如果變化則克隆,在brokerAddrTable中放置新的topicRouteData裡記載的brokername和對應的brokeraddr。
在這裡插入圖片描述
將TopicRouteData轉換至TopicPublishInfo路由資訊以及TopicSubscribeInfo路由資訊
在這裡插入圖片描述

在這裡插入圖片描述
(此圖來源部落格:https://www.songma.com/news/txtlist_i4045v.html)
在這裡插入圖片描述
進入selectOneMessageQueue()方法:


情況一:傳送訊息推遲容錯開關開啟
重點:重試時,優先選擇的broker是一個可用的而且等於lastBrokerName的broker
在這裡插入圖片描述
進入getSendWhichQueue().getAndIncrement()方法,可能看到index的產生機制:
在這裡插入圖片描述
所以整個過程大致如下:選擇一個queue來發訊息,如果之前沒有傳送過訊息,隨機選擇一個,如果發過訊息,選擇上一次傳送記錄的queue的下一個queue。判斷所選擇的的mq的broker,如果可用isAvailable,則看跟lastBrokerName(訊息重發時就會有lastBrokerName的記錄)是否一致或者lastBrokerName是否為null,如果滿足條件,就選擇這個mq,如果這個queue不符合條件,就再判斷下下一個queue,直到遍歷該topic的所有queue。
如果找不到符合上面要求的broker,那麼就那麼就放棄broker==lastBrokerName這個條件, 選擇一個相對較好的broker來發送pickOneAtLeast()
在這裡插入圖片描述
在這裡插入圖片描述
找到可以用來發送的mq後,呼叫sendKernelImpl開始傳送訊息。
在這裡插入圖片描述
非同步和ONEWAY模式直接返回,同步的話還得判斷訊息是否傳送成功,傳送失敗後得重新發送。
在這裡插入圖片描述

傳送重試

這裡為了更好的說明訊息傳送重試的機制,我們先來看一下MQFaultStrategy這個類。
我們需要來了解下LatencyFaultToleranceImpl這個類,以及它的一些相關方法。
在這裡插入圖片描述
在這裡插入圖片描述LatencyFaultToleranceImpl
首先我們可以看到LatencyFaultToleranceImpl接口裡有一個叫faultItemTable的hashmap資料結構,儲存著broker和它對應的一些資訊,我們先進入FaultItem看看都是哪些資訊。
在這裡插入圖片描述
結合函式updateFaultItem,我們可以瞭解到FaultItem的三個屬性,分別是brokername,延遲容忍時長,以及broker不可用時長結束的時刻。
在這裡插入圖片描述
在這裡插入圖片描述
即可知faultItemTable這個hashmap中,如果元素FaultItem不為空,則表明記錄過延遲容錯的相關資訊,為null則表明該broker還沒有出錯,即可用。這也是判斷broker是否可用的方法isAvailable
在這裡插入圖片描述
當FaultItem != null時,還需要判斷不可用時長是否已經過了,即當前時刻是否大於startTimestamp,如果滿足條件,也可認為該broker是可用的。
在這裡插入圖片描述
接下來我們來看一下發送重試時找不是最好的broker的那個方法pickOneAtLeast。
在這裡插入圖片描述
可以看出,pickOneAtLeast()的實現機制是:將faultItemTable裡記錄的所有broker打亂隨機排序後再按一定規則進行排序,每次隨機找broker的話是在排序後的broker list 的前一半broker中找。
排序規則如何呢,我們可以看到FaultItem實現Comparable介面,Comparable介面中有一個int compareTo(T o)方法,呼叫這個方法的物件將會與引數o進行比較,小於o、等於o和大於o分別對應的返回值為負數、0和正數。
我們看一下FaultItem裡的compareTo(final FaultItem other)方法:
在這裡插入圖片描述
可以看出排序規則如下:可用的broker排在不可用的broker前面,延遲容忍時間長的排在延遲容忍時間短的broker前面,不可用時長短的排在不可用時長長的broker前面。