RocketMQ消費者設定了instanceName屬性後訊息竟不翼而飛
背景
RocketMQ使用過程中為了快速搭建消費服務,於是在同一個機器叢集消費的方式起了多個消費者例項,結果發現部分訊息沒被消費到!本文是對問題產生原因的跟蹤和分析,下面會將專案中遇到的問題簡化成官方demo來說明。
問題重現
生產者程式碼
Producer.java
/* * Instantiate with a producer group name. * 預設分配4個訊息佇列 */ DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); /* * Launch the instance. */ producer.start(); for (int i = 0; i < 10; i++) { try { /* * Create a message instance, specifying topic, tag and message body. */ Message msg= new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); }catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } }
啟動一個producer例項傳送10條訊息到4個訊息佇列。
訊息傳送情況:
訊息傳送結果: queueId=2,訊息內容: Hello RocketMQ 0 訊息傳送結果: queueId=3,訊息內容: Hello RocketMQ 1 訊息傳送結果: queueId=0,訊息內容: Hello RocketMQ 2 訊息傳送結果: queueId=1,訊息內容: Hello RocketMQ 3 訊息傳送結果: queueId=2,訊息內容: Hello RocketMQ 4 訊息傳送結果: queueId=3,訊息內容: Hello RocketMQ 5 訊息傳送結果: queueId=0,訊息內容: Hello RocketMQ 6 訊息傳送結果: queueId=1,訊息內容: Hello RocketMQ 7 訊息傳送結果: queueId=2,訊息內容: Hello RocketMQ 8 訊息傳送結果: queueId=3,訊息內容: Hello RocketMQ 9
從傳送結果可以看出訊息傳送的佇列分配情況如下所示:
消費者程式碼
Consumer.java
/* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("localhost:9876"); //自定義instanceName consumer.setInstanceName("XUJIAN_MACBOOK"); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); System.out.printf("msgBody: %s %n",new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start();
本地啟動兩個消費者例項,即consumer啟動兩次,設定為叢集消費模式,且兩個消費者例項屬於同一個消費者組。
紊亂的消費結果
consumer1
ConsumeMessageThread_11 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 2 ConsumeMessageThread_14 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 6 ConsumeMessageThread_15 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 3 ConsumeMessageThread_16 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 7
consumer2
ConsumeMessageThread_6 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 2 ConsumeMessageThread_7 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 6 ConsumeMessageThread_8 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 3 ConsumeMessageThread_9 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 7
從消費結果可以看出訊息消費的佇列分配情況如下所示:
兩個消費者消費了相同佇列的相同訊息,且部分訊息沒被消費到。這和預期的叢集消費模式下消費者組內的消費者均分訊息佇列不符!
原因分析
當發現消費者消費異常時,首先應該排查消費負載均衡是否正常。
消費負載均衡
叢集消費的時候會根據統一消費者組內消費者的數量、佇列數量以及不同的策略來為每個消費者分配要消費的訊息。
消費者的預設佇列分配策略是“均分”,原始碼如下:
/** * Constructor specifying consumer group. * * @param consumerGroup Consumer group. */ public DefaultMQPushConsumer(final String consumerGroup) { this(null, consumerGroup, null, new AllocateMessageQueueAveragely()); }
其中AllocateMessageQueueAveragely
就是平均分配策略,其他的還有隨機等,均實現了AllocateMessageQueueStrategy
介面。
RebalanceImpl.java
該類就是訊息消費均衡類。
相關核心原始碼如下:
public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { //根據topic進行reblance this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); } private void rebalanceByTopic(final String topic, final boolean isOrder) { ... //獲取分配的結果 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); ... }
又回到AllocateMessageQueueAveragely.java
,上文提到這個類的策略是均分,那就來看看他是怎麼做的。原始碼如下:AllocateMessageQueueAveragely.java
public List<MessageQueue> allocate(String consumerGroup/*消費者組*/, String currentCID/*clientId*/, List<MessageQueue> mqAll/*訊息佇列集合*/, List<String> cidAll/*消費者組裡面的所有消費者的clientId*/) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); //如果消費者組裡的消費者不包含當前這個消費者,直接返回 if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } //當前消費者在消費者集合裡面的位置 int index = cidAll.indexOf(currentCID); //佇列數對消費者數取模 int mod = mqAll.size() % cidAll.size(); //求當前消費者應該消費幾個佇列 int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); //求當前消費者應該從哪個佇列開始消費訊息 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); //將當前消費者應該消費的佇列一個一個放進返回結果列表 for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }
可以發現消費者消費哪些佇列是由
clientId
決定的。
所以當兩個消費者的clientId一樣時,呼叫indexOf
方法返回的是一樣的結果,所以他們消費的佇列是一樣的。如上面的例子,總共有4個佇列,2個消費者,所以兩個消費者只消費了同樣的兩個佇列:queueId=0、queueId=1
。
clientId怎麼生成
上面說了消費佇列負載均衡的結果和clientId
有關,那clientId
是怎麼生成的?
構建clientId
的原始碼如下:
/** * clientId格式:ip+@+instanceName[+@unitName],通常你會看到形如127.0.0.1@32531這樣的clientId * @return */ public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
clientId用來唯一標識一個
MQClientInstance
。
可見clientId
是根據instanceName
屬性、ip
、unitName
(可選)生成的。
為什麼會生成相同的clientId
根據上面clientId
的生成規則,兩個消費者都在本地啟動,意味著有相同的ip
,unitName
沒有設定。
正巧兩個消費者設定了相同的instanceName
,那生成的clientId
必然相同!,這就是問題的關鍵所在。
解決方案
經過上面分析知道了是clientId
相同是問題所在,那解決方案就是讓兩個消費者的clientId
不相同。
根據
那最簡單的解決方案有如下三種:
方案一:不設定instanceName屬性
因為叢集模式下instanceName
預設值為程序id,原始碼如下:
/** * 如果是叢集消費模式,如果instanceName是預設值(即沒有自定義該屬性)則通過程序id來替換該屬性 */ public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); } }
兩個消費者的程序id肯定是不同的。
方案二:兩個消費者設定不同的instanceName屬性
這個很容易能想到,不必多說。
方案三:兩個消費者在不同的機器上啟動
在不同機器上啟動意味著ip
是不一樣的,也可以使生成的clientId
不同。正常的消費結果
通過上述解決方案,最終得到了正確的消費結果。
consumer1:
ConsumeMessageThread_16 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 2 ConsumeMessageThread_17 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 3 ConsumeMessageThread_18 接收到新訊息: queueId=0,訊息內容: Hello RocketMQ 6 ConsumeMessageThread_19 接收到新訊息: queueId=1,訊息內容: Hello RocketMQ 7
consumer2:
ConsumeMessageThread_6 接收到新訊息: queueId=2,訊息內容: Hello RocketMQ 0 ConsumeMessageThread_7 接收到新訊息: queueId=3,訊息內容: Hello RocketMQ 1 ConsumeMessageThread_8 接收到新訊息: queueId=2,訊息內容: Hello RocketMQ 4 ConsumeMessageThread_9 接收到新訊息: queueId=3,訊息內容: Hello RocketMQ 5 ConsumeMessageThread_10 接收到新訊息: queueId=2,訊息內容: Hello RocketMQ 8 ConsumeMessageThread_11 接收到新訊息: queueId=3,訊息內容: Hello RocketMQ 9
10條訊息被兩個消費者消費完成,從消費結果可以看出訊息消費的佇列分配情況如下所示:
佇列被兩個消費者平均分配,但是注意,佇列均分不代表訊息均分!
總結
通過這次的問題跟蹤排查和解決,越來越意識到對一箇中間件原理甚至原始碼熟悉的重要性。當了解了其整體架構、運作原理以及模組原始碼以後就能夠很快判斷出大概是哪裡出了問題,這最終也會沉澱為我們的個人經驗。
轉載:https://blog.csdn.net/qq_18515155/article/details/106175901
TRANSLATE with x English TRANSLATE with EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back 此頁面的語言為英語 翻譯為
- 中文(簡體)
- 中文(繁體)
- 丹麥語
- 烏克蘭語
- 烏爾都語
- 亞美尼亞語
- 俄語
- 保加利亞語
- 克羅埃西亞語
- 冰島語
- 加泰羅尼亞語
- 匈牙利語
- 卡納達語
- 印地語
- 印尼語
- 古吉拉特語
- 哈薩克語
- 土耳其語
- 威爾士語
- 孟加拉語
- 尼泊爾語
- 布林語(南非荷蘭語)
- 希伯來語
- 希臘語
- 庫爾德語
- 德語
- 義大利語
- 拉脫維亞語
- 挪威語
- 捷克語
- 斯洛伐克語
- 斯洛維尼亞語
- 旁遮普語
- 日語
- 普什圖語
- 毛利語
- 法語
- 波蘭語
- 波斯語
- 泰盧固語
- 泰米爾語
- 泰語
- 海地克里奧爾語
- 愛沙尼亞語
- 瑞典語
- 立陶宛語
- 緬甸語
- 羅馬尼亞語
- 寮國語
- 芬蘭語
- 英語
- 荷蘭語
- 薩摩亞語
- 葡萄牙語
- 西班牙語
- 越南語
- 亞塞拜然語
- 阿姆哈拉語
- 阿爾巴尼亞語
- 阿拉伯語
- 韓語
- 馬爾加什語
- 馬拉地語
- 馬拉雅拉姆語
- 馬來語
- 馬耳他語
- 高棉語