實驗(一)
實驗現象
準備
建立的topicA、topicB,每個都是16個訊息佇列;
建立了同一個消費者組:ntm-hxy-group
預設訊息模式:負載均衡
消費者一(clientId1)
先啟動一個消費者一:clientId:192.168.31.182@10962#386194072446460
public static void main(String[] args) throws Exception { String nameAddress = "192.168.11.180:9876;192.168.11.181:9876;192.168.11.182:9876"; DefaultMQPushConsumer concurrentlyPushConsumer2= createConcurrentlyPushConsumer("ntm-hxy-group", nameAddress, "topicA", "topicB"); }
這個clientId1 訂閱了topicA、topicB,屬於組ntm-hxy-group
控制檯介面:這裡代表訂閱的clientId的數量
上圖訂閱關係:client1訂閱了兩個topic,自動添加了一個重試的topic
該消費者組ntm-hxy-group消費佇列的情況如下:
這是重試topic,只有一個訊息佇列,由clientId1消費
topicA所有16個佇列,都由clientId消費
topicB 所有16個訊息佇列都由clientId1 消費
由此可見:兩個topic都被這一個clientId1佔用了,還有一個重試topic也是被該clinetId1佔用
再啟動一個消費者(clientId2)
在啟動一個clientId2,同一個group,只訂閱topicA。clientId:192.168.31.182@11084#386532403737756
控制檯介面:
同一個消費者組group,clientId變為了兩個
topic只有topicA了,也就是最新訂閱資訊,但是clientId還有兩個。
此時,該消費者組ntm-hxy-group消費佇列的情況如下:
如上圖,重試訊息佇列消費物件沒有變,因為重試topic是服務端(broker)基於group產生的,所以一直和第一個啟動的clientId繫結的,也就是重試佇列會一直由第一個clientId消費,不會由於心跳引起訂閱的改變而改變。
也就是每個clientId可以消費topicA一半的訊息,但是此時topicB丟失了,也就是傳送給topicB的訊息會被堆積,不能被消費了。
下次clientId1傳送心跳時,topicB又訂閱上了,此時的情況如下:
這個重試group跟心跳無關,還是clientId1消費沒變。
原因:雖然client1傳送心跳時,訂閱資訊重新整理了,topicA和topicB都訂閱了,但是消費佇列繫結關係是根據group來進行替換的,因為負載均衡時關鍵兩點:
findConsumerIdList 方法接受兩個引數:Topic 主題和 ConsumerGroup 消費組
拿mqSet、cidAll進行預設的平均負載均衡策略進行負載均衡時,邏輯如下:
這個方法預設負載均衡策略具體實現如下:
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }
TRANSLATE with x English TRANSLATE with EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back 此頁面的語言為中文(簡體) 翻譯為
- 中文(簡體)
- 中文(繁體)
- 丹麥語
- 烏克蘭語
- 烏爾都語
- 亞美尼亞語
- 俄語
- 保加利亞語
- 克羅埃西亞語
- 冰島語
- 加泰羅尼亞語
- 匈牙利語
- 卡納達語
- 印地語
- 印尼語
- 古吉拉特語
- 哈薩克語
- 土耳其語
- 威爾士語
- 孟加拉語
- 尼泊爾語
- 布林語(南非荷蘭語)
- 希伯來語
- 希臘語
- 庫爾德語
- 德語
- 義大利語
- 拉脫維亞語
- 挪威語
- 捷克語
- 斯洛伐克語
- 斯洛維尼亞語
- 旁遮普語
- 日語
- 普什圖語
- 毛利語
- 法語
- 波蘭語
- 波斯語
- 泰盧固語
- 泰米爾語
- 泰語
- 海地克里奧爾語
- 愛沙尼亞語
- 瑞典語
- 立陶宛語
- 緬甸語
- 羅馬尼亞語
- 寮國語
- 芬蘭語
- 英語
- 荷蘭語
- 薩摩亞語
- 葡萄牙語
- 西班牙語
- 越南語
- 亞塞拜然語
- 阿姆哈拉語
- 阿爾巴尼亞語
- 阿拉伯語
- 韓語
- 馬爾加什語
- 馬拉地語
- 馬拉雅拉姆語
- 馬來語
- 馬耳他語
- 高棉語