1. 程式人生 > >RocketMQ原始碼分析之順序消費

RocketMQ原始碼分析之順序消費

概述

RocketMQ按照順序消費有兩種順序級別,一種是普通順序訊息。另外一種是更完全嚴格順序

  • 普通順序訊息指的是Producer將訊息傳送到相對應的訊息佇列上面
  • 完全嚴格順序:在普通順序訊息的基礎上,Consumer嚴格進行順序消費
    在絕大部分情況下只需要用到普通順序訊息,大部分的應用都能容忍短暫的亂序;官方文件給出的說明中表示目前只有資料庫的Binlog同步會強依賴完全嚴格順序(要保證資料庫事務的ACID特性)。

Comsumer嚴格順序消費

Consumer在進行嚴格順序消費的時候,需要利用到三把鎖,Broker訊息佇列鎖——>Consumer訊息佇列鎖——>Consumer訊息處理佇列消費鎖,鎖粒度越來越細
- Broker訊息佇列鎖是一個分散式鎖,在叢集模式下需要,在廣播模式下不需要,叢集模式中,只有獲得該鎖才能對Broker中的訊息佇列進行訊息拉取操作
- Consumer訊息佇列鎖是一個Broker本地鎖,只有Consumer獲得該鎖後才能操作訊息佇列
- Consumer
- 訊息處理佇列消費鎖是消費者客戶端的一個本地鎖,只有Consumer獲得該鎖後才能對訊息處理佇列進行訊息的消費。

獲取分散式Broker訊息佇列鎖【RebalanceImpl.java#updateProcessQueueTableInRebalance】

在叢集分散式模式下,Broker會不斷接收訊息佇列的更新請求,Broker就會通過更新操作檢查這個訊息佇列是不是屬於自己,具體就是通過鎖定這個分散式鎖實現的,如果鎖獲取成功,表示這個訊息佇列是屬於自己的,允許進行訊息拉取,如果獲取鎖失敗,則不允許進行訊息拉取。
Broker訊息佇列分散式鎖預設30s會過期,因此Consumer需要不斷重新整理該鎖的過期時間,預設20s就會重新整理一次

 1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: public void start() { 3: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { 4: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 5: @Override 6: public void run() { 7: ConsumeMessageOrderlyService.this.lockMQPeriodically();
8: } 9: }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); 10: } 11: }

移除訊息佇列(已有Consumer對這個訊息佇列消費了,移除它就是告訴Broker這個訊息佇列不需要其他的Consumer來消費)RebalancePushImpl.java#removeUnnecessaryMessageQueue

在叢集模式下,為了避免其它Consuer在獲取分散式鎖時和訊息佇列的消費衝突,如果獲取鎖失敗,進行移除訊息佇列將會失敗,等到下次重新分配消費佇列的時候,再進行移除;如果在沒有獲取分散式鎖的情況下就進行訊息佇列移除,那麼可能會導致當前Consumer和其他的Consumer同時消費該訊息佇列,這樣將惡惡法保證訊息按照完全嚴格順序消費。
在解鎖Broker訊息佇列鎖的時候,如果訊息佇列存在剩下沒被拉取的訊息,則進行延遲解鎖【RebanlancePushImpl.java#unlockDelay】Broker訊息佇列鎖。因為這樣能保證還沒被拉取的訊息能被全部拉取進行消費,這樣才能保證訊息被完全嚴格順序消費。

消費訊息佇列

消費訊息時序圖

這裡寫圖片描述

// ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
  2: class ConsumeRequest implements Runnable {
  3: 
  4:     /**
  5:      * 訊息處理佇列
  6:      */
  7:     private final ProcessQueue processQueue;
  8:     /**
  9:      * 訊息佇列
 10:      */
 11:     private final MessageQueue messageQueue;
 12: 
 13:     @Override
 14:     public void run() {
 15:         if (this.processQueue.isDropped()) {
 16:             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 17:             return;
 18:         }
 19: 
 20:         // 獲得 Consumer 訊息佇列鎖
 21:         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 22:         synchronized (objLock) {
 23:             // (廣播模式) 或者 (叢集模式 && Broker訊息佇列鎖有效)
 24:             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 25:                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
 26:                 final long beginTime = System.currentTimeMillis();
 27:                 // 迴圈
 28:                 for (boolean continueConsume = true; continueConsume; ) {
 29:                     if (this.processQueue.isDropped()) {
 30:                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 31:                         break;
 32:                     }
 33: 
 34:                     // 訊息佇列分散式鎖未鎖定,提交延遲獲得鎖並消費請求
 35:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 36:                         && !this.processQueue.isLocked()) {
 37:                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
 38:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 39:                         break;
 40:                     }
 41:                     // 訊息佇列分散式鎖已經過期,提交延遲獲得鎖並消費請求
 42:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 43:                         && this.processQueue.isLockExpired()) {
 44:                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
 45:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 46:                         break;
 47:                     }
 48: 
 49:                     // 當前週期消費時間超過連續時長,預設:60s,提交延遲消費請求。預設情況下,每消費1分鐘休息10ms。
 50:                     long interval = System.currentTimeMillis() - beginTime;
 51:                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
 52:                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
 53:                         break;
 54:                     }
 55: 
 56:                     // 獲取消費訊息。此處和併發訊息請求不同,併發訊息請求已經帶了消費哪些訊息。
 57:                     final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
 58:                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
 59:                     if (!msgs.isEmpty()) {
 60:                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
 61: 
 62:                         ConsumeOrderlyStatus status = null;
 63: 
 64:                         // ....省略程式碼:Hook:before
 65: 
 66:                         // 執行消費
 67:                         long beginTimestamp = System.currentTimeMillis();
 68:                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 69:                         boolean hasException = false;
 70:                         try {
 71:                             this.processQueue.getLockConsume().lock(); // 鎖定佇列消費鎖
 72: 
 73:                             if (this.processQueue.isDropped()) {
 74:                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
 75:                                     this.messageQueue);
 76:                                 break;
 77:                             }
 78: 
 79:                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
 80:                         } catch (Throwable e) {
 81:                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
 82:                                 RemotingHelper.exceptionSimpleDesc(e), //
 83:                                 ConsumeMessageOrderlyService.this.consumerGroup, //
 84:                                 msgs, //
 85:                                 messageQueue);
 86:                             hasException = true;
 87:                         } finally {
 88:                             this.processQueue.getLockConsume().unlock(); // 鎖定佇列消費鎖
 89:                         }
 90: 
 91:                         // ....省略程式碼:解析消費結果狀態
 92: 
 93:                         // ....省略程式碼:Hook:after
 94: 
 95:                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()
 96:                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 97: 
 98:                         // 處理消費結果
 99:                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100:                     } else {
101:                         continueConsume = false;
102:                     }
103:                 }
104:             } else {
105:                 if (this.processQueue.isDropped()) {
106:                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107:                     return;
108:                 }
109: 
110:                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111:             }
112:         }
113:     }
114: 
115: }

處理消費結果

順序消費訊息結果有四種情況(ConsumerOrderlyStatus)

  • SUCCESS:消費成功但不提交
  • ROLLBACK:消費失敗,並進行消費回滾
  • COMMIT:消費成功並提交
  • SUSPEND_CURRENT_QUEUE_A_MOMENT:消費失敗並掛起消費
    ROLLBACK、COMMIT只有在事務訊息下(Binlog)使用,被官方標記為@Deprecated
    在併發場景中,如果消費失敗,Consuner會將消f失敗訊息發回到Broker重試佇列中。然後跳過當前訊息,等到下次拉取該訊息再進行消費。
    不過消費失敗的訊息一直失敗,也不可能一直消費。當超過消費重試上限時,Consumer 會將消費失敗超過上限的訊息發回到 Broker 死信佇列。預設16次,最後會提交消費進度

