1. 程式人生 > >【RabbitMQ 實戰指南】一 延遲佇列

【RabbitMQ 實戰指南】一 延遲佇列

1、什麼是延遲佇列

延遲佇列中儲存延遲訊息,延遲訊息是指當訊息被髮送到佇列中不會立即消費,而是等待一段時間後再消費該訊息。

延遲佇列很多應用場景,一個典型的應用場景是訂單未支付超時取消,使用者下單之後30分鐘內未支付成功,則把訂單取消。

2、使用要求

RabbitMQ 本身沒有直接支援延遲佇列的功能,但是可以通過過期時間TTL和死信佇列來模擬延遲佇列。

過期時間TTL 可以參考文章: 【RabbitMQ 實戰指南】一 過期時間TTL

死信佇列可以參考文章:【RabbitMQ 實戰指南】一 死信佇列

3、延遲佇列測試

採用訂單未支付超時取消的應用場景來做測試,其具體步驟如下:

  • 1、建立兩個交換器 exchange.order 和 exchange.delay, 分別繫結兩個佇列 queue.order 和 queue.delay

  • 2、把 queue.delay 佇列裡面的訊息配置過期時間,一般訂單是30分鐘,這裡設定成10秒,然後通過 x-dead-letter-exchange 指定死信交換器為 exchange.delay

  • 3、傳送訊息到 queue.order 中,訊息過期之後流入 exchange.delay,然後路由到 queue.delay 佇列中,然後檢查訂單狀態,如果未支付,則進行取消操作

3.1、生產者程式碼

<?php 
require __DIR__ . '/../../../../vendor/autoload.php';

use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');

$channel = $connection->channel();

$channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$args = new AMQPTable();
// 訊息過期方式:設定 queue.order 佇列中的訊息10s之後過期
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.delay');
$args->set('x-dead-letter-routing-key', 'routingkey.delay');
$channel->queue_declare('queue.order', false, true, false, false, false, $args);
$channel->queue_declare('queue.delay', false, true, false, false);

$channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order');
$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
$message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order');

$channel->close();
$connection->close();

執行生產者程式碼之後,queue.order 佇列會有一條訊息,如下圖:

10秒之後,訊息會過期,然後被進入 exchange.delay, 進而路由到 queue.delay 佇列中:

3.2、消費者程式碼

<?php 
require __DIR__ . '/../../../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel();

$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$channel->queue_declare('queue.delay', false, true, false, false);

$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');

function process_message($message)
{
    echo "開始處理訂單,訂單號:" . $message->body . PHP_EOL;
    echo "獲取訂單的狀態,如果未支付,則進行取消訂單操作" . PHP_EOL;
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}

$channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message');

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

while ($channel ->is_consuming()) {
    $channel->wait();
}

執行消費者程式碼之後,會獲取到訂單號,之後可以檢查該訂單的狀態,如果未支付則進行取消操作,如下圖:

&n