1. 程式人生 > >rocketmq學習(三)進階

rocketmq學習(三)進階

rocketmq進階

訊息基礎

預設情況下,producer會輪詢的將訊息傳送到每個佇列中(所有broker下的Queue合併成一個List去輪詢),提高系統吞吐力。這樣分佈帶來的問題,就是從全域性上不能做到順序性(很多時候也並不需要全域性上的順序性)。

消費完後的訊息去哪裡了?

訊息的儲存是一直存在於CommitLog中的,由於CommitLog是以檔案為單位(而非訊息)存在的,而且CommitLog的設計是隻允許順序寫,且每個訊息大小不定長,所以這決定了訊息檔案幾乎不可能按照訊息為單位刪除(否則效能會極具下降,邏輯也非常複雜)。

所以訊息被消費了,訊息所佔據的物理空間也不會立刻被回收。但訊息既然一直沒有刪除,那RocketMQ怎麼知道應該投遞過的訊息就不再投遞?——答案是客戶端自身維護——客戶端拉取完訊息之後,在響應體中,broker會返回下一次應該拉取的位置,PushConsumer通過這一個位置,更新自己下一次的pull請求。這樣就保證了正常情況下,訊息只會被投遞一次。

什麼時候清理物理訊息檔案?

那訊息檔案到底刪不刪,什麼時候刪?

訊息儲存在CommitLog之後,的確是會被清理的,但是這個清理只會在以下任一條件成立才會批量刪除訊息檔案(CommitLog):

  • 訊息檔案過期(預設72小時),且到達清理時點(預設是凌晨4點),刪除過期檔案。
  • 訊息檔案過期(預設72小時),且磁碟空間達到了水位線(預設75%),刪除過期檔案。
  • 磁碟已經達到必須釋放的上限(85%水位線)的時候,則開始批量清理檔案(無論是否過期),直到空間充足。
    注:若磁碟空間達到危險水位線(預設90%),出於保護自身的目的,broker會拒絕寫入服務。

這樣設計帶來的好處

訊息的物理檔案一直存在,消費邏輯只是聽客戶端的決定而搜尋出對應訊息進行,這樣做,筆者認為,有以下幾個好處:

一個訊息很可能需要被N個消費組(設計上很可能就是系統)消費,但訊息只需要儲存一份,消費進度單獨記錄即可。這給強大的訊息堆積能力提供了很好的支援——一個訊息無需複製N份,就可服務N個消費組。

由於消費從哪裡消費的決定權一直都是客戶端決定,所以只要訊息還在,就可以消費到,這使得RocketMQ可以支援其他傳統訊息中介軟體不支援的回溯消費。即我可以通過設定消費進度回溯,就可以讓我的消費組重新像放快照一樣消費歷史訊息;或者我需要另一個系統也複製歷史的資料,只需要另起一個消費組從頭消費即可(前提是訊息檔案還存在)。

訊息索引服務。只要訊息還存在就能被搜尋出來。所以可以依靠訊息的索引搜尋出訊息的各種原資訊,方便事後排查問題。

注:在訊息清理的時候,由於訊息檔案預設是1GB,所以在清理的時候其實是在刪除一個大檔案操作,這對於IO的壓力是非常大的,這時候如果有訊息寫入,寫入的耗時會明顯變高。這個現象可以在凌晨4點(預設刪時間時點)後的附近觀察得到。

RocketMQ官方建議Linux下檔案系統改為Ext4,對於檔案刪除操作,相比Ext3有非常明顯的提升

順序

順序消費

簡介

訊息有序指的是一類訊息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條訊息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照這個順序消費才有意義。但同時訂單之間又是可以並行消費的。
順序訊息被存入同一個queue,繼而在消費端被順序消費

順序監聽器

使用MessageListenerOrderly順序監聽器,進行單執行緒消費

順序生產

通過 順序生產,保證順序消費,傳送訊息時,使用MessageQueueSelector:訊息佇列選擇器,將需要順序消費的訊息存入同一queue

訊息重複

簡介

可能因為網路問題而產生重複消費

誰解決

業務還是MQ解決?
讓MQ解決,可能會拖慢MQ;給業務解決,那就最好有統一的處理介面,避免眾多業務各有一套處理邏輯,令維護疲於奔命。
因此,業務解決重複比較合適。

