【RabbitMQ 實戰指南】一 延遲佇列
阿新 • • 發佈:2019-10-29
1、什麼是延遲佇列
延遲佇列中儲存延遲訊息,延遲訊息是指當訊息被髮送到佇列中不會立即消費,而是等待一段時間後再消費該訊息。
延遲佇列很多應用場景,一個典型的應用場景是訂單未支付超時取消,使用者下單之後30分鐘內未支付成功,則把訂單取消。
2、使用要求
RabbitMQ 本身沒有直接支援延遲佇列的功能,但是可以通過過期時間TTL和死信佇列來模擬延遲佇列。
過期時間TTL 可以參考文章: 【RabbitMQ 實戰指南】一 過期時間TTL
死信佇列可以參考文章:【RabbitMQ 實戰指南】一 死信佇列
3、延遲佇列測試
採用訂單未支付超時取消的應用場景來做測試,其具體步驟如下:
-
1、建立兩個交換器 exchange.order 和 exchange.delay, 分別繫結兩個佇列 queue.order 和 queue.delay
-
2、把 queue.delay 佇列裡面的訊息配置過期時間,一般訂單是30分鐘,這裡設定成10秒,然後通過 x-dead-letter-exchange 指定死信交換器為 exchange.delay
-
3、傳送訊息到 queue.order 中,訊息過期之後流入 exchange.delay,然後路由到 queue.delay 佇列中,然後檢查訂單狀態,如果未支付,則進行取消操作
3.1、生產者程式碼
<?php require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置 $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true); $args = new AMQPTable(); // 訊息過期方式:設定 queue.order 佇列中的訊息10s之後過期 $args->set('x-message-ttl', 10000); $args->set('x-dead-letter-exchange', 'exchange.delay'); $args->set('x-dead-letter-routing-key', 'routingkey.delay'); $channel->queue_declare('queue.order', false, true, false, false, false, $args); $channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order'); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); $message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order'); $channel->close(); $connection->close();
執行生產者程式碼之後,queue.order 佇列會有一條訊息,如下圖:
10秒之後,訊息會過期,然後被進入 exchange.delay, 進而路由到 queue.delay 佇列中:
3.2、消費者程式碼
<?php require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置 $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true); $channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); function process_message($message) { echo "開始處理訂單,訂單號:" . $message->body . PHP_EOL; echo "獲取訂單的狀態,如果未支付,則進行取消訂單操作" . PHP_EOL; $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } $channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message'); function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); while ($channel ->is_consuming()) { $channel->wait(); }
執行消費者程式碼之後,會獲取到訂單號,之後可以檢查該訂單的狀態,如果未支付則進行取消操作,如下圖:
&n