轉 RabbitMQ 入門教程(PHP版) 使用rabbitmq-delayed-message-exchange外掛實現延遲功能
阿新 • • 發佈:2020-07-18
延遲任務應用場景
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。
場景二:訂單下單之後30分鐘後,如果使用者沒有付錢,則系統自動取消訂單。
場景三:過1分鐘給新註冊會員的使用者,傳送註冊郵件等。
php 使用rabbitmq-delayed-message-exchange外掛實現延遲功能
1.安裝
下載後解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq伺服器目錄:/usr/local/rabbitmq/plugins
中( windows安裝目錄\rabbitmq_server-version\plugins
2.啟用外掛
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang
啟用外掛
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
輸出如下:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
通過rabbitmq-plugins list
檢視已安裝列表,如下:
... [ ] rabbitmq_delayed_message_exchange 20171215-3.6.x ...
3.機制解釋
安裝外掛後會生成新的Exchange型別x-delayed-message
,該型別訊息支援延遲投遞機制,接收到訊息後並未立即將訊息投遞至目標佇列中,而是儲存在mnesia
(一個分散式資料系統)表中,檢測訊息延遲時間,如達到可投遞時間時並將其通過x-delayed-type
型別標記的交換機型別投遞至目標佇列。
4.php實現過程
消費者 delay_consumer2.php:
<?php //header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日誌 echo 'rabbit-mq 連線錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日誌 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); //$exchange->setFlags(AMQP_DURABLE);//宣告一個已存在的交換器的,如果不存在將丟擲異常,這個一般用在consume端 $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message型別 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。 direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。 topic:將訊息路由到binding key與routing key模式匹配的佇列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //繫結 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內容:'.$body . PHP_EOL; //為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送ack訊息 $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; } } //$queue->consume('callback'); 第一種消費方式,但是會阻塞,程式一直會卡在此處 //第二種消費方式,非阻塞 /*$start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //應答,代表該訊息已經消費 $end = time(); echo '<br>' . ($end - $start); exit(); } else { //echo 'message not found' . PHP_EOL; } }*/ //注意:這裡需要注意的是這個方法:$queue->consume,queue物件有兩個方法可用於取訊息:consume和get。前者是阻塞的,無訊息時會被掛起,適合迴圈中使用;後者則是非阻塞的,取訊息時有則取,無則返回false。 //就是說用了consume之後,會同步阻塞,該程式常駐記憶體,不能用nginx,apache呼叫。 $action = '2'; if($action == '1'){ $queue->consume('callback'); //第一種消費方式,但是會阻塞,程式一直會卡在此處 }else{ //第二種消費方式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內容:'.$message->getBody().PHP_EOL; $queue->ack($message->getDeliveryTag()); //應答,代表該訊息已經消費 $end = time(); echo '執行時間:'.($end - $start).'秒'.PHP_EOL; //exit(); } else { //echo 'message not found' . PHP_EOL; } } }
生產者delay_publisher2.php:
<?php //header('Content-Type:text/html;charset=utf-8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); 判斷是否載入amqp擴充套件 //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日誌 echo 'rabbit-mq 連線錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日誌 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message型別 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。 direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。 topic:將訊息路由到binding key與routing key模式匹配的佇列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); //RabbitMQ不容許宣告2個相同名稱、配置不同的Queue,否則報錯 $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //繫結佇列和交換機 $queue->bind($params['exchangeName'], $params['routeKey']); //$channel->commitTransaction(); } catch(Exception $e) { } for($i=5;$i>0;$i--){ //生成訊息 echo '傳送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL; echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL; $message = json_encode(['order_id'=>time(),'i'=>$i]); $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]); sleep(2); } $conn->disconnect();
對於程式碼來講,首先對於消費者核心程式碼
$exchange->setType('x-delayed-message'); //x-delayed-message型別 $exchange->setArgument('x-delayed-type','direct');
生產者核心程式碼
$exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message型別 $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange();
使用方法:先執行delay_consumer1.php,再執行delay_publisher1.php
執行效果:
原文https://www.cnblogs.com/-mrl/p/11114116.html