1. 程式人生 > >RabbitMQ消費方式匯總

RabbitMQ消費方式匯總

都是 efault esp 實現 取模 != 之前 handle tin

在學習本章節前,請先學習之前的章節:
Java訪問RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.html
RabbitMQ消息發布時的權衡:https://www.cnblogs.com/duanjt/p/10075308.html


一、推送Consume


前面我們使用到的都是這種模式,註冊一個消費者後,RabbitMQ會在消息可用時,自動將消息進行推送給消費者。這種方式效率最高最及時。
核心代碼如下:

// 接收消息,第二個參數表示是否自動應答
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body)); } });

二、拉取Get


屬於一種輪詢模型,發送一次get請求,獲得一個消息。如果此時RabbitMQ中沒有消息,會獲得一個表示空的回復。總的來說,這種方式性能比較差,很明顯,每獲得一條消息,都要和RabbitMQ進行網絡通信發出請求。而且對RabbitMQ來說,RabbitMQ無法進行任何優化,因為它永遠不知道應用程序何時會發出請求。
核心代碼如下:

while(true){
    //如果沒有消息,將返回null
    GetResponse getResponse = channel.basicGet(queueName, true);    
    if(null!=getResponse){
        System.out.println("received["+getResponse.getEnvelope().getRoutingKey()+"]"+new String(getResponse.getBody()));
    }
    Thread.sleep(1000);
}

三、自動確認


方法channel.basicConsume和方法channel.basicGet表示同步或異步獲取消息,第二個參數都表示是否自動確認。前面我們都設置為了true。這個時候我們只需要處理邏輯,將自動向RabbitMQ進行確認。
當autoAck=true時,一旦消費者接收到了消息,就視為自動確認了消息。如果消費者在處理消息的過程中,出了錯,就沒有什麽辦法重新處理這條消息,所以我們很多時候,需要在消息處理成功後,再確認消息,這就需要手動確認。

四、手動確認

// 接收消息手動確認,第二個參數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body));
        //手動確認,第一個參數是消息標識,第二個參數表示是否批量確認。這裏是一條一條確認,所以設置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

五、QoS預取模式


在確認消息被接收之前,消費者可以預先要求接收一定數量的消息,在處理完一定數量的消息後,批量進行確認。如果消費者應用程序在確認消息之前崩潰,則所有未確認的消息將被重新發送給其他消費者。所以這裏存在著一定程度上的可靠性風險。
這種機制一方面可以實現限速(將消息暫存到RabbitMQ內存中)的作用,一方面可以保證消息確認質量(比如確認了但是處理有異常的情況)
核心代碼:

//參數1表示限制條數,參數2 true=channel,false=消費者
channel.basicQos(100, true);

// 接收消息手動確認,第二個參數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body));
        //手動確認,第一個參數是消息標識,第二個參數表示是否批量確認。這裏是一條一條確認,所以設置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

註意:
1.消費確認模式必須是非自動ACK機制(這個是使用baseQos的前提條件,否則會Qos不生效),然後設置basicQos的值;另外,還可以基於consume和channel的粒度進行設置(global)

RabbitMQ消費方式匯總