RabbitMQ之訊息持久化(佇列持久化、訊息持久化)
訊息的可靠性是RabbitMQ的一大特色,那麼RabbitMQ是如何保證訊息可靠性的呢——訊息持久化。
為了保證RabbitMQ在退出或者crash等異常情況下資料沒有丟失,需要將queue,exchange和Message都持久化。
queue的持久化
queue的持久化是通過durable=true來實現的。
一般程式中這麼使用:
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name" , true, false, false, null);
- 1
- 2
- 3
關鍵的是第二個引數設定為true,即durable=true.
Channel類中queueDeclare的完整定義如下:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
引數說明:
queue:queue的名稱
exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次申明它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:1. 排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一連線建立的排他佇列;2.“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同;3.即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的,這種佇列適用於一個客戶端傳送讀取訊息的應用場景。
autoDelete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。
queueDeclare相關的有4種方法,分別是:
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
其中需要說明的是queueDeclarePassive(String queue)可以用來檢測一個queue是否已經存在。如果該佇列存在,則會返回true;如果不存在,就會返回異常,但是不會建立新的佇列。
訊息的持久化
如過將queue的持久化標識durable設定為true,則代表是一個持久的佇列,那麼在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新什麼之前被持久化的queue。佇列是可以被持久化,但是裡面的訊息是否為持久化那還要看訊息的持久化設定。也就是說,重啟之前那個queue裡面還沒有發出去的訊息的話,重啟之後那佇列裡面是不是還存在原來的訊息,這個就要取決於發生著在傳送訊息時對訊息的設定了。
如果要在重啟後保持訊息的持久化必須設定訊息是持久化的標識。
設定訊息的持久化:
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
- 1
這裡的關鍵是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish的方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
- 1
- 2
- 3
- 4
- 5
exchange表示exchange的名稱
routingKey表示routingKey的名稱
body代表傳送的訊息體
有關mandatory和immediate的詳細解釋可以參考:RabbitMQ之mandatory和immediate.
這裡關鍵的是BasicProperties props這個引數了,這裡看下BasicProperties的定義:
public BasicProperties(
String contentType,//訊息型別如:text/plain
String contentEncoding,//編碼
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//優先順序
String correlationId,
String replyTo,//反饋佇列
String expiration,//expiration到期時間
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
這裡的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。
上面的實現程式碼使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那麼這個又是什麼呢?
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
可以看到這其實就是講deliveryMode設定為2的BasicProperties的物件,為了方便程式設計而出現的一個東東。
換一種實現方式:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
- 1
- 2
- 3
- 4
設定了佇列和訊息的持久化之後,當broker服務重啟的之後,訊息依舊存在。單隻設定佇列持久化,重啟之後訊息會丟失;單隻設定訊息的持久化,重啟之後佇列消失,既而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。
exchange的持久化
上面闡述了佇列的持久化和訊息的持久化,如果不設定exchange的持久化對訊息的可靠性來說沒有什麼影響,但是同樣如果exchange不設定持久化,那麼當broker服務重啟之後,exchange將不復存在,那麼既而傳送方rabbitmq producer就無法正常傳送訊息。這裡博主建議,同樣設定exchange的持久化。exchange的持久化設定也特別簡單,方法如下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在宣告的時候講durable欄位設定為true即可。
進一步討論
1.將queue,exchange, message等都設定了持久化之後就能保證100%保證資料不丟失了嚒?
答案是否定的。
首先,從consumer端來說,如果這時autoAck=true,那麼當consumer接收到相關訊息之後,還沒來得及處理就crash掉了,那麼這樣也算資料丟失,這種情況也好處理,只需將autoAck設定為true(方法定義如下),然後在正確處理完訊息之後進行手動ack(channel.basicAck).
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- 1
其次,關鍵的問題是訊息在正確存入RabbitMQ之後,還需要有一段時間(這個時間很短,但不可忽視)才能存入磁碟之中,RabbitMQ並不是為每條訊息都做fsync的處理,可能僅僅儲存到cache中而不是物理磁碟上,在這段時間內RabbitMQ broker發生crash, 訊息儲存到cache但是還沒來得及落盤,那麼這些訊息將會丟失。那麼這個怎麼解決呢?首先可以引入RabbitMQ的mirrored-queue即映象佇列,這個相當於配置了副本,當master在此特殊時間內crash掉,可以自動切換到slave,這樣有效的保障了HA, 除非整個叢集都掛掉,這樣也不能完全的100%保障RabbitMQ不丟訊息,但比沒有mirrored-queue的要好很多,很多現實生產環境下都是配置了mirrored-queue的。還有要在producer引入事務機制或者Confirm機制來確保訊息已經正確的傳送至broker端,有關RabbitMQ的事務機制或者Confirm機制可以參考:RabbitMQ之訊息確認機制(事務+Confirm). 幸虧本文的主題是討論RabbitMQ的持久化而不是可靠性,不然就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的映象佇列的配置以及consumer端的確認機制,要想確保訊息的可靠性越高,那麼效能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取捨。
2.訊息什麼時候刷到磁碟?
寫入檔案前會有一個Buffer,大小為1M,資料在寫入檔案時,首先會寫入到這個Buffer,如果Buffer已滿,則會將Buffer寫入到檔案(未必刷到磁碟)。
有個固定的刷盤時間:25ms,也就是不管Buffer滿不滿,每個25ms,Buffer裡的資料及未重新整理到磁碟的檔案內容必定會刷到磁碟。
每次訊息寫入後,如果沒有後續寫入請求,則會直接將已寫入的訊息刷到磁碟:使用Erlang的receive x after 0實現,只要程序的信箱裡沒有訊息,則產生一個timeout訊息,而timeout會觸發刷盤操作。
欲瞭解更多訊息中介軟體的內容,可以關注:訊息中介軟體收錄集