RocketMQ原始碼分析之順序消費
概述
RocketMQ按照順序消費有兩種順序級別,一種是普通順序訊息。另外一種是更完全嚴格順序
- 普通順序訊息指的是Producer將訊息傳送到相對應的訊息佇列上面
- 完全嚴格順序:在普通順序訊息的基礎上,Consumer嚴格進行順序消費
在絕大部分情況下只需要用到普通順序訊息,大部分的應用都能容忍短暫的亂序;官方文件給出的說明中表示目前只有資料庫的Binlog同步會強依賴完全嚴格順序(要保證資料庫事務的ACID特性)。
Comsumer嚴格順序消費
Consumer在進行嚴格順序消費的時候,需要利用到三把鎖,Broker訊息佇列鎖——>Consumer訊息佇列鎖——>Consumer訊息處理佇列消費鎖,鎖粒度越來越細
- Broker訊息佇列鎖是一個分散式鎖,在叢集模式下需要,在廣播模式下不需要,叢集模式中,只有獲得該鎖才能對Broker中的訊息佇列進行訊息拉取操作
- Consumer訊息佇列鎖是一個Broker本地鎖,只有Consumer獲得該鎖後才能操作訊息佇列
- Consumer
- 訊息處理佇列消費鎖是消費者客戶端的一個本地鎖,只有Consumer獲得該鎖後才能對訊息處理佇列進行訊息的消費。
獲取分散式Broker訊息佇列鎖【RebalanceImpl.java#updateProcessQueueTableInRebalance】
在叢集分散式模式下,Broker會不斷接收訊息佇列的更新請求,Broker就會通過更新操作檢查這個訊息佇列是不是屬於自己,具體就是通過鎖定這個分散式鎖實現的,如果鎖獲取成功,表示這個訊息佇列是屬於自己的,允許進行訊息拉取,如果獲取鎖失敗,則不允許進行訊息拉取。
Broker訊息佇列分散式鎖預設30s會過期,因此Consumer需要不斷重新整理該鎖的過期時間,預設20s就會重新整理一次
1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: public void start() {
3: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
4: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
5: @Override
6: public void run() {
7: ConsumeMessageOrderlyService.this.lockMQPeriodically();
8: }
9: }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
10: }
11: }
移除訊息佇列(已有Consumer對這個訊息佇列消費了,移除它就是告訴Broker這個訊息佇列不需要其他的Consumer來消費)RebalancePushImpl.java#removeUnnecessaryMessageQueue
在叢集模式下,為了避免其它Consuer在獲取分散式鎖時和訊息佇列的消費衝突,如果獲取鎖失敗,進行移除訊息佇列將會失敗,等到下次重新分配消費佇列的時候,再進行移除;如果在沒有獲取分散式鎖的情況下就進行訊息佇列移除,那麼可能會導致當前Consumer和其他的Consumer同時消費該訊息佇列,這樣將惡惡法保證訊息按照完全嚴格順序消費。
在解鎖Broker訊息佇列鎖的時候,如果訊息佇列存在剩下沒被拉取的訊息,則進行延遲解鎖【RebanlancePushImpl.java#unlockDelay】Broker訊息佇列鎖。因為這樣能保證還沒被拉取的訊息能被全部拉取進行消費,這樣才能保證訊息被完全嚴格順序消費。
消費訊息佇列
消費訊息時序圖
// ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】
2: class ConsumeRequest implements Runnable {
3:
4: /**
5: * 訊息處理佇列
6: */
7: private final ProcessQueue processQueue;
8: /**
9: * 訊息佇列
10: */
11: private final MessageQueue messageQueue;
12:
13: @Override
14: public void run() {
15: if (this.processQueue.isDropped()) {
16: log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
17: return;
18: }
19:
20: // 獲得 Consumer 訊息佇列鎖
21: final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
22: synchronized (objLock) {
23: // (廣播模式) 或者 (叢集模式 && Broker訊息佇列鎖有效)
24: if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
25: || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
26: final long beginTime = System.currentTimeMillis();
27: // 迴圈
28: for (boolean continueConsume = true; continueConsume; ) {
29: if (this.processQueue.isDropped()) {
30: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
31: break;
32: }
33:
34: // 訊息佇列分散式鎖未鎖定,提交延遲獲得鎖並消費請求
35: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
36: && !this.processQueue.isLocked()) {
37: log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
38: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
39: break;
40: }
41: // 訊息佇列分散式鎖已經過期,提交延遲獲得鎖並消費請求
42: if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
43: && this.processQueue.isLockExpired()) {
44: log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
45: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
46: break;
47: }
48:
49: // 當前週期消費時間超過連續時長,預設:60s,提交延遲消費請求。預設情況下,每消費1分鐘休息10ms。
50: long interval = System.currentTimeMillis() - beginTime;
51: if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
52: ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
53: break;
54: }
55:
56: // 獲取消費訊息。此處和併發訊息請求不同,併發訊息請求已經帶了消費哪些訊息。
57: final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
58: List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
59: if (!msgs.isEmpty()) {
60: final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
61:
62: ConsumeOrderlyStatus status = null;
63:
64: // ....省略程式碼:Hook:before
65:
66: // 執行消費
67: long beginTimestamp = System.currentTimeMillis();
68: ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
69: boolean hasException = false;
70: try {
71: this.processQueue.getLockConsume().lock(); // 鎖定佇列消費鎖
72:
73: if (this.processQueue.isDropped()) {
74: log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
75: this.messageQueue);
76: break;
77: }
78:
79: status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
80: } catch (Throwable e) {
81: log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
82: RemotingHelper.exceptionSimpleDesc(e), //
83: ConsumeMessageOrderlyService.this.consumerGroup, //
84: msgs, //
85: messageQueue);
86: hasException = true;
87: } finally {
88: this.processQueue.getLockConsume().unlock(); // 鎖定佇列消費鎖
89: }
90:
91: // ....省略程式碼:解析消費結果狀態
92:
93: // ....省略程式碼:Hook:after
94:
95: ConsumeMessageOrderlyService.this.getConsumerStatsManager()
96: .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
97:
98: // 處理消費結果
99: continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
100: } else {
101: continueConsume = false;
102: }
103: }
104: } else {
105: if (this.processQueue.isDropped()) {
106: log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
107: return;
108: }
109:
110: ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
111: }
112: }
113: }
114:
115: }
處理消費結果
順序消費訊息結果有四種情況(ConsumerOrderlyStatus)
- SUCCESS:消費成功但不提交
- ROLLBACK:消費失敗,並進行消費回滾
- COMMIT:消費成功並提交
- SUSPEND_CURRENT_QUEUE_A_MOMENT:消費失敗並掛起消費
ROLLBACK、COMMIT只有在事務訊息下(Binlog)使用,被官方標記為@Deprecated
在併發場景中,如果消費失敗,Consuner會將消f失敗訊息發回到Broker重試佇列中。然後跳過當前訊息,等到下次拉取該訊息再進行消費。
不過消費失敗的訊息一直失敗,也不可能一直消費。當超過消費重試上限時,Consumer 會將消費失敗超過上限的訊息發回到 Broker 死信佇列。預設16次,最後會提交消費進度
在完全嚴格順序消費中,如果消費失敗,就掛起佇列一會兒,稍後繼續消費
訊息處理佇列核心方法
1: // ⬇️⬇️⬇️【ProcessQueue.java】
2: /**
3: * 訊息對映
4: * key:訊息佇列位置
5: */
6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>(); /**
7: * 訊息對映臨時儲存(消費中的訊息)
8: */
9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();
10:
11: /**
12: * 回滾消費中的訊息
13: * 邏輯類似於{@link #makeMessageToCosumeAgain(List)}
14: */
15: public void rollback() {
16: try {
17: this.lockTreeMap.writeLock().lockInterruptibly();
18: try {
19: this.msgTreeMap.putAll(this.msgTreeMapTemp);
20: this.msgTreeMapTemp.clear();
21: } finally {
22: this.lockTreeMap.writeLock().unlock();
23: }
24: } catch (InterruptedException e) {
25: log.error("rollback exception", e);
26: }
27: }
28:
29: /**
30: * 提交消費中的訊息已消費成功,返回消費進度
31: *
32: * @return 消費進度
33: */
34: public long commit() {
35: try {
36: this.lockTreeMap.writeLock().lockInterruptibly();
37: try {
38: // 消費進度
39: Long offset = this.msgTreeMapTemp.lastKey();
40:
41: //
42: msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
43:
44: //
45: this.msgTreeMapTemp.clear();
46:
47: // 返回消費進度
48: if (offset != null) {
49: return offset + 1;
50: }
51: } finally {
52: this.lockTreeMap.writeLock().unlock();
53: }
54: } catch (InterruptedException e) {
55: log.error("commit exception", e);
56: }
57:
58: return -1;
59: }
60:
61: /**
62: * 指定訊息重新消費
63: * 邏輯類似於{@link #rollback()}
64: *
65: * @param msgs 訊息
66: */
67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
68: try {
69: this.lockTreeMap.writeLock().lockInterruptibly();
70: try {
71: for (MessageExt msg : msgs) {
72: this.msgTreeMapTemp.remove(msg.getQueueOffset());
73: this.msgTreeMap.put(msg.getQueueOffset(), msg);
74: }
75: } finally {
76: this.lockTreeMap.writeLock().unlock();
77: }
78: } catch (InterruptedException e) {
79: log.error("makeMessageToCosumeAgain exception", e);
80: }
81: }
82:
83: /**
84: * 獲得持有訊息前N條
85: *
86: * @param batchSize 條數
87: * @return 訊息
88: */
89: public List<MessageExt> takeMessags(final int batchSize) {
90: List<MessageExt> result = new ArrayList<>(batchSize);
91: final long now = System.currentTimeMillis();
92: try {
93: this.lockTreeMap.writeLock().lockInterruptibly();
94: this.lastConsumeTimestamp = now;
95: try {
96: if (!this.msgTreeMap.isEmpty()) {
97: for (int i = 0; i < batchSize; i++) {
98: Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
99: if (entry != null) {
100: result.add(entry.getValue());
101: msgTreeMapTemp.put(entry.getKey(), entry.getValue());
102: } else {
103: break;
104: }
105: }
106: }
107:
108: if (result.isEmpty()) {
109: consuming = false;
110: }
111: } finally {
112: this.lockTreeMap.writeLock().unlock();
113: }
114: } catch (InterruptedException e) {
115: log.error("take Messages exception", e);
116: }
117:
118: return result;
119: }