1. 程式人生 > >rocketmq 問題彙總-一個consumerGroup只對應一個topic

rocketmq 問題彙總-一個consumerGroup只對應一個topic

轉載 https://blog.csdn.net/a417930422/article/details/50663639

1 同一個訂閱組內不同Consumer例項訂閱不同topic消費混亂問題調查

圖1:

背景說明:

如圖1左半部分,假設目前的關係如下:

broker: 兩個,broker_a和broker_b

topic:兩個,topic1和topic2,每個topic在每個broker上分為4個queue

consumer:兩個,consumer1和consumer2,都屬於group1,分屬於不同的jvm執行。

預設情況下,topic和queue的對應關係是:

topic1 <-> broker_a q0~q3,

topic1 <-> broker_b q0~q3,

topic2 <-> broker_a q0~q3,

topic2 <-> broker_b q0~q3

rebalance流程開始:

假設consumer1先啟動,consumer1最終通過rebalance對應關係如下:

topic1 <-> broker_a q0~q3,

topic1 <-> broker_b q0~q3

 

接著consumer2啟動,consumer2具體rebalance流程如下:

關鍵點在5.2,會把consumer1也抓下來,接著根據分配策略會導致consumer2只消費broker_b上topic2對應的q0~q3。

同樣,consumer1也會進行rebalance,進而使其只消費broker_a的topic1對應的q0~q3,最終導致其關係變為圖1中右圖所示。

consumer端警告日誌:

rebalance完成之後,consumer端間斷列印如下異常:

14:22:04.005 [NettyClientPublicExecutor_3] WARN RocketmqClient - execute the pull request exception
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's subscription not exist
See 

https://github.com/alibaba/RocketMQ/issues/46 for further details.
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:500) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:78) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:455) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:62) [rocketmq-remoting-3.2.6.jar:na]
at com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:262) [rocketmq-remoting-3.2.6.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_51]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

broker端也發現相應日誌:

2015-07-31 16:38:08 WARN ClientManageThread_4 - subscription changed, group: consumerTestGroup remove topic vrs-topic-test SubscriptionData [classFilterMode=false, topic=vrs-topic-test, subString=*, tagsSet=[], codeSet=[], subVersion=1438331853269]

2015-07-31 16:38:22 WARN PullMessageThread_29 - the consumer's subscription not exist, group: consumerTestGroup

consumer&broker異常日誌原因:

consumer會定時與所有broker進行心跳通訊,程式碼詳見:MQClientInstance.startScheduledTask,預設每30秒心跳一次。

心跳主要作用:

會將HeartbeatData物件傳送到broker端,攜帶consumer group和topic資訊

對應到圖1中,consumer1會發送類似group1,topic1

consumer2會發送group1,topic2

 

經過走查broker端程式碼發現如下程式碼:

重點在步驟4和5.1和5.2,現在只針對一個broker做一下分析:

假設consumer1先啟動,對於broker_a一開始關係是group1->topic1

當consumer2啟動並rebalance完成後,consumer2傳送group1->topic2,

在步驟4,會根據group1將原先的group1->topic1取出。

在步驟5.1,新增topic2

在步驟5.2,移除topic1。

 

而consumer1在rebalance之後同樣會進行如上步驟,導致topic1&topic2反覆被remove掉,

最終導致了consumer端和broker端的異常日誌不停列印。

 

最終結論:是rebalance導致consumer只消費一部分topic,但是顯然rocketmq在broker端做了處理,從而不停列印警告資訊。