如何解決

記錄訊息ID

記錄訊息ID,下次消費到相同ID時,跳過消費。

訊息斷點續傳

消費進度

commitlog:持久化訊息元資料,包括訊息主體等;
consumeQueue:記錄資料位置;
若使用PullConsumer模式,如何ack?如何保證消費等均?自己實現。

消費異常

如何保證消費?

消費異常時,重新投遞

 consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + "MESSAGES:" + msgs);// 訊息
            doSomething(msgs);// 執行真正消費,return ConsumeConcurrentlyStatus
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

  若訊息消費失敗(資料庫異常、餘額不足扣款失敗等業務),訊息需要重試,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批訊息消費失敗了。
  為了保證訊息至少被成功消費一次,RocketMQ會把這批訊息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續失敗到一定次數(預設16次),就會投遞到DLQ死信佇列,應用可以監控死信佇列來做人工干預。

  • 如果業務的回撥沒有處理好而丟擲異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理;

  • 當使用順序消費的回撥MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有RECONSUME_LATER的這個狀態,只有SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停佇列的其餘消費,原訊息不斷重試直至成功才能繼續消費,也就是說有可能卡死。

啟動的時候從哪裡消費

  當新例項啟動的時候,PushConsumer會拿到本消費組broker已經記錄好的消費進度(consumer offset),按照這個進度發起自己的第一次Pull請求。

如果這個消費進度在Broker並沒有儲存起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:

CONSUME_FROM_LAST_OFFSET //預設策略,從該佇列最尾開始消費,即跳過歷史訊息
CONSUME_FROM_FIRST_OFFSET //從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
所以,社群中經常有人問:“為什麼我設了CONSUME_FROM_LAST_OFFSET,歷史的訊息還是被消費了”? 原因就在於只有全新的消費組才會使用到這些策略,老的消費組都是按已經儲存過的消費進度繼續消費。

老消費組跳過歷史訊息

  • 在程式碼按日期判斷,太老的訊息直接return CONSUME_SUCCESS過濾
  • 在程式碼判斷訊息的offset和MAX_OFFSET相差甚遠,認為是積壓了很多歷史訊息,直接return CONSUME_SUCCESS過濾
  • 消費者啟動前,先調整該消費組的消費進度,再開始消費——可以人工使用命令resetOffsetByTime,或呼叫內部的運維介面(linux命令、ng控制檯等),祥見ResetOffsetByTimeCommand.java

回溯消費

  • 指定時間點回溯
  • 指定訊息ID回溯
  • 開新消費組過濾消費
    其實就是老消費組跳過歷史訊息的程式碼實現
    利用訊息出生時間點這個引數,過濾太久遠的訊息,如:
@Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         for(MessageExt msg: msgs){
             if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分鐘之前的認為過期
                 continue;//過期訊息跳過
             }

             //do consume here

         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

過濾掉堆積太多的訊息,如:

@Override
     public ConsumeConcurrentlyStatus consumeMessage(//
         List<MessageExt> msgs, //
         ConsumeConcurrentlyContext context) {
         long offset = msgs.get(0).getQueueOffset();
         String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
         long diff = Long. parseLong(maxOffset) - offset;
         if (diff > 100000) { //訊息堆積了10W情況的特殊處理
             return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
         }
         //do consume here
         return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
     }

過濾消費

說明

誕生點

誕生點是個時間戳,通過比較,過濾一定時間段的訊息

偏移

雷同誕生點過濾,通過偏移值和當前偏移值比較,過濾一定偏移差範圍內的訊息

離線訊息

  • 根據消費策略,從某個起點消費

事務訊息

比如,資料庫事務,一套業務,必須保證完全成功,有例外就需要回滾
業務的一致性,一系列業務過程必須保證完全成功的場景就是事務
這樣的訊息有多個狀態,並且其傳送是兩階段的。第一個階段傳送PREPARED狀態的訊息,此時consumer是看不見這種狀態的訊息的,傳送完畢後回撥使用者的TransactionExecutor介面,執行相應的事務操作(如資料庫),當事務操作成功時,則對此條訊息返回commit,讓broker對該訊息執行commit操作,成為commit狀態的訊息對consumer是可見的。

Created with Raphaël 2.1.2StartYour OperationYes or No?Endyesno