1. 程式人生 > >RabbitMQ防止訊息丟失

RabbitMQ防止訊息丟失

1.簡介

RabbitMQ中,訊息丟失可以簡單的分為兩種:客戶端丟失和服務端丟失。針對這兩種訊息丟失,RabbitMQ都給出了相應的解決方案。

2.防止客戶端丟失訊息

如圖,生產者P向佇列中生產訊息,C1和C2消費佇列中的訊息,預設情況下,RabbitMQ會平均的分發消費給C1C2(Round-robin dispatching),假設一個任務的執行時間非常長,在執行過程中,客戶端掛了(連線斷開),那麼,該客戶端正在處理且未完成的訊息,以及分配給它還沒來得及執行的訊息,都將丟失。因為預設情況下,RabbitMQ分發完訊息後,就會從記憶體中把訊息刪除掉。

3.訊息確認(Message acknowledgment

為了解決上述問題,RabbitMQ引入了訊息確認機制,當訊息處理完成後,給Server端傳送一個確認訊息,來告訴服務端可以刪除該訊息了,如果連線斷開的時候,Server端沒有收到消費者發出的確認資訊,則會把訊息轉發給其他保持線上的消費者。

驗證上述問題

首先,我們驗證上述問題(客戶端丟失訊息)是否真的存在,對Consumer進行如下改造。

image

先生產兩條訊息

image

啟動消費者,在消費者接收到訊息,還沒處理完成的時候,強制關掉

image

這時,觀察控制檯,發現兩條訊息都沒有了,1條是在執行中丟失的,還有1條,已經分配給這個Consumer,還沒來得及處理,也丟失了

image

這證明了上述問題是真的存在的,如果發生在生產環境,將產生難以預料的後果

引入訊息確認機制

為了方便觀察,我們用CMD來執行Consumer,要通過maven打成可執行的JAR包,需要在pom.xml中增加如下配置

<build>
        <finalName>Consumer</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration
>
<appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.liyang.ticktock.rabbitmq.App</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>

上述配置描述了最終打包名字、入口類路徑、帶上依賴包、使用1.8版本的JDK進行打包,配置完後,就可以通過maveninstall方法,在target目錄生成可執行的jar包,如果包大小很小,應檢查配置,是不是沒有帶上依賴包

image

再次改造Consummer

image

install成可執行jar包,通過cmd開啟兩個consumer

image

通過Sender傳送一條訊息,然後用Ctrl+C結束先收到訊息的Consumer,發現另外一個Consumer接收到了未處理完的訊息

image

問題得到了解決,現在消費者在執行過程中死掉也不會丟失訊息了

看一下發送確認的方法

/**
  * Acknowledge one or several received
  * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being acknowledged.
 * @see com.rabbitmq.client.AMQP.Basic.Ack
 * @param deliveryTag the tag from the received 這個是RabbitMQ用來區分訊息的,文件在這 8      * @param multiple true to acknowledge all messages up to andtrue的話,確認所有訊息,為false只確認當前訊息
  * including the supplied delivery tag; false to acknowledge just
 * the supplied delivery tag.
 * @throws java.io.IOException if an error is encountered
 */
 void basicAck(long deliveryTag, boolean multiple) throws IOException;

在官方文件中,這樣描述deliveryTag

image

簡單來說,就是RabbitMQ內部用來區分訊息的一個標籤,從envelope中獲取就行了

忘記確認將引起記憶體洩漏

RabbitMQ只有在收到消費者確認後,才會從記憶體中刪除訊息,如果消費者忘了確認(更多情況是因為程式碼問題沒有執行到確認的程式碼),將會導致記憶體洩漏

驗證一下

註釋掉Consumer中的確認程式碼

image

image

執行SenderConsumer,不停的生產消費訊息,發現消費者在正常的消費訊息

image

檢視控制檯,發現已經被吃掉了43KB的記憶體,所以,在試用過程中,一定要保證訊息確認在任何情況下都可以發出,否則即使消費者處理完成,RabbitMQ也不會把訊息在記憶體中清除,在該消費者斷開連線之後,還會把訊息轉發給其他消費者重新處理,將引發難以預計的問題

image

image

4.訊息的持久化

現在,消費者宕機已經無法影響到我們的訊息了,但如果RabbitMQ重啟了,訊息依然會丟失。所幸的是,RabbitMQ提供了持久化的機制,將記憶體中的訊息持久化到硬碟上,即使重啟RabbitMQ,訊息也不會丟失。但是,仍然有一個非常短暫的時間視窗(RabbitMQ收到訊息還沒來得及存到硬碟上)會導致訊息丟失,如果需要嚴格的控制,可以參考官方文件

要使用RabbitMQ的訊息持久化,在宣告佇列時設定一個引數即可

image

注意,RabbitMQ不允許對一個已經存在的佇列用不同的引數重新宣告,對於試圖這麼做的程式,會報錯,所以,改動之前程式碼之前,要在控制檯中把原來的佇列刪除

image

重新宣告佇列後,發現Durabletrue

image

重啟RabbitMQ
image

佇列的訊息沒有丟失

image

5.結束語

這一章介紹了RabbitMQ訊息的確認和持久化,後面將會繼續深入介紹RabbitMQ的其他特性