1. 程式人生 > >RocketMQ的幾個關鍵技術點

RocketMQ的幾個關鍵技術點

1,怎麼傳送訊息,並且保證訊息順序並且沒有重複訊息

RocketMQ通過輪詢所有佇列的方式來確定訊息被髮送到哪一個佇列(負載均衡策略)。比如下面的示例中,訂單號相同的訊息會被先後傳送到同一個佇列中:

// RocketMQ通過MessageQueueSelector中實現的演算法來確定訊息傳送到哪一個佇列上
// RocketMQ預設提供了兩種MessageQueueSelector實現:隨機/Hash
// 當然你可以根據業務實現自己的MessageQueueSelector來決定訊息按照何種策略傳送到訊息佇列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在獲取到路由資訊以後,會根據MessageQueueSelector實現的演算法來選擇一個佇列,同一個OrderId獲取到的肯定是同一個佇列。

private SendResult send()  {
    // 獲取topic路由資訊
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根據我們的演算法,選擇一個傳送佇列
        // 這裡的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

上面在解決訊息順序問題時,引入了一個新的問題,就是訊息重複。那麼RocketMQ是怎樣解決訊息重複的問題呢?還是“恰好”不解決。

造成訊息重複的根本原因是:網路不可達。只要通過網路交換資料,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:如果消費端收到兩條一樣的訊息,應該怎樣處理?

  1. 消費端處理訊息的業務邏輯保持冪等性
  2. 保證每條訊息都有唯一編號且保證訊息處理成功與去重表的日誌同時出現

第1條很好理解,只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣。第2條原理就是利用一張日誌表來記錄已經處理成功的訊息的ID,如果新到的訊息ID已經在日誌表中,那麼就不再處理這條訊息。

第1條解決方案,很明顯應該在消費端實現,不屬於訊息系統要實現的功能。第2條可以訊息系統實現,也可以業務端實現。正常情況下出現重複訊息的概率其實很小,如果由訊息系統來實現的話,肯定會對訊息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理訊息重複的問題,這也是RocketMQ不解決訊息重複的問題的原因