1. 程式人生 > 實用技巧 >RabbitMQ知識點整理12-消費端的確認與拒絕

RabbitMQ知識點整理12-消費端的確認與拒絕

為了保證訊息從佇列可靠地達到消費者, RabbitMQ 提供了訊息確認機制( message acknowledgement), 消費者在訂閱佇列時,可以指定autoAck引數, ,當autoAck 等於false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上是先打上刪除標記,之後再刪除) 。當autoAck 等於true時,RabbitMQ 會自動把傳送出去的訊息置為確認, 然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些訊息。

採用訊息確認機制後,只要設定autoAck 引數為false ,消費者就有足夠的時間處理訊息(任務) ,不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ 會一直等待持有訊息直到消費者顯式呼叫Basic.Ack 命令為止。

當autoAck 引數置為false ,對於RabbitMQ 服務端而言,佇列中的訊息分成了兩個部分:一部分是等待投遞給消費者的訊息:一部分是己經投遞給消費者,但是還沒有收到消費者確認訊號的訊息。如果RabbitMQ 一直沒有收到消費者的確認訊號,並且消費此訊息的消費者己經斷開連線, 則RabbitMQ 會安排該訊息重新進入佇列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。

RabbitMQ 不會為未確認的訊息設定過期時間,它判斷此訊息是否需要重新投遞給消費者的唯一依據是消費該訊息的消費者連線是否己經斷開, 這麼設計的原因是RabbitMQ 允許消費者消費一條訊息的時間可以很久很久。

RabbtiMQ 的Web 管理平臺(15672埠)上可以看到當前佇列中的" Ready" 狀態和"Unacknowledged" 狀態的訊息數,分別對應上文中的等待投遞給消費者的訊息數和己經投遞給消費者但是未收到確認訊號的訊息數

也可以通過相應的命令來檢視上述資訊:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

在消費者接收到訊息後,如果想明確拒絕當前的訊息而不是確認,那麼應該怎麼做呢?

RabbitMQ 在2 .0.0 版本開始引入了Basic.Reject 這個命令,消費者客戶端可以呼叫與其對應的channel.basicReject 方法來告訴RabbitMQ 拒絕這個訊息。

Channel 類中的basicReject 方法定義如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中deliveryTag 可以看作訊息的編號,它是一個64 位的長整型值,最大值是9223372036854775807,如果requeue 引數設定為true ,則RabbitMQ 會重新將這條訊息存入佇列,以便可以傳送給下一個訂閱的消費者,如果requeue 引數設定為false ,則RabbitMQ立即會把訊息從佇列中移除,而不會把它傳送給新的消費者。

Basic.Reject 命令一次只能拒絕一條訊息,如果想要批量拒絕訊息,則可以使用Basic.Nack 這個命令,消費者客戶端可以呼叫channel.basicNack 方法來實現,方法定義如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中deliveryTag 和requeue 的含義可以參考basicReject 方法。multiple 引數設定為false 則表示拒絕編號為deliveryTag的這一條訊息,這時候basicNack 和basicReject 方法一樣;multiple 引數設定為true 則表示拒絕deliveryTag 編號之前所有未被當前消費者確認的訊息。

注意要點:

將channel.basicReject 或者channel.basicNack 中的requeue 設定為false ,可以啟用"死信佇列"的功能。死信佇列可以通過檢測被拒絕或者未送達的訊息來追蹤問題。詳細內容後面再說^_^

對於requeue , AMQP 中還有一個命令Basic.Recover 具備可重入佇列的特性。其對應的客戶端方法為:

1.Basic.RecoverOk basicRecover() throws IOException;
2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

這個channel.basicRecover 方法用來請求RabbitMQ 重新發送還未被確認的訊息。如果requeue 引數設定為true,則未被確認的訊息會被重新加入到佇列中,這樣對於同一條訊息來說,可能會被分配給與之前不同的消費者。如果requeue 引數設定為false ,那麼同一條訊息會被分配給與之前相同的消費者, 預設情況下,如果不設定requeue 這個引數,相當於channel.basicRecover(true) ,即requeue 預設為true