1. 程式人生 > >RabbitMQ學習(六)——訊息確認機制(Confirm模式)

RabbitMQ學習(六)——訊息確認機制(Confirm模式)

在上一篇文章中我們講解了RabbitMQ中的AMQP事務來保證訊息傳送到Broker端,同時我們可以在事務之間傳送多條訊息(即在channel.txSelect()和channel.txCommit()之間傳送多條訊息,通過使用事務來保證它們準確到達Broker),如果忘記了事務的使用,可以複習一下上一篇文章——RabbitMQ學習(五)——訊息確認機制(AMQP事務)

但是使用事務雖然可以保證訊息的準確達到,但是它極大地犧牲了效能,因此我們為了效能上的要求,可以採用另一種高效的解決方案——通過使用Confirm模式來保證訊息的準確性。

這裡的Confirm模式可以分為兩個方面來講解,一是訊息的生產者(Producer)的Confirm模式,另一個是訊息的消費者(Consumer)的Confirm模式。其實這兩種模式在前面幾節的程式碼裡我們都有涉及到的,只是沒有詳細分析,這裡我們將詳細講解一下它們的具體用法和原理。

一、生產者(Producer)的Confirm模式

通過生產者的確認模式我們是要保證訊息準確達到Broker端,而與AMQP事務不同的是Confirm是針對一條訊息的,而事務是可以針對多條訊息的。

傳送原理圖大致如下:

Producer-Confirm

為了使用Confirm模式,client會發送confirm.select方法幀。通過是否設定了no-wait屬性,來決定Broker端是否會以confirm.select-ok來進行應答。一旦在channel上使用confirm.select方法,channel就將處於Confirm模式。處於 transactional模式的channel不能再被設定成Confirm模式,反之亦然。

這裡與前面的一些文章介紹的一致,釋出確認和事務兩者不可同時引入,channel一旦設定為Confirm模式就不能為事務模式,為事務模式就不能為Confirm模式。

在生產者將通道設定成Confirm模式,一旦通道進入Confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(以confirm.select為基礎從1開始計數),一旦訊息被投遞到所有匹配的佇列之後,Broker就會發送一個確認給生產者(包含訊息的唯一ID),這就使得生產者知道訊息已經正確到達目的隊列了,如果訊息和佇列是可持久化的,那麼確認訊息會將訊息寫入磁碟之後發出,Broker回傳給生產者的確認訊息中deliver-tag域包含了確認訊息的序列號,此外Broker也可以設定basic.ack的multiple域,表示到這個序列號之前的所有訊息都已經得到了處理。

Confirm模式最大的好處在於它是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用便可以通過回撥方法來處理該確認訊息,如果RabbitMQ因為自身內部錯誤導致訊息丟失,就會發送一條basic.nack來代替basic.ack的訊息,在這個情形下,basic.nack中各域值的含義與basic.ack中相應各域含義是相同的,同時requeue域的值應該被忽略。通過nack一條或多條訊息, Broker表明自身無法對相應訊息完成處理,並拒絕為這些訊息的處理負責。在這種情況下,client可以選擇將訊息re-publish。

在channel 被設定成Confirm模式之後,所有被publish的後續訊息都將被Confirm(即 ack)或者被nack一次。但是沒有對訊息被Confirm的快慢做任何保證,並且同一條訊息不會既被Confirm又被nack。

開啟confirm模式的方法

生產者通過呼叫channel的confirmSelect方法將channel設定為Confirm模式,如果沒有設定no-wait標誌的話,Broker會返回confirm.select-ok表示同意傳送者將當前channel通道設定為Confirm模式(從目前RabbitMQ最新版本3.6來看,如果呼叫了channel.confirmSelect方法,預設情況下是直接將no-wait設定成false的,也就是預設情況下broker是必須回傳confirm.select-ok的)。

confirm

程式設計模式

對於固定訊息體大小和執行緒數,如果訊息持久化,生產者Confirm(或者採用事務機制),消費者ack那麼對效能有很大的影響.

