1. 程式人生 > 實用技巧 >2-rocketmq-訊息傳送和接收

2-rocketmq-訊息傳送和接收

quick start

新增依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.7.1</version>
</dependency>

生產者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /**
         * 生產者組,簡單來說就是多個傳送同一類訊息的生產者稱之為一個生產者組rocketmq支援事務訊息,在傳送事務訊息時,如果事務訊息異常(producer掛了),broker端會來回查事務的狀態,這個時候會根據group名稱來查詢對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // namesrv地址 多個地址用 ; 隔開  從namesrv上拉取broker資訊
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
              	/**
              	 * 建立訊息例項,指定topic,tag,訊息內容。tag
              	 */
                Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 傳送訊息並獲取傳送結果   同步傳送
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

SendResult中,有一個sendStatus狀態,表示訊息的傳送狀態。一共有四種狀態

  1. FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設定成
    SYNC_FLUSH 才會報這個錯誤) 。
  2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設定成SYNC_MASTER 方式,沒有
    在設定時間內完成主從同步。
  3. SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方
    式下,並且Broker 被設定成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
  4. SEND OK :表示傳送成功,傳送成功的具體含義,比如訊息是否已經被儲存到磁碟?訊息是否被
    同步到了Slave 上?訊息在Slave 上是否被寫入磁碟?需要結合所配置的刷盤策略、主從策略來
    定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK

消費者

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //groupName 將多個consumer分組,提高併發處理能力。需要和MessageModel配合
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 多個地址 ;分開 獲取broker地址 並定時向broker傳送心跳 可以從master/slave獲取訂閱
        consumer.setNamesrvAddr("localhost:9876");
        // 兩種訊息模式  BROADCASTING   CLUSTERING
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //設定consumer第一次啟動從佇列頭部還是尾部開始消費
      	//如果非第一次啟動,那麼按上一次消費的位置繼續消費(取決於本地的offeset資料)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // topic 可通過tag過濾訊息  * 或 null 代表全部
        consumer.subscribe("TopicTest", "*");
        /**註冊訊息處理回撥
         * MessageListenerConcurrently 普通監聽
         * MessageListenerOrderly 順序監聽
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // todo 訊息處理邏輯
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
              	// 返回消費狀態
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消費狀態

ConsumeConcurrentlyStatus {
  	// 消費成功
    CONSUME_SUCCESS,
  // 使用失敗,稍後嘗試使用
    RECONSUME_LATER;

訊息傳送及消費的基本原理

叢集部署,一個master可以有多個slave,一個slave只能有一個master.consumer可以從master獲者slave中訂閱訊息

2m-2s示例:

rocketMQ 沒有實現master選舉(通過配置檔案來指定主從)

當master掛了後 消費者依然能正常消費訊息(slave提供讀服務)

通過groupName實現分割槽,提高消費者的處理能力

消費者

兩種消費者型別

  • DefaultMQPushConsumer 由系統控制讀取操作

DefaultMQPushConsumer

自動儲存offset,自動做負載均衡

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //groupName 將多個consumer分組,提高併發處理能力。需要和MessageModel配合
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 多個地址 ;分開
        consumer.setNamesrvAddr("localhost:9876");
        // 兩種訊息模式  BROADCASTING   CLUSTERING
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //第一次啟動從 offset頭開始
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // topic 可通過tag過濾訊息  * 或 null 代表全部
        consumer.subscribe("TopicTest", "*");
        //註冊訊息處理回撥
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // todo 訊息處理邏輯
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

兩種訊息模式 BROADCASTING CLUSTERING:

1、在 Clustering 模式下,同一個 ConsumerGroup ( GroupName 相同 ) 裡的每個 Consumer 只消費所訂閱訊息 的一部分 內 容, 同一個 ConsumerGroup裡所有的 Consumer 消 費 的內 容合起來才是所訂閱 Topic 內 容 的 整體 ,從而達到負載均衡的目的 (也就是叢集消費)

2、在 Broadcasting 模式下,同一個 ConsumerGroup 裡的每個 Consumer 都能消費到所訂閱 Topic 的全部訊息,也就是一個訊息會被多次分發,被多個Consumer 消費 。(也就是廣播模式

通過長輪詢的方式獲取訊息

Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新訊息到達就利用現有的連線立刻返回訊息給Consumer。主動權在Consumer

好處是客戶端能充分利用資源,不至於處理不過來

流量控制

DefaultMQPullConsumer

需要自己維護offset,需要通過遍歷MessageQueue獲取訊息

 public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // 獲取分片
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        // 獲取到訊息
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        // 沒有新訊息
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

Consumer的啟動、關閉

DefaultMQPushConsumer啟動時不會檢查nameServer地址的正確或者可用性

// 從指定topic中拉取所有訊息佇列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");

可以通過上面的方法主動拉取訊息佇列來判斷nameServer的可用性

關閉時呼叫shutdown()即可

DefaultMQPullConsumer關閉或者異常退出時需要將offset儲存起來

才能保證下次啟動時拉取訊息的正確性

consumerGroup:位於同一個consumerGroup中的consumer例項和producerGroup中的各個produer例項承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的併發消費能力以及容災
和kafka一樣,多個consumer會對訊息做負載均衡,意味著同一個topic下的不messageQueue會分發給同一個group中的不同consumer

消費端的負載均衡

和kafka一樣,消費端也會針對Message Queue做負載均衡,使得每個消費者能夠合理的消費多個分割槽的訊息。

消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載
  • 消費端遍歷自己的所有topic,依次調rebalanceByTopic

  • 根據topic獲取此topic下的所有queue

  • 選擇一臺broker獲取基於group的所有消費端(有心跳向所有broker註冊客戶端資訊)

  • 選擇佇列分配策略例項AllocateMessageQueueStrategy執行分配演算法

什麼時候觸發負載均衡
  • 消費者啟動之後
  • 消費者數量發生變更
  • 每10秒會觸發檢查一次rebalance
分配演算法

RocketMQ提供了6中分割槽的分配演算法

  • (AllocateMessageQueueAveragely)平均分配演算法(預設)
  • (AllocateMessageQueueAveragelyByCircle)環狀分配訊息佇列
  • (AllocateMessageQueueByConfig)按照配置來分配佇列: 根據使用者指定的配置來進行負載
  • (AllocateMessageQueueByMachineRoom)按照指定機房來配置佇列
  • (AllocateMachineRoomNearby)按照就近機房來配置佇列:
  • (AllocateMessageQueueConsistentHash)一致性hash,根據消費者的cid進行

生產者

DefaultMQProducer 預設生產者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // producerGroupName
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // namesrv地址 多個地址用 ; 隔開
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 返回 
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

訊息返回狀態:SendResult.sendStatus

1、FLUSH DISK TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策略設定成 SYNC FLUSH 才會報這個錯誤) 。
2、 FLUSH SLAVE TIMEOUT :表示在主備方式下,並且 Broker 被設定成 SYNC MASTER 方式,沒有在設定時間內完成主從同步 。
3、SLAVE NOT AVAILABLE : 這個狀態產生的場景和 FLUSH SLAVETIMEOUT 類似, 表示在主備 方式下,並且 Broker 被設定成 SYNCMASTER ,但是沒有找到被配置成 S lave 的 Broker 。
4、SEND OK :表示傳送成功,傳送成功的具體含義,比如訊息是否已經被儲存到融盤?訊息是否被同步到了 S lave 上?訊息在 S lave 上是否被寫人磁碟?需要結合所配置的刷盤策略、主從策略來定 。 這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是 SEND OK

延遲訊息

通過Message.setDelayTimeLevel ( int level ) 方法設定延遲時間,只支援預設值(1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/7m/8m/9m/1 Om/20m/30m/1 h/2h )。 比如setDelayTimeLevel(3)表示延遲 10s 。

自定義訊息傳送規則

實現MessageQueueSelector介面
三種預設實現:
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom

自定義訊息傳送可以將訊息傳送到指定的MessageQueue裡

對事物的支援

new TransactionMQProducer("groupName");

設定生產者group,當一個producer掛掉了,訊息會分發到其它producer保證訊息一定會被回查確定

訊息的可靠性原則

只有消費者返回CONSUME_SUCCESS消費成功的才會認為消費成功

返回ConsumeConcurrentlyStatus.RECONSUME_LATER消費失敗會被重試

訊息衰減重試

為了保證訊息肯定至少被消費一次,RocketMQ會把這批訊息重新發回到broker,在延遲的某個時間點
(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續
失敗到一定次數(預設16次),就會投遞到DLQ死信佇列。應用可以監控死信佇列來做人工干預
可以修改broker-a.conf檔案
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重試訊息的處理機制

一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費效能,理論上當嘗試重複次數
達到我們想要的結果時如果還是消費失敗,那麼我們需要將對應的訊息進行記錄,並且結束重複嘗試

consumer.registerMessageListener((MessageListenerConcurrently) (list,
  consumeOrderlyContext) -> {
                for (MessageExt messageExt : list) {
                    if(messageExt.getReconsumeTimes()==3) {
                     //可以將對應的資料儲存到資料庫,以便人工干預	
                       System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                } r
                eturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

死信佇列

RocketMQ會為每個消費組都設定一個Topic命名為“%DLQ%+consumerGroup"的死信佇列