RabbitMQ使用延遲佇列實現一次性定時任務(php版)
阿新 • • 發佈:2019-01-13
本文建立在讀者對RabbitMQ的基礎瞭解上
本文延遲佇列實現參照 https://blog.csdn.net/u012119576/article/details/74677835
對相關概念的理解參照 https://blog.csdn.net/samxx8/article/details/47417133
作為phper在實現諸如“課程開啟後十分鐘推送訊息”,"訂單生成後多少分鐘自動取消"這類問題上會有一些問題,目前我能想到的三種解決方案有:
2.exec設定linux系統一次性定時器 at命令(atd包) ,windows伺服器上好像直接可以設定,但是最低間隔是一分鐘還是30s,具體忘了。。。
3.crontab設定定時器檢查(不建議。。。)
最近在弄RabbitMQ時,發現可以使用延遲佇列實現這類需求。具體原理是新建兩條佇列繫結對應的交換機,其中一條設定訊息延遲執行,在到期後使用交換機丟到與客戶端連線的佇列中,傳送給客戶端,具體參見程式碼。
send.php
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); //給cache傳送 使其過期然後定向到另一個 //宣告兩個佇列 $channel->exchange_declare('delay_exchange', 'direct',false,false,false); $channel->exchange_declare('cache_exchange', 'direct',false,false,false); $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', 'delay_exchange');//****很關鍵 表示過期後由哪個exchange處理 $tale->set('x-dead-letter-routing-key','delay_exchange');//****很關鍵 表示過期後由哪個exchange處理 //$tale->set('x-message-ttl',15000); //存活時長 下面的過期時間不能超過 $channel->queue_declare('cache_queue',false,true,false,false,false,$tale); $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange'); $channel->queue_declare('delay_queue',false,true,false,false,false); $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange'); $msg = new AMQPMessage('Hello World'.'3000',array( 'expiration' => intval(18000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); $channel->basic_publish($msg,'cache_exchange','cache_exchange'); echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL; $channel->close(); $connection->close();
reciever.php
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_exchange', 'direct',false,false,false);
$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;
$callback = function ($msg){
echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//只有consumer已經處理並確認了上一條message時queue才分派新的message給它
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay_queue','',false,false,false,false,$callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();