訊息持久化的優化沒有太好方法,用更好的物理儲存(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程式的優化之上。歸納起來,客戶端實現生產者confirm有三種程式設計方式:

  1. 普通Confirm模式:每傳送一條訊息後,呼叫waitForConfirms()方法,等待伺服器端Confirm。實際上是一種序列Confirm了,每publish一條訊息之後就等待服務端Confirm,如果服務端返回false或者超時時間內未返回,客戶端進行訊息重傳;
  2. 批量Confirm模式:批量Confirm模式,每傳送一批訊息之後,呼叫waitForConfirms()方法,等待服務端Confirm,這種批量確認的模式極大的提高了Confirm效率,但是如果一旦出現Confirm返回false或者超時的情況,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息,如果這種情況頻繁發生的話,效率也會不升反降;
  3. 非同步Confirm模式:提供一個回撥方法,服務端Confirm了一條或者多條訊息後Client端會回撥這個方法。

1、普通Confirm模式

主要程式碼為:

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條訊息").getBytes());
if (channel.waitForConfirms()) {
   System.out.println("傳送成功");
}else{
  //進行訊息重發
}

普通Confirm模式最簡單,publish一條訊息後,等待伺服器端Confirm,如果服務端返回false或者超時時間內未返回,客戶端就進行訊息重傳。

我們還是結合程式碼來講解,下載原來的程式碼 rabbitmq-demo,然後在sender和receiver中分別新建程式碼ConfirmSender1.java和ConfirmReceiver1.java。

ConfirmSender1.java:

package net.anumbrella.rabbitmq.sender;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 這是java原生類支援RabbitMQ,直接執行該類
 */
public class ConfirmSender1 {

    private final static String QUEUE_NAME = "confirm";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        /**
         * 建立連線連線到RabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();

        // 設定RabbitMQ所在主機ip或者主機名
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // 建立一個連線
        Connection connection = factory.newConnection();

        // 建立一個頻道
        Channel channel = connection.createChannel();

        // 指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 傳送的訊息
        String message = "This is a confirm message!";

        channel.confirmSelect();
        final long start = System.currentTimeMillis();
        //傳送持久化訊息
        for (int i = 0; i < 5; i++) {
            //第一個引數是exchangeName(預設情況下代理伺服器端是存在一個""名字的exchange的,
            //因此如果不建立exchange的話我們可以直接將該引數設定成"",如果建立了exchange的話
            //我們需要將該引數設定成建立的exchange的名字),第二個引數是路由鍵
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條訊息").getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("傳送成功");
            }else{
                // 進行訊息重發
            }
        }
        System.out.println("執行waitForConfirms耗費時間: " + (System.currentTimeMillis() - start) + "ms");
        // 關閉頻道和連線
        channel.close();
        connection.close();
    }
}

我們在程式碼中傳送了5條訊息到Broker端,每條訊息傳送後都會等待確認。

ConfirmReceiver1.java:

package net.anumbrella.rabbitmq.receiver;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

/**
 * 這是java原生類支援RabbitMQ,直接執行該類
 */
public class ConfirmReceiver1 {

    private final static String QUEUE_NAME = "confirm";

    public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // 開啟連線和建立頻道,與傳送端一樣

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("ConfirmReceiver1 waiting for messages. To exit press CTRL+C");

        // 建立佇列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");

                String message = new String(body, "UTF-8");

                System.out.println(" ConfirmReceiver1  : " + message);
                System.out.println(" ConfirmReceiver1 Done! at " + time.format(new Date()));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我們開啟WireShak,監聽RabbitMQ訊息的傳送。然後我們直接執行ConfirmSender1.java類,可以不用執行ConfirmReceiver.java,因為我們主要是測試訊息到達Broker端,這主要是涉及到Producer和RabbitMQ的服務端。

在控制檯打印出了資訊:

傳送成功
傳送成功
傳送成功
傳送成功
傳送成功
執行waitForConfirms耗費時間: 181ms

在RabbitMQ管理介面confirm佇列裡,我們可以檢視到我們傳送的5條訊息資料。
message

在WireShark中也可以發現開啟了Confirm模式,以及我們傳送的5條訊息。
WireShark
接著我們啟動ConfirmReceiver.java,可以收到我們傳送的具體訊息:

 ConfirmReceiver1 waiting for messages. To exit press CTRL+C
 ConfirmReceiver1  :  Confirm模式, 第1條訊息
 ConfirmReceiver1 Done! at 2018-08-04 14:58:27:0014
 ConfirmReceiver1  :  Confirm模式, 第2條訊息
 ConfirmReceiver1 Done! at 2018-08-04 14:58:27:0016
 ConfirmReceiver1  :  Confirm模式, 第3條訊息
 ConfirmReceiver1 Done! at 2018-08-04 14:58:27:0016
 ConfirmReceiver1  :  Confirm模式, 第4條訊息
 ConfirmReceiver1 Done! at 2018-08-04 14:58:27:0017
 ConfirmReceiver1  :  Confirm模式, 第5條訊息
 ConfirmReceiver1 Done! at 2018-08-04 14:58:27:0017

2、批量Confirm模式

主要程式碼為:

channel.confirmSelect();
for(int i=0;i<5;i++){
     channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條訊息").getBytes());
}
if(channel.waitForConfirms()){
    System.out.println("傳送成功");
}else{
    // 進行訊息重發
}

這裡主要更改程式碼為傳送批量訊息後再進行等待伺服器確認,還可以呼叫channel.waitForConfirmsOrDie()方法,該方法會等到最後一條訊息得到確認或者得到nack才會結束,也就是說在waitForConfirmsOrDie處會造成當前程式的阻塞。更改程式碼為批量Confirm模式,執行我們檢視控制檯:

傳送成功
執行waitForConfirms耗費時間: 59ms

在WireShark檢視資訊如下:
批量Confirm

可以發現這裡處理的就是在批量傳送資訊完畢後,再進行ACK確認。同時我們發現這裡只有三個Basic.Ack,這是因為Broker對資訊進行了批量處理。

批量處理

我們可以發現multiple的值為true,這與前面我們講解的一致,true確認所有將比第一個引數指定的 delivery-tag 小的訊息都得到確認。

我們也可以發現執行時間比第一種模式縮短了很多,效率極大提高了。

如果我們要對每條訊息進行監聽處理,可以通過在channel中新增監聽器來實現,

channel.addConfirmListener(new ConfirmListener() {

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("nack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("ack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
            }
        });

當收到Broker傳送過來的ack訊息時就會呼叫handleAck方法,收到nack時就會呼叫handleNack方法。

我們可以在控制檯看到資訊,這次呼叫了兩次Basic.Ack方法。

ack: deliveryTag = 4 multiple: true
ack: deliveryTag = 5 multiple: false
傳送成功
執行waitForConfirms耗費時間: 50ms

3、非同步Confirm模式

這裡使用的非同步Confirm模式,也要用到上面提到的監聽,但是這裡需要我們自己去維護實現一個waitForConfirms()方法或waitForConfirmsOrDie(),而waitForConfirms()是同步的,因此我們需要自己去實現維護delivery-tag。

waitForConfirms

我們可以在jar中檢視到原始碼,其實waitForConfirmsOrDie()最終呼叫的也是waitForConfirms()方法,在waitForConfirms()方法內部維護了一個同步塊程式碼,而unconfirmedSet就是儲存delivery-tag標識的。

我們要實現自己非同步呼叫,主要就是為了維護delivery-tag,主要實現程式碼如下:

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
       public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                  confirmSet.headSet(deliveryTag + 1L).clear();
              } else {
                    confirmSet.remove(deliveryTag);
              }
       }
       public void handleNack(long deliveryTag, boolean multiple) throws IOException {
             System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
             if (multiple) {
                 confirmSet.headSet(deliveryTag + 1L).clear();
              } else {
                  confirmSet.remove(deliveryTag);
              }
        }
});
for(int i=0;i<5;i++){
            long nextSeqNo = channel.getNextPublishSeqNo();
     channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條訊息").getBytes());
            confirmSet.add(nextSeqNo);
}

