1. 程式人生 > >RabbitMQ消費方式彙總

RabbitMQ消費方式彙總

在學習本章節前,請先學習之前的章節:
Java訪問RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.html
RabbitMQ訊息釋出時的權衡:https://www.cnblogs.com/duanjt/p/10075308.html


一、推送Consume


前面我們使用到的都是這種模式,註冊一個消費者後,RabbitMQ會在訊息可用時,自動將訊息進行推送給消費者。這種方式效率最高最及時。
核心程式碼如下:

// 接收訊息,第二個引數表示是否自動應答
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getRoutingKey() + " 接收到資料:" + new String(body)); } });

 

 

二、拉取Get


屬於一種輪詢模型,傳送一次get請求,獲得一個訊息。如果此時RabbitMQ中沒有訊息,會獲得一個表示空的回覆。總的來說,這種方式效能比較差,很明顯,每獲得一條訊息,都要和RabbitMQ進行網路通訊發出請求。而且對RabbitMQ來說,RabbitMQ無法進行任何優化,因為它永遠不知道應用程式何時會發出請求。
核心程式碼如下:

while(true){
    //如果沒有訊息,將返回null
    GetResponse getResponse = channel.basicGet(queueName, true);    
    if(null!=getResponse){
        System.out.println("received["+getResponse.getEnvelope().getRoutingKey()+"]"+new String(getResponse.getBody()));
    }
    Thread.sleep(1000);
}

 

 

三、自動確認


方法channel.basicConsume和方法channel.basicGet表示同步或非同步獲取訊息,第二個引數都表示是否自動確認。前面我們都設定為了true。這個時候我們只需要處理邏輯,將自動向RabbitMQ進行確認。
當autoAck=true時,一旦消費者接收到了訊息,就視為自動確認了訊息。如果消費者在處理訊息的過程中,出了錯,就沒有什麼辦法重新處理這條訊息,所以我們很多時候,需要在訊息處理成功後,再確認訊息,這就需要手動確認。

 

四、手動確認

 

// 接收訊息手動確認,第二個引數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到資料:" + new String(body));
        //手動確認,第一個引數是訊息標識,第二個引數表示是否批量確認。這裡是一條一條確認,所以設定false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

五、QoS預取模式


在確認訊息被接收之前,消費者可以預先要求接收一定數量的訊息,在處理完一定數量的訊息後,批量進行確認。如果消費者應用程式在確認訊息之前崩潰,則所有未確認的訊息將被重新發送給其他消費者。所以這裡存在著一定程度上的可靠性風險。
這種機制一方面可以實現限速(將訊息暫存到RabbitMQ記憶體中)的作用,一方面可以保證訊息確認質量(比如確認了但是處理有異常的情況)
核心程式碼:

//引數1表示限制條數,引數2 true=channel,false=消費者
channel.basicQos(100, true);

// 接收訊息手動確認,第二個引數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到資料:" + new String(body));
        //手動確認,第一個引數是訊息標識,第二個引數表示是否批量確認。這裡是一條一條確認,所以設定false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

注意:
1.消費確認模式必須是非自動ACK機制(這個是使用baseQos的前提條件,否則會Qos不生效),然後設定basicQos的值;另外,還可以基於consume和channel的粒度進行設定(global)