七、消費訊息與效能權衡(讀數筆記與個人實踐)
摘要
主要介紹消費訊息時的幾種方式:
- 平衡訊息消費的可靠性與效能;
- 死信交換器;
- 設定自動刪除佇列、持久化佇列、TTL等;
消費效能
新增一個圖
使用no-ack模式
在消費訊息時,負責消費的應用程式會發送一個Basic.Consumer請求,與該請求一起傳送的還有一個no-ack標誌。當這個標誌啟用時,它會告訴RabbitMQ消費者在接收到訊息時不會進行確認,RabbitMQ只管儘快的傳送訊息。
使用no-ack標誌消費訊息是讓RabbitMQ將消費投遞給消費者的最快方式,但這也是最不可靠的方式。
如果使用no-ack,那麼當有新的可用訊息時,RabbitMQ將會發送該訊息給消費者,而不用等待。實際上,如果有可用訊息,RabbitMQ會持續向消費者傳送它們,直到套接字緩衝區被填滿為止。
目前沒有找到RabbitTemplate如何開啟no-ack的方法,如果有用過的朋友,請留言告訴我,謝謝。
訊息確認模式
訊息確認模式要求每次消費訊息時,向RabbitMQ返回一個Basic.Ack,告知RabbitMQ訊息已經成功消費,可以在伺服器中刪除該訊息。
訊息確認有三種確認方式:
- Ack;
- Reject;
- Nack;
基於RabbitTemplate,下面這段程式碼,有對這幾種確認方式的實現,在配置檔案中開啟手動確認模式,acknowledge-mode屬性為manual(預設為自動確認):
spring: #訊息佇列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / connection-timeout: 5000ms listener: simple: acknowledge-mode: manual
/** * 消費者監聽訊息佇列 */ @Component @Slf4j @RabbitListener(queues = "DIRECT_QUEUE") public class DirectQueueListener { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { log.info("消費訊息成功: {}", message); Thread.sleep(1000); switch (message) { case "nack": channel.basicNack(tag, true, false); // 第二個引數控制是否開啟批量拒絕,第三個引數表示是否requeue break; case "nack-requeue": channel.basicNack(tag, true, true); break; case "reject": channel.basicReject(tag, false); break; case "reject-requeue": // 啟用了requeue,如果只有一個消費者,容易造成死迴圈 channel.basicReject(tag, true); break; default: channel.basicAck(tag, true); break; } } }
channel.basicAck:當正常消費訊息時,呼叫該方法。
我們看到除了basicAck,還有basicReject和basicNack。這兩種,顧名思義,是用來拒絕消費的。
channel.basicReject:從協議層面上,reject是傳送一個Basic.Reject響應,告知RabbitMQ無法對這條訊息進行處理,當拒絕時,可以指定是否丟棄訊息或使用requeue標誌重新發送訊息。當啟用requeue時,RabbitMQ將會把這條訊息重新放回到佇列中。
不能使用basicReject一次拒絕多個訊息。
channel.basicNack:Basic.Nack實現與Basic.Reject相同的行為,但添加了批量拒絕的功能。
設定multiple或requeue如圖所示:
服務質量確認模式
AMQP規範要求通道要有服務質量設定,即在確認訊息接收之前可以預先接收一定數量的訊息。可以設定一個預取數量來實現高效的傳送訊息。
如果消費者應用程式在確認訊息之前崩潰,在套接字關閉時,所有預取的訊息將返回到佇列。
如果設定了no-ack,那麼預取大小將被忽略。
使用RabbitTemplate時,可以在消費者應用程式的配置檔案中配置預取大小:
spring:
#訊息佇列配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 5000ms
listener:
simple:
acknowledge-mode: manual
prefetch: 1000
其中prefetch就是預取大小,消費者應用程式執行後,可以在RabbitMQ的控制檯看到這個設定:
如果熟悉抓包軟體的朋友,可以試著抓包看看:
我預先發送了2條訊息到RabbitMQ,可以看到上圖中最後兩行是兩個Ack。
有一種方式可以一次確認多個訊息,Basic.Ack響應具有一個multiple屬性,當把它設定為true時就能確認以前未確認的訊息。
如果使用multiple,當成功的接收了一些訊息,並且應用程式在回覆Ack之前就發生了異常,則所有為確認的訊息將返回佇列。
死信交換器
RabbitMQ的死信交換器是一種可以拒絕已投遞訊息的可選行為,一般有三種情況的訊息會進入死信佇列:
- 當拒絕了一個不重新發送的訊息時,會進入死信;
- 當訊息的TTL到期時,會進入死信;
- 當佇列已滿時,會進入死信;
死信與備用交換器不同,過期或被拒絕的訊息通過死信交換器進行投遞,而備用交換器則路由那些無法由RabbitMQ路由的訊息。
在RabbitMQ中,在宣告佇列時,指定死信交換器:
/**
* 宣告佇列。
* 同時指定死信佇列。
*
* @return Queue物件。
*/
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE")
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
.build();
}
死信交換器還允許使用預先指定的值覆蓋路由鍵,這樣可以允許同一個交換器同時處理死信和非死信訊息,但需要確保死信訊息不被投遞到相同的佇列。設定預定義路由鍵的關鍵字是:x-dead-letter-routing-key。
測試死信佇列,當消費者拒絕時,檢視訊息是否會進入死信佇列:
圖。
控制佇列
定義佇列時,有多個設定可以確定佇列的行為:
- 自動刪除自己;
- 只允許一個消費者進行消費;
- 自動過期訊息;
- 保持有限數量的訊息;
- 將舊訊息推出堆疊;
更改佇列的設定,必須刪除佇列並重新建立它。
臨時佇列
也可以叫做自動刪除的佇列。
一旦消費者完成連線和檢索訊息,在斷開連線時佇列將被刪除。
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE").autoDelete()
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
.build();
}
佇列只會在沒有消費者監聽的時候自行刪除。
只允許單個消費者
在需要確保只有單個消費者能夠消費佇列中的訊息時,在建立佇列時設定exclusive屬性,啟用後在消費者斷開連線後,佇列也會自動刪除。
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE").exclusive()
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
.build();
}
宣告exclusive的佇列,只能被宣告時所指定的同一個連線和通道所消費,當建立佇列的通道關閉時,獨佔佇列也將會刪除。在通道關閉之間,可以多次宣告和取消exclusive佇列的消費者。
自動過期佇列
如果一段時間沒有使用該佇列,就刪除它。
建立一個自動過期的佇列非常簡單,要做的事情就是使用x-expires引數宣告一個佇列。該引數以毫秒為單位設定佇列的生存時間(Time To Live,TTL)。
自動過期佇列有一些嚴格的約定:
- 佇列只有在沒有消費者的情況下才會過期。如果有連線消費者,則只有發出Basic.Cancel或斷開連線之後才自動刪除;
- 佇列只有在TTL週期之內沒有收到Basic.Get請求時才會到期。一旦一個Basic.Get請求中已經包含了一個具有過期值的佇列,那麼過期設定無效,佇列不會被自動刪除(不要使用Get);
- 不能重新宣告或更改x-expires屬性;
- RabbitMQ不保證過期刪除佇列這一過程的時效性;
永久佇列
使用durable標誌告訴RabbitMQ希望佇列配置被儲存在伺服器:
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE")
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
.build();
}
訊息級別的TTL
訊息級別的TTL設定允許伺服器對訊息的最大生存時間進行限制。宣告佇列的同時指定死信交換器和TTL值將導致該佇列中已到期的訊息成為死信訊息。
可以使用x-message-ttl設定佇列的訊息TTL時間。
最大長度佇列
從RabbitMQ3.1.0開始,可以在宣告佇列時指定最大長度。如果在佇列上設定列x-max-length引數,一旦訊息到達最大值,RabbitMQ會在新增新訊息時刪除位於佇列前端的訊息,如果宣告佇列時候,指定列死信交換器,則從佇列前端刪除的訊息會