維持非同步呼叫要求我們不能斷掉連線,具體可以參考程式碼ConfirmSender2.java。

4、關於Spring Boot使用Producer的Confirm模式

在前面RabbitMQ學習(三)——探索交換機(Exchange),結合SpringBoot實戰中就有提及到,主要是通過在Sender中實現RabbitTemplate.ConfirmCallback介面來實現該操作。可以參考rabbitmq-demo中的CallBackSender.java和CheckReceiver.java的實現。

二、消費者(Consumer)的Confirm模式

1、手動確認和自動確認

為了保證訊息從佇列可靠地到達消費者,RabbitMQ提供訊息確認機制(message acknowledgment)。消費者在宣告佇列時,可以指定noAck引數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack訊號後才從記憶體(和磁碟,如果是持久化訊息的話)中移去訊息。否則,RabbitMQ會在佇列中訊息被消費後立即刪除它。

採用訊息確認機制後,只要令noAck=false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ會一直持有訊息直到消費者顯式呼叫basicAck為止。

在Consumer中Confirm模式中分為手動確認和自動確認。

手動確認主要並使用以下方法:

basic.ack: 用於肯定確認,multiple引數用於多個訊息確認。
basic.recover:是路由不成功的訊息可以使用recovery重新發送到佇列中。
basic.reject:是接收端告訴伺服器這個訊息我拒絕接收,不處理,可以設定是否放回到佇列中還是丟掉,而且只能一次拒絕一個訊息,官網中有明確說明不能批量拒絕訊息,為解決批量拒絕訊息才有了basicNack。
basic.nack:可以一次拒絕N條訊息,客戶端可以設定basicNack方法的multiple引數為true,伺服器會拒絕指定了delivery_tag的所有未確認的訊息(tag是一個64位的long值,最大值是9223372036854775807)。

