1. 程式人生 > 程式設計 >RabbitMQ(四) --消費者Consumer

RabbitMQ(四) --消費者Consumer

一:摘要概述

經過前面三篇文章的學習,對於RabbitMQ中介軟體應該處於撥開雲霧見青天階段。本文將趁熱打鐵,完善RabbitMQ基礎應用最後一個消費版塊。當然文中會持續深入講解有關訊息分發、消費端確認等中階特性

二:訊息消費

MQ佇列可以理解為物品寄存中心,放進去總要拿出來用,一直放著沒有利息還會持久增加成本引發系列問題。MQ儲存的訊息使用有兩種途徑,RabbitMQ服務推送、消費者客戶端拉取

2.1 拉取訊息

通過baiscGet()拉取RabbitMQ服務端訊息有以下幾點需要注意:

  • 一次只能消費一條訊息,千萬別使用用迴圈代替後面的baiscConsume()
  • 前面講的佇列建立引數有AutoDelete,但是注意這個自動刪除前提為至少有一個消費者連線到佇列
    ,並且當所有消費者斷開時刪除,這裡通過basicGet()消費不包含在內
        // 設定佇列自動刪除
        channel.queueDeclare("autoDeleteQueue",true,false,null);
        channel.basicPublish("","autoDeleteQueue",null,"測試".getBytes());
        // 驗證basicGet不觸發佇列自動刪除
        channel.basicGet("autoDeleteQueue",true);
複製程式碼

當然basicGet()方法自身API比較簡單,第一個引數指明消費佇列,第二個引數設定是否自動應答即AutoAck。返回物件也就封裝Envelope

BasicPropertiesbody訊息體等,具體資訊如下表所示:

序號 方法引數 含義
1 queue 佇列名稱,指定消費者消費佇列
2 autoAck 自動應答,開啟為true後RabbitMQ應用送出訊息將立即刪除
序號 返回值 備註
1 envelope 包含deliveryTag、exchange、routingKey等資訊
2 props BasicProperties物件,即訊息生產時設定的該物件特性
3 body 訊息體byte陣列
4 messageCount 訊息數量
2.2 推送訊息

相對於拉取訊息而言,basicConsume()

推送訊息更加符合生產環境的需求,持續監控消費佇列。自然其API也更加複雜,常用系列過載引數如下表所示:

序號 方法引數 含義
1 queue 消費佇列名稱
2 autoAck 自動確認提交
3 consumerTag 消費者唯一標識
4 noLocal 不消費同一Connection連線生產的訊息
5 consumer 具體組織消費邏輯物件,裡面提供系列過載方法使用者消費邏輯組裝

推送訊息消費最後一般都是採用實現Consumer介面亦或是繼承DefaultConsumer類,DefaultConsumer實現了介面Consumer,但是大多數方法都是空實現,需要重寫其中的邏輯。其中重要方法執行時間如下表所示:

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery (String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
                // 處理訊息邏輯
            }
        };
        channel.basicConsume("queueName","consumerTag",defaultConsumer);
複製程式碼
序號 方法 執行時間
1 handleDelivery() 消費訊息邏輯
2 handleConsumeOk() 第一篇文章就就講到Consume-Ok命令在delivery命令前,即每個消費者開始消費前都會執行該方法一次
3 handleShutdownSignal() 當連線Connection / 通道Channel斷開關閉時執行一次
4 handleRecoverOk() baiscRecover()命令佇列重發未確認訊息,當未確認訊息被重發前執行這個方法
5 handleCancelOk() baiscCancel()顯示取消消費者,當消費者被取消時指定這個方法
2.3 佇列重發

上面講到Consumer的方法handleRecoverOk()將會在訊息重發時呼叫,顯示的呼叫訊息重發方法為basicRecover()。該方法只有一個引數:

  • true:表示可以將重發訊息傳送給其它消費者
  • false:表示只能將訊息傳送給相同的消費者
    在這裡插入圖片描述

三:訊息確認

第一篇文章中有一個命令是Basic.Ack用於客戶端向服務端反饋確認訊息已經正常消費,當接收到命令後RabbityMQ服務端才會刪除訊息,從消費者客戶端確保訊息不會丟失。自然有確認就有拒絕確認,本節將介紹basicAck()、basicReject()、basicNack()

3.1 確認消費basicAck

RabbitMQ反饋確認消費通過命令basicAck()實現,該方法具備兩個引數deliveryTagmultiple

	channel.basicAck(envelope.getDeliveryTag(),false);
複製程式碼
  • deliveryTag:確認訊息的編號,這是每個訊息被消費時都會分配一個遞增唯一編號
  • multiple:批量確認,true表示所有編號小於目前確認訊息編號的待確認訊息都會被確認,false則只確認當前訊息

特別注意:訊息的編碼是每個通道Channel範圍的,批量確認操作也是針對當前Channel通道的操作。請務必記住這個範圍

3.2 拒絕確認basicReject

