RabbitMQ防止訊息丟失
1.簡介
RabbitMQ
中,訊息丟失可以簡單的分為兩種:客戶端丟失和服務端丟失。針對這兩種訊息丟失,RabbitMQ
都給出了相應的解決方案。
2.防止客戶端丟失訊息
如圖,生產者P向佇列中生產訊息,C1和C2消費佇列中的訊息,預設情況下,RabbitMQ
會平均的分發消費給C1C2(Round-robin dispatching
),假設一個任務的執行時間非常長,在執行過程中,客戶端掛了(連線斷開),那麼,該客戶端正在處理且未完成的訊息,以及分配給它還沒來得及執行的訊息,都將丟失。因為預設情況下,RabbitMQ
分發完訊息後,就會從記憶體中把訊息刪除掉。
3.訊息確認(Message acknowledgment
)
為了解決上述問題,RabbitMQ
引入了訊息確認機制,當訊息處理完成後,給Server
端傳送一個確認訊息,來告訴服務端可以刪除該訊息了,如果連線斷開的時候,Server
端沒有收到消費者發出的確認資訊,則會把訊息轉發給其他保持線上的消費者。
驗證上述問題
首先,我們驗證上述問題(客戶端丟失訊息)是否真的存在,對Consumer
進行如下改造。
先生產兩條訊息
啟動消費者,在消費者接收到訊息,還沒處理完成的時候,強制關掉
這時,觀察控制檯,發現兩條訊息都沒有了,1條是在執行中丟失的,還有1條,已經分配給這個Consumer
,還沒來得及處理,也丟失了
這證明了上述問題是真的存在的,如果發生在生產環境,將產生難以預料的後果
引入訊息確認機制
為了方便觀察,我們用CMD
來執行Consumer
,要通過maven
打成可執行的JAR
包,需要在pom.xml
中增加如下配置
<build>
<finalName>Consumer</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration >
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.liyang.ticktock.rabbitmq.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
上述配置描述了最終打包名字、入口類路徑、帶上依賴包、使用1.8
版本的JDK
進行打包,配置完後,就可以通過maven
的install
方法,在target
目錄生成可執行的jar
包,如果包大小很小,應檢查配置,是不是沒有帶上依賴包
再次改造Consummer
類
install
成可執行jar
包,通過cmd
開啟兩個consumer
通過Sender
傳送一條訊息,然後用Ctrl+C
結束先收到訊息的Consumer
,發現另外一個Consumer
接收到了未處理完的訊息
問題得到了解決,現在消費者在執行過程中死掉也不會丟失訊息了
看一下發送確認的方法
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received 這個是RabbitMQ用來區分訊息的,文件在這 8 * @param multiple true to acknowledge all messages up to and 為true的話,確認所有訊息,為false只確認當前訊息
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
在官方文件中,這樣描述deliveryTag
簡單來說,就是RabbitMQ
內部用來區分訊息的一個標籤,從envelope
中獲取就行了
忘記確認將引起記憶體洩漏
RabbitMQ
只有在收到消費者確認後,才會從記憶體中刪除訊息,如果消費者忘了確認(更多情況是因為程式碼問題沒有執行到確認的程式碼),將會導致記憶體洩漏
驗證一下
註釋掉Consumer
中的確認程式碼
執行Sender
和Consumer
,不停的生產消費訊息,發現消費者在正常的消費訊息
檢視控制檯,發現已經被吃掉了43KB的記憶體,所以,在試用過程中,一定要保證訊息確認在任何情況下都可以發出,否則即使消費者處理完成,RabbitMQ
也不會把訊息在記憶體中清除,在該消費者斷開連線之後,還會把訊息轉發給其他消費者重新處理,將引發難以預計的問題
4.訊息的持久化
現在,消費者宕機已經無法影響到我們的訊息了,但如果RabbitMQ
重啟了,訊息依然會丟失。所幸的是,RabbitMQ
提供了持久化的機制,將記憶體中的訊息持久化到硬碟上,即使重啟RabbitMQ
,訊息也不會丟失。但是,仍然有一個非常短暫的時間視窗(RabbitMQ
收到訊息還沒來得及存到硬碟上)會導致訊息丟失,如果需要嚴格的控制,可以參考官方文件
要使用RabbitMQ
的訊息持久化,在宣告佇列時設定一個引數即可
注意,RabbitMQ
不允許對一個已經存在的佇列用不同的引數重新宣告,對於試圖這麼做的程式,會報錯,所以,改動之前程式碼之前,要在控制檯中把原來的佇列刪除
重新宣告佇列後,發現Durable
為true
重啟RabbitMQ
佇列的訊息沒有丟失
5.結束語
這一章介紹了RabbitMQ
訊息的確認和持久化,後面將會繼續深入介紹RabbitMQ
的其他特性