肯定的確認只是指導RabbitMQ將一個訊息記錄為已投遞。basic.reject的否定確認具有相同的效果。 兩者的差別在於:肯定的確認假設一個訊息已經成功處理,而對立面則表示投遞沒有被處理,但仍然應該被刪除。

同樣的Consumer中的Confirm模式也具有同時確認多個投遞,通過將確認方法的 multiple “欄位設定為true完成的,實現的意義與Producer的一致。

在自動確認模式下,訊息在傳送後立即被認為是傳送成功。 這種模式可以提高吞吐量(只要消費者能夠跟上),不過會降低投遞和消費者處理的安全性。 這種模式通常被稱為“發後即忘”。 與手動確認模式不同,如果消費者的TCP連線或通道在成功投遞之前關閉,該訊息則會丟失。

使用自動確認模式時需要考慮的另一件事是消費者過載。 手動確認模式通常與有限的通道預取一起使用,限制通道上未完成(“進行中”)傳送的數量。 然而,對於自動確認,根據定義沒有這樣的限制。 因此,消費者可能會被交付速度所壓倒,可能積壓在記憶體中,堆積如山,或者被作業系統終止。 某些客戶端庫將應用TCP反壓(直到未處理的交付積壓下降超過一定的限制時才停止從套接字讀取)。 因此,只建議當消費者可以有效且穩定地處理投遞時才使用自動投遞方式。

主要實現程式碼:

// 手動確認訊息
channel.basicAck(envelope.getDeliveryTag(), false);

// 關閉自動確認
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

2、關於Spring Boot使用Consumer的Confirm模式

請參考rabbitmq-demo中的CallBackSender.java和CheckReceiver.java的實現。

最後上面演示的demo,還是放在github,rabbitmq-demo

參考

相關推薦

RabbitMQ學習()——訊息確認機制(Confirm模式)

