1. 程式人生 > >rocketmq 消費者負載均衡-分散式下的messageQueue的分配和鎖定

rocketmq 消費者負載均衡-分散式下的messageQueue的分配和鎖定

思考一個問題,分散式環境,網路不穩定.

a臺broker上有messageQueue,b臺queue.

a,b都會隨著網路不穩定而變動,你如何分配,並且保證messageQueue都被分配出去.

rocketMq實現: 客戶端實現負載均衡,每個人的演算法都一樣. (源自於大家都從zk上獲取, topic總messageQueue和consumers )

然後平均分配. 有可能因為某些原因,得到的資料不一致,會發生併發搶佔.這點對於非orderConsumer來說, 不鎖., 對於orderConsumer來說,要鎖,對於通過broker的鎖控制.

併發消費:

DefaultMQPushConsumerImpl.java{

pullMessage(PullRequest){

    if (processQueue.isDropped()) {
            log.info("the pull request[{}] is droped.", pullRequest.toString());
            return;
     }

}

理論上 pullConsumer也要有這個判斷.

orderlyService和concurrentlyService也是有呼叫這個processQueue.isDropped()

對於orderlyConsumer來說,還多了一個併發鎖,即

public static final int LOCK_BATCH_MQ = 41;
    // Broker Consumer向Master解鎖佇列
    public static final int UNLOCK_BATCH_MQ = 42;

兩點使用:

1.執行某個quque時嘗試鎖該queue,但是利用了本地快取鎖processQueue,(非同步呼叫遠端broker更新本地鎖,避免了大量呼叫等待.) 修改消費者的lprocessQueue.  Locked

2.定時任務獲取所有當前queue的鎖.(本地processQueue時由reBalance分配的)

orderService執行consumeRequest時進行鎖判斷,processQueue.isLocked() 鎖失敗無法進行消費.

參考文獻: http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142693 


我的實現:

非順序執行缺少queue的加鎖, broker獲取所有的該queue對應的topic的消費者(從zk上獲取),然後通知queue來獲取資料或者推送.同時保證queue被一個人消費.加鎖.