在完全嚴格順序消費中,如果消費失敗,就掛起佇列一會兒,稍後繼續消費

訊息處理佇列核心方法

 1: // ⬇️⬇️⬇️【ProcessQueue.java】
  2: /**
  3:  * 訊息對映
  4:  * key:訊息佇列位置
  5:  */
  6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();    /**
  7:  * 訊息對映臨時儲存(消費中的訊息)
  8:  */
  9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
 10: 
 11: /**
 12:  * 回滾消費中的訊息
 13:  * 邏輯類似於{@link #makeMessageToCosumeAgain(List)}
 14:  */
 15: public void rollback() {
 16:     try {
 17:         this.lockTreeMap.writeLock().lockInterruptibly();
 18:         try {
 19:             this.msgTreeMap.putAll(this.msgTreeMapTemp);
 20:             this.msgTreeMapTemp.clear();
 21:         } finally {
 22:             this.lockTreeMap.writeLock().unlock();
 23:         }
 24:     } catch (InterruptedException e) {
 25:         log.error("rollback exception", e);
 26:     }
 27: }
 28: 
 29: /**
 30:  * 提交消費中的訊息已消費成功,返回消費進度
 31:  *
 32:  * @return 消費進度
 33:  */
 34: public long commit() {
 35:     try {
 36:         this.lockTreeMap.writeLock().lockInterruptibly();
 37:         try {
 38:             // 消費進度
 39:             Long offset = this.msgTreeMapTemp.lastKey();
 40: 
 41:             //
 42:             msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
 43: 
 44:             //
 45:             this.msgTreeMapTemp.clear();
 46: 
 47:             // 返回消費進度
 48:             if (offset != null) {
 49:                 return offset + 1;
 50:             }
 51:         } finally {
 52:             this.lockTreeMap.writeLock().unlock();
 53:         }
 54:     } catch (InterruptedException e) {
 55:         log.error("commit exception", e);
 56:     }
 57: 
 58:     return -1;
 59: }
 60: 
 61: /**
 62:  * 指定訊息重新消費
 63:  * 邏輯類似於{@link #rollback()}
 64:  *
 65:  * @param msgs 訊息
 66:  */
 67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
 68:     try {
 69:         this.lockTreeMap.writeLock().lockInterruptibly();
 70:         try {
 71:             for (MessageExt msg : msgs) {
 72:                 this.msgTreeMapTemp.remove(msg.getQueueOffset());
 73:                 this.msgTreeMap.put(msg.getQueueOffset(), msg);
 74:             }
 75:         } finally {
 76:             this.lockTreeMap.writeLock().unlock();
 77:         }
 78:     } catch (InterruptedException e) {
 79:         log.error("makeMessageToCosumeAgain exception", e);
 80:     }
 81: }
 82: 
 83: /**
 84:  * 獲得持有訊息前N條
 85:  *
 86:  * @param batchSize 條數
 87:  * @return 訊息
 88:  */
 89: public List<MessageExt> takeMessags(final int batchSize) {
 90:     List<MessageExt> result = new ArrayList<>(batchSize);
 91:     final long now = System.currentTimeMillis();
 92:     try {
 93:         this.lockTreeMap.writeLock().lockInterruptibly();
 94:         this.lastConsumeTimestamp = now;
 95:         try {
 96:             if (!this.msgTreeMap.isEmpty()) {
 97:                 for (int i = 0; i < batchSize; i++) {
 98:                     Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
 99:                     if (entry != null) {
100:                         result.add(entry.getValue());
101:                         msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102:                     } else {
103:                         break;
104:                     }
105:                 }
106:             }
107: 
108:             if (result.isEmpty()) {
109:                 consuming = false;
110:             }
111:         } finally {
112:             this.lockTreeMap.writeLock().unlock();
113:         }
114:     } catch (InterruptedException e) {
115:         log.error("take Messages exception", e);
116:     }
117: 
118:     return result;
119: }