1. 程式人生 > 實用技巧 >PHP與RabbitMQ訊息佇列

PHP與RabbitMQ訊息佇列

RabbitMQ提供跨語言介面,我們可以使用主流程式語言Java,C,C++,Python,PHP等和RabbitMQ做對接。RabbitMQ有訊息確認機制、靈活的路由控制、以及訊息叢集高可用,使得很多大型系統使用RabbitMQ做訊息處理系統。

訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息釋出者只管把訊息釋出到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 中取訊息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。

簡單的訊息佇列,我們完全可以使用Redis實現,而相對複雜的需求,比如訊息確認、訊息持久化、高可用等需要用RabbitMQ這樣的大器來做比較合適。

本文我們結合例項給大家講解使用PHP處理RabbitMQ訊息佇列的應用。

安裝php-amqplib

php-amqplib是一個純PHP庫,使用它,基於PHP的指令碼客戶端就可以輕鬆的連線和操作RabbitMQ。我們使用composer來安裝。

composer require php-amqplib/php-amqplib

示例說明

生產者(Producer)和消費者(Consumer)是訊息佇列的基本概念,生產者是指生產訊息的一方,也是訊息傳送方,消費者就是消費訊息的一方,也是訊息接收方,佇列就是儲存訊息的一個快取區。本文例項將由生產者傳送很多訊息給訊息佇列,由多個消費者來消費佇列中的訊息。我們可以想象這樣的場景:皮鞋生產打包打包車間,不斷有成品鞋進入傳送帶(訊息佇列)等待操作工人(消費者)將皮鞋打包。因為等待打包的鞋子特別多,我們需要安排多個打包工人在傳送帶兩邊,及時從傳送帶取出成品鞋,然後裝箱打包。我們要求是要確保工人最後打包好的皮鞋數量一雙不少,不能因為打包工人操作慢或者個人原因暫時離開生產線,導致最終打包數不一致。

訊息傳送

生產者將訊息傳送給佇列,至於誰來消費(處理)這些訊息,生產者不管。

訊息佇列(MQ),用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。

訊息到達佇列中後,如果沒有一個消費者來處理訊息的話,我們希望佇列中的訊息不要丟棄,也就是訊息持久化。在生產者和消費者中都要將queue_declare第3個引數設定為true,表示讓訊息佇列持久化。

$channel->queue_declare($queue, false, true, false, false); 

此外,我們可以確保即使RabbitMQ重啟了,訊息佇列不會丟失,在生產者端設定:'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

現在我們建立生產者檔案sender.php,我們假設服務端已經安裝好RabbitMQ,並且開放好對應埠。如何安裝?請看:在CentOS7系統安裝與配置RabbitMQ

<?php
/**
 * @Author: Helloweba
 * @sender.php
 * @訊息生產者-分發任務
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
    '192.168.0.100', 
    56720, 
    'helloweba',  //user
    'helloweba',  //password
    'test'  //vhost
);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false); //第3個引數設定為true,表示讓訊息佇列持久化

for ($i = 0; $i < 100; $i++) { 
    $arr = [
        'id' => 'message_' . $i,
        'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
        'content' => 'helloweba-' . time()
    ];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////設定rabbitmq重啟後也不會丟失佇列,或者設定為'delivery_mode' => 2
    $channel->basic_publish($msg, '', $queue);

    echo 'Send message: ' . $data . PHP_EOL;
}

$channel->close();
$connection->close();

上述程式碼中,我們模擬了生產者向佇列中傳送了100條訂單訊息。

訊息接收

消費者是指完成訊息的接收和處理的客戶端程式,消費者就如同生產線上的操作工人,他們按照操作規程從傳送帶上取出產品後有序的完成後續工作任務。

實際專案中,如果消費者處理訊息能力不夠時,就要開啟多個消費者來消費佇列中的訊息。預設情況下,RabbitMQ將會把佇列中的訊息平均分配給每個消費者。如果消費者要對分配到的訊息任務處理時間很長(耗時任務),那麼處理訊息任務的時候就有可能會遇到意外。比如某個消費者斷電了,或者出故障了,那它正在處理的訊息會怎麼辦?這裡就是RabbitMQ的訊息確認機制,為了保證資料不丟失,RabbitMQ會將未處理完的訊息分配給下一個消費者處理。

此外RabbitMQ還可以設定公平分配訊息任務,不會給某個消費者同時分配多個訊息處理任務,因為消費者無法同時處理多個訊息任務。換句話說,RabbitMQ在處理和確認訊息之前,不會向消費者傳送新的訊息,而是將訊息分發給下一個不忙的消費者。

$channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息

我們現在建立消費者檔案receiver.php,程式碼如下:

<?php
/**
 * @Author: Helloweba
 * @receiver.php
 * @訊息消費者-接收端
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;

$callback = function($msg){
    echo " Received message:", $msg->body, PHP_EOL;
    sleep(1);  //模擬耗時執行
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4個引數值為false表示啟用訊息確認

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

模擬測試

現在我們執行多個消費者終端,可以開啟多個ssh客戶端,client1和client2執行:

php receive.php

然後再開個終端,執行生產者:

php sender.php

由於消費者是阻塞執行的,他們會一直等待佇列中的訊息,當有訊息就會去取出來處理。我們可以模擬將其中某個客戶端中斷,即斷開某個消費者。然後再看訊息是不是被其他消費者接收處理了。同樣我們可以模擬將客戶端全部重啟,看看佇列中的訊息是否沒有丟失。

當client1中斷連線RabbitMQ後,再次執行連線RabbitMQ,在client2中看到的訊息處理情況,注意看圖中的訊息id。

client1:

client2:

接下來我們來了解下RabbitMQ訊息釋出與訂閱相關知識,敬請關注。

sender.php:

<?php

/**
 * @Author: Helloweba
 * @Date:   2020-01-01 17:40:30
 * @Last Modified by:   Helloweba
 * @Last Modified time: 2020-01-05 16:48:42
 * 訊息生產者-分發任務
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection(
    '192.168.0.100', 
    56720, 
    'helloweba', 
    'helloweba', 
    'test'
);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false); //第3個引數設定為true,表示讓訊息佇列持久化

for ($i = 0; $i < 100; $i++) { 
    $arr = [
        'id' => 'message_' . $i,
        'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
        'content' => 'helloweba-' . time()
    ];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////設定rabbitmq重啟後也不會丟失佇列,或者設定為'delivery_mode' => 2
    $channel->basic_publish($msg, '', $queue);

    echo 'Send message: ' . $data . PHP_EOL;
}

$channel->close();
$connection->close();

receiver.php:

<?php

/**
 * @Author: Helloweba
 * @Date:   2020-01-01 18:09:23
 * @Last Modified by:   Helloweba
 * @Last Modified time: 2020-01-05 17:29:52
 * 訊息消費者-接收端
 */

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'worker';

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;

$callback = function($msg){
    echo " Received message:", $msg->body, PHP_EOL;
    sleep(1);  //模擬耗時執行
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息
$channel->basic_consume($queue, '', false, false, false, false, $callback); //第4個引數值為false表示啟用訊息確認

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

轉載 :https://www.helloweba.net/php/625.html