在上一篇文章中我們講解了RabbitMQ中的AMQP事務來保證訊息傳送到Broker端,同時我們可以在事務之間傳送多條訊息(即在channel.txSelect()和channel.txCommit()之間傳送多條訊息,通過使用事務來保證它們準確到達Broker

RabbitMQ學習(五)——訊息確認機制(AMQP事務)

在前面的文章中,我們對RabbitMQ的訊息分發機制做了探究,知道RabbitMQ訊息的分發機制,包括公平分發和輪詢分發,如果忘記了可以去複寫一下RabbitMQ學習(四)——訊息分發機制。 我們知道可以通過持久化(交換機、佇列和訊息持久化)來保障我們在伺服器崩

RabbitMQ學習(四)——訊息分發機制

在前面的一篇博文中,我們對RabbitMQ中的交換機有了大致的瞭解,同時結合Spring Boot的例項,讓我們對RabbitMQ的用法有了更清晰的認識。如果忘記了可以去複習一下,RabbitMQ學習(三)——探索交換機(Exchange),結合SpringBo

springboot + rabbitmq 用了訊息確認機制,感覺掉坑裡了

>本文收錄在個人部落格:[www.chengxy-nds.top](http://www.chengxy-nds.top),技術資源共享,一起進步 最近部門號召大夥多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是為了刷`KPI`。不過,話說回來這的確是件好事,與其開

RabbitMQ訊息確認機制(事務+Confirm

概述 在 Rabbitmq 中我們可以通過持久化來解決因為伺服器異常而導致丟失的問題,除此之外我們還會遇到一個問題:生產者將訊息傳送出去之後,訊息到底有沒有正確到達 Rabbit 伺服器呢?如果不錯得數處理,我們是不知道的,(即 Rabbit 伺服器不會反饋任何訊息給生產者),也就是預設的情況下是不知道訊息

RabbitMQ訊息確認機制(事務+Confirm

概述 在使用RabbitMQ的時候,我們可以通過訊息持久化操作來解決因為伺服器的異常奔潰導致的訊息丟失,除此之外我們還會遇到一個問題,當訊息的釋出者在將訊息傳送出去之後,訊息到底有沒有正確到達broker代理伺服器呢?如果不進行特殊配置的話,預設情況下發布操作是不會返回任何

RabbitMqrabbitMq訊息確認機制

  一、提出問題 生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的 二、引入訊息確認機制 兩種方式:           1.AMQP實現事務機制     &

java rabbitmq ack訊息確認機制

ackage com.example.demo.ConsumerDemo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframewo

(三)rabbitmq訊息確認機制ack

接上一篇文章,在application配置檔案中新增如下配置: ## 訊息手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual 這樣就開啟了訊息手動確認,然後再消費者端程式碼中加上如下程式碼進行業務處理完後的訊息確認刪除訊息

RabbitMQ 訊息確認機制 以及 原理解析

一、場景   當訊息的投送方把訊息投遞出去,卻不知道訊息是否投遞成功了。如果訊息投送方不管的話,勢必對系統的造成可靠性的影響。 可是如果要保證系統的可靠性,訊息投靠方,如何知道訊息是否投放成功了呢? 這個就需要訊息的確認機制,我們來看下rabbitMQ的訊息

RabbitMQ ——與SpringBoot整合並實現訊息確認機制

不囉嗦直接上程式碼 目錄結構如下: pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

RabbitMQ訊息佇列(九):Publisher的訊息確認機制

       在前面的文章中提到了queue和consumer之間的訊息確認機制:通過設定ack。那麼Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個

spring rabbitmq 訊息確認機制和事務支援

確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。 釋出確認: Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,哪些可能因為br

RabbitMQ使用場景練習:訊息確認機制(十一)

訊息確認機制RabbitMQ提供了transaction、confirm兩種訊息確認機制。transaction即事務機制,手動提交和回滾;confirm機制提供了Confirmlistener和waitForConfirms兩種方式。confirm機制效率明顯會高於tran

RabbitMQ訊息確認機制AMQP事務

概述 我們在RabbitMQ中可以通過持久化來解決伺服器掛掉而丟失資料問題,但是大家有沒有想過,我的訊息到達了RabbitMQ伺服器了嗎??? 我們是不知道的,導致的問題就是 如果訊息在到達伺服器之前就丟失了,持久化也是不能解決問題的! 那怎麼辦? 我

RabbitMQ 訊息持久化、事務、Publisher的訊息確認機制

RabbitMQ  訊息持久化、事務、Publisher的訊息確認機制 1. 宣告MessageQueue 在RabbitMQ中,無論是生產者傳送訊息還是消費者接受訊息,都首先需要宣告一個MessageQueue。 這就存在一個問題,是生產者宣告還是消費者宣告呢?要解決這個

RabbitMQ ACK 訊息確認機制

注意如果拋異常或unack(並且requeue為true),訊息會一直重新入佇列,一不小心就會xxxxx一大堆訊息不斷重複~。 //訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息 (正常消費) channel.basicAck(message.getMess

RabbitMQ (九) : 消息確認機制confirm 機制

sel 異步 confirm lin creat 生產 不能 消息 tip confirm 機制分串行和並行兩種. 串行 生產者 public class Producer { private const string QueueName

RabbitMQ實戰-訊息確認機制訊息的正確消費

上節中我們講了如何確保訊息的準確釋出,今天我們來看看如何確保訊息的正確消費。 在之前的基礎上我們對消費者(倉庫服務)進行完善。 修改配置檔案application.yml 消費者的ack方式預設是自動的,也就是說訊息一旦被消費(無論是否處理成功),訊息都會被確認,然後會從

九、rabbitMQ訊息確認機制之事務機制

說明:在rabbitMQ中,我們為了解決伺服器異常導致資料丟失的問題,我們可以採用rabbitMQ的持久化機制,但是我們如何確定生產者將訊息傳送給了rabbitMQ呢,那麼我們採用兩種協議的模式。    (1)、AMQP實現了事務機制    (2)、confirm模式一、事務