1. 程式人生 > 實用技巧 >轉 RabbitMQ 入門教程(PHP版) 使用rabbitmq-delayed-message-exchange外掛實現延遲功能

轉 RabbitMQ 入門教程(PHP版) 使用rabbitmq-delayed-message-exchange外掛實現延遲功能

延遲任務應用場景

場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。

場景二:訂單下單之後30分鐘後,如果使用者沒有付錢,則系統自動取消訂單。

場景三:過1分鐘給新註冊會員的使用者,傳送註冊郵件等。

php 使用rabbitmq-delayed-message-exchange外掛實現延遲功能

1.安裝

下載後解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq伺服器目錄:/usr/local/rabbitmq/plugins中( windows安裝目錄\rabbitmq_server-version\plugins

).

2.啟用外掛

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang啟用外掛

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

輸出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通過rabbitmq-plugins list檢視已安裝列表,如下:

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

3.機制解釋

安裝外掛後會生成新的Exchange型別x-delayed-message,該型別訊息支援延遲投遞機制,接收到訊息後並未立即將訊息投遞至目標佇列中,而是儲存在mnesia(一個分散式資料系統)表中,檢測訊息延遲時間,如達到可投遞時間時並將其通過x-delayed-type型別標記的交換機型別投遞至目標佇列。

4.php實現過程

消費者 delay_consumer2.php:

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日誌
        echo 'rabbit-mq 連線錯誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日誌
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    //$exchange->setFlags(AMQP_DURABLE);//宣告一個已存在的交換器的,如果不存在將丟擲異常,這個一般用在consume端
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message型別
    /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

 fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。

direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。

 topic:將訊息路由到binding key與routing key模式匹配的佇列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //繫結
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

function callback(AMQPEnvelope $message) {
    global $queue;
    if ($message) {
        $body = $message->getBody();
        echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
        echo '接收內容:'.$body . PHP_EOL;
        //為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送ack訊息
        $queue->ack($message->getDeliveryTag());
    } else {
        echo 'no message' . PHP_EOL;
    }
}

//$queue->consume('callback');  第一種消費方式,但是會阻塞,程式一直會卡在此處

//第二種消費方式,非阻塞
/*$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //應答,代表該訊息已經消費
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }
    else
    {
        //echo 'message not found' . PHP_EOL;
    }
}*/

//注意:這裡需要注意的是這個方法:$queue->consume,queue物件有兩個方法可用於取訊息:consume和get。前者是阻塞的,無訊息時會被掛起,適合迴圈中使用;後者則是非阻塞的,取訊息時有則取,無則返回false。
//就是說用了consume之後,會同步阻塞,該程式常駐記憶體,不能用nginx,apache呼叫。 
$action = '2';

if($action == '1'){
    $queue->consume('callback');  //第一種消費方式,但是會阻塞,程式一直會卡在此處
}else{
    //第二種消費方式,非阻塞
    $start = time();
    while(true)
    {
        $message = $queue->get();
        if(!empty($message))
        {
            echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
            echo '接收內容:'.$message->getBody().PHP_EOL;
            $queue->ack($message->getDeliveryTag());    //應答,代表該訊息已經消費
            $end = time();
            echo '執行時間:'.($end - $start).'秒'.PHP_EOL;
            //exit();
        }
        else
        {
            //echo 'message not found' . PHP_EOL;
        }
    }
}

生產者delay_publisher2.php:

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); 判斷是否載入amqp擴充套件
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日誌
        echo 'rabbit-mq 連線錯誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日誌
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message型別
    /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

 fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。

direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。

 topic:將訊息路由到binding key與routing key模式匹配的佇列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();
    //RabbitMQ不容許宣告2個相同名稱、配置不同的Queue,否則報錯
    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //繫結佇列和交換機
    $queue->bind($params['exchangeName'], $params['routeKey']);
    //$channel->commitTransaction();
} catch(Exception $e) {

}

for($i=5;$i>0;$i--){
    //生成訊息
    echo '傳送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL;
    echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL;
    $message = json_encode(['order_id'=>time(),'i'=>$i]);
    $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
    sleep(2);
}
$conn->disconnect();

對於程式碼來講,首先對於消費者核心程式碼

$exchange->setType('x-delayed-message'); //x-delayed-message型別
$exchange->setArgument('x-delayed-type','direct');

生產者核心程式碼

$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message型別
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();

使用方法:先執行delay_consumer1.php,再執行delay_publisher1.php

執行效果:

原文https://www.cnblogs.com/-mrl/p/11114116.html