程式在消費訊息過程中丟擲異常,或者是訊息需要重複消費,這時候就可以將消費的訊息拒絕確認。拒絕確認的訊息有兩種去處,刪除、放回佇列,通過引數requeue控制,拒絕確認的訊息放回佇列時會放置在佇列首位,拒絕訊息不放回佇列可以搭配死信佇列使用

	void basicReject(long deliveryTag,boolean requeue) throws IOException;
複製程式碼
3.3 拒絕確認basicNack

確認消費可以批量確認,為什麼拒絕確認訊息不能批量拒絕?所以為了補充basicReject()不足提出了basicNack()。這個API相對於basicReject()而言多了一個引數multiple,效果與批量確認一致

    void basicNack(long deliveryTag,boolean multiple,boolean requeue)
            throws IOException;
複製程式碼

四:訊息預取

在這裡插入圖片描述
訊息消費RabbitMQ採取的策略就是輪詢機制,將每個訊息傳送給唯一的消費者。每個消費者獲取到的訊息都是平均的,這樣的機制會導致下列問題:

  • 某些訊息耗時超長,平均分配訊息後可能導致某些消費者積壓過多未消費訊息,而同時某些消費者處於空閒狀態,導致系統吞吐量下降

通過下面程式碼可以告訴RabbitMQ服務端,我只接受prefetchCount數量的未確認訊息,當消費者客戶端未確認訊息達到限定值後服務端將不會給該消費者推送資料。第二個引數的含義如下表:

引數值 含義
false 預設值,單獨應用於通道上所有消費者
true 通道上所有消費者共享
    void basicQos(int prefetchCount,boolean global) throws IOException;
複製程式碼

合理的訊息預取配合消費端手動ACK確認機制可以很好的優化平衡消費者效能,這個預取數量問題可以根據佇列訊息增長率與消費端訊息處理效率平衡

五:RPC

使用RabbitMQ完成RPC操作其實比較簡單,就是使用了前面講到過的BasicPeoperties物件。傳送訊息時訊息可以攜帶該物件,前面使用到了物件的deliveryMod持久化、priority優先順序、expiration自動過期刪除屬性。這裡RPC將會使用到replyTo屬性告訴RPC服務端執行完畢後回撥佇列地址,correlationId用於標識請求唯一ID

5.1 RPC客戶端
  • UUID生成correlationId請求唯一標識ID,客戶端消費回撥佇列時確認歸屬與自己的請求回撥
  • ArrayBlockingQueue阻塞佇列用於阻塞主執行緒等待RPC服務端完成邏輯以後的回撥
  • 如果想限制RPC遠端超時時間則可以在阻塞佇列等待方法take()中新增最大等待時長
    @SneakyThrows
    public static void main (String[] args) {
        Channel channel = TemplateConfigServiceImpl.createChannel();
        // 建立BasicProperties賦值replyTo回撥佇列名稱、correlationId請求唯一標識ID
        String correlationId = UUID.randomUUID().toString();
        String replyQueue = "queue1";
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(replyQueue).correlationId(correlationId).build();
        // 客戶端向服務端監控佇列傳送訊息
        String rpcQueue = "queueName";
        channel.basicPublish("",rpcQueue,basicProperties,"RPC測試".getBytes());
        // 建立阻塞佇列等到訊息回撥
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        // 監控回撥佇列訊息獲取遠端呼叫結果
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery (String consumerTag,byte[] body) throws IOException {
                // 校驗訊息唯一標識是否匹配
                String replyCorrelationId = properties.getCorrelationId();
                long deliveryTag = envelope.getDeliveryTag();
                if (correlationId.equals(replyCorrelationId)){
                    // 將回撥訊息放到阻塞佇列中
                    arrayBlockingQueue.offer(new String(body));
                    channel.basicAck(deliveryTag,false);
                }else {
                    // 不匹配訊息放回佇列
                    channel.basicReject(deliveryTag,true);
                }
            }
        };
        channel.basicConsume(replyQueue,"ConsumerTag",defaultConsumer);
        // 阻塞等待阻塞佇列中訊息處理後續邏輯
        String take = arrayBlockingQueue.take();
        System.out.println(take);
    }
複製程式碼
5.2 RPC服務端

整體RPC服務端、客戶端實現都是最簡陋的自行車設計,如果想要更復雜的邏輯請自行完成

    @SneakyThrows
    public static void main (String[] args) {
        Channel channel = TemplateConfigServiceImpl.createChannel();
        // 監控RPC佇列訊息執行任務
        String rpcQueue = "queueName";
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery (String consumerTag,byte[] body) throws IOException {
                // 執行計算邏輯
                System.out.println("RPC遠端服務端開始執行任務");
                System.out.println(new String(body));
                // 組裝回撥訊息
                String replyTo = properties.getReplyTo();
                channel.basicPublish("",replyTo,properties,"RPC遠端計算任務完成".getBytes());
                // 確認訊息消費
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag,false);
            }
        };
        channel.basicConsume(rpcQueue,defaultConsumer);
    }
複製程式碼