PHP與RabbitMQ訊息佇列
RabbitMQ提供跨語言介面,我們可以使用主流程式語言Java,C,C++,Python,PHP等和RabbitMQ做對接。RabbitMQ有訊息確認機制、靈活的路由控制、以及訊息叢集高可用,使得很多大型系統使用RabbitMQ做訊息處理系統。
訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息釋出者只管把訊息釋出到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 中取訊息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。
簡單的訊息佇列,我們完全可以使用Redis實現,而相對複雜的需求,比如訊息確認、訊息持久化、高可用等需要用RabbitMQ這樣的大器來做比較合適。
本文我們結合例項給大家講解使用PHP處理RabbitMQ訊息佇列的應用。
安裝php-amqplib
php-amqplib是一個純PHP庫,使用它,基於PHP的指令碼客戶端就可以輕鬆的連線和操作RabbitMQ。我們使用composer
來安裝。
composer require php-amqplib/php-amqplib
示例說明
生產者(Producer)和消費者(Consumer)是訊息佇列的基本概念,生產者是指生產訊息的一方,也是訊息傳送方,消費者就是消費訊息的一方,也是訊息接收方,佇列就是儲存訊息的一個快取區。本文例項將由生產者傳送很多訊息給訊息佇列,由多個消費者來消費佇列中的訊息。我們可以想象這樣的場景:皮鞋生產打包打包車間,不斷有成品鞋進入傳送帶(訊息佇列)等待操作工人(消費者)將皮鞋打包。因為等待打包的鞋子特別多,我們需要安排多個打包工人在傳送帶兩邊,及時從傳送帶取出成品鞋,然後裝箱打包。我們要求是要確保工人最後打包好的皮鞋數量一雙不少,不能因為打包工人操作慢或者個人原因暫時離開生產線,導致最終打包數不一致。
訊息傳送
生產者將訊息傳送給佇列,至於誰來消費(處理)這些訊息,生產者不管。
訊息佇列(MQ),用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
訊息到達佇列中後,如果沒有一個消費者來處理訊息的話,我們希望佇列中的訊息不要丟棄,也就是訊息持久化。在生產者和消費者中都要將queue_declare
第3個引數設定為true,表示讓訊息佇列持久化。
$channel->queue_declare($queue, false, true, false, false);
此外,我們可以確保即使RabbitMQ重啟了,訊息佇列不會丟失,在生產者端設定:'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
現在我們建立生產者檔案sender.php,我們假設服務端已經安裝好RabbitMQ,並且開放好對應埠。如何安裝?請看:在CentOS7系統安裝與配置RabbitMQ。
<?php /** * @Author: Helloweba * @sender.php * @訊息生產者-分發任務 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $queue = 'worker'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection( '192.168.0.100', 56720, 'helloweba', //user 'helloweba', //password 'test' //vhost ); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); //第3個引數設定為true,表示讓訊息佇列持久化 for ($i = 0; $i < 100; $i++) { $arr = [ 'id' => 'message_' . $i, 'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i, 'content' => 'helloweba-' . time() ]; $data = json_encode($arr); $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////設定rabbitmq重啟後也不會丟失佇列,或者設定為'delivery_mode' => 2 $channel->basic_publish($msg, '', $queue); echo 'Send message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
上述程式碼中,我們模擬了生產者向佇列中傳送了100條訂單訊息。
訊息接收
消費者是指完成訊息的接收和處理的客戶端程式,消費者就如同生產線上的操作工人,他們按照操作規程從傳送帶上取出產品後有序的完成後續工作任務。
實際專案中,如果消費者處理訊息能力不夠時,就要開啟多個消費者來消費佇列中的訊息。預設情況下,RabbitMQ將會把佇列中的訊息平均分配給每個消費者。如果消費者要對分配到的訊息任務處理時間很長(耗時任務),那麼處理訊息任務的時候就有可能會遇到意外。比如某個消費者斷電了,或者出故障了,那它正在處理的訊息會怎麼辦?這裡就是RabbitMQ的訊息確認機制,為了保證資料不丟失,RabbitMQ會將未處理完的訊息分配給下一個消費者處理。
此外RabbitMQ還可以設定公平分配訊息任務,不會給某個消費者同時分配多個訊息處理任務,因為消費者無法同時處理多個訊息任務。換句話說,RabbitMQ在處理和確認訊息之前,不會向消費者傳送新的訊息,而是將訊息分發給下一個不忙的消費者。
$channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息
我們現在建立消費者檔案receiver.php,程式碼如下:
<?php /** * @Author: Helloweba * @receiver.php * @訊息消費者-接收端 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $queue = 'worker'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL; $callback = function($msg){ echo " Received message:", $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息 $channel->basic_consume($queue, '', false, false, false, false, $callback); //第4個引數值為false表示啟用訊息確認 while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
模擬測試
現在我們執行多個消費者終端,可以開啟多個ssh客戶端,client1和client2執行:
php receive.php
然後再開個終端,執行生產者:
php sender.php
由於消費者是阻塞執行的,他們會一直等待佇列中的訊息,當有訊息就會去取出來處理。我們可以模擬將其中某個客戶端中斷,即斷開某個消費者。然後再看訊息是不是被其他消費者接收處理了。同樣我們可以模擬將客戶端全部重啟,看看佇列中的訊息是否沒有丟失。
當client1中斷連線RabbitMQ後,再次執行連線RabbitMQ,在client2中看到的訊息處理情況,注意看圖中的訊息id。
client1:
client2:
接下來我們來了解下RabbitMQ訊息釋出與訂閱相關知識,敬請關注。
sender.php:
<?php /** * @Author: Helloweba * @Date: 2020-01-01 17:40:30 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 16:48:42 * 訊息生產者-分發任務 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $queue = 'worker'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection( '192.168.0.100', 56720, 'helloweba', 'helloweba', 'test' ); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); //第3個引數設定為true,表示讓訊息佇列持久化 for ($i = 0; $i < 100; $i++) { $arr = [ 'id' => 'message_' . $i, 'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i, 'content' => 'helloweba-' . time() ]; $data = json_encode($arr); $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////設定rabbitmq重啟後也不會丟失佇列,或者設定為'delivery_mode' => 2 $channel->basic_publish($msg, '', $queue); echo 'Send message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
receiver.php:
<?php /** * @Author: Helloweba * @Date: 2020-01-01 18:09:23 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 17:29:52 * 訊息消費者-接收端 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $queue = 'worker'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL; $callback = function($msg){ echo " Received message:", $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); //處理和確認完訊息後再消費新的訊息 $channel->basic_consume($queue, '', false, false, false, false, $callback); //第4個引數值為false表示啟用訊息確認 while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();