rocketmq 消費者負載均衡-分散式下的messageQueue的分配和鎖定
思考一個問題,分散式環境,網路不穩定.
a臺broker上有messageQueue,b臺queue.
a,b都會隨著網路不穩定而變動,你如何分配,並且保證messageQueue都被分配出去.
rocketMq實現: 客戶端實現負載均衡,每個人的演算法都一樣. (源自於大家都從zk上獲取, topic總messageQueue和consumers )
然後平均分配. 有可能因為某些原因,得到的資料不一致,會發生併發搶佔.這點對於非orderConsumer來說, 不鎖., 對於orderConsumer來說,要鎖,對於通過broker的鎖控制.
併發消費:
DefaultMQPushConsumerImpl.java{
pullMessage(PullRequest){
if (processQueue.isDropped()) {
log.info("the pull request[{}] is droped.", pullRequest.toString());
return;
}
}
理論上 pullConsumer也要有這個判斷.
orderlyService和concurrentlyService也是有呼叫這個processQueue.isDropped()
對於orderlyConsumer來說,還多了一個併發鎖,即
public static final int LOCK_BATCH_MQ = 41; // Broker Consumer向Master解鎖佇列 public static final int UNLOCK_BATCH_MQ = 42;
兩點使用:
1.執行某個quque時嘗試鎖該queue,但是利用了本地快取鎖processQueue,(非同步呼叫遠端broker更新本地鎖,避免了大量呼叫等待.) 修改消費者的lprocessQueue. Locked
2.定時任務獲取所有當前queue的鎖.(本地processQueue時由reBalance分配的)
orderService執行consumeRequest時進行鎖判斷,processQueue.isLocked() 鎖失敗無法進行消費.
參考文獻: http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142693
我的實現:
非順序執行缺少queue的加鎖, broker獲取所有的該queue對應的topic的消費者(從zk上獲取),然後通知queue來獲取資料或者推送.同時保證queue被一個人消費.加鎖.