PHP+RabbitMQ訊息釋出與訂閱
我們有一個小說系統,每天會有很多作者釋出新的小說內容,而讀者因為個人愛好可能只訂閱他喜歡的型別的小說,比如歷史類、玄幻類小說。小說系統每天會根據使用者的口味推送相關的小說更新訊息,這就用到了訊息釋出和訂閱系統。
本文將結合例項介紹PHP操作RabbitMQ實現訊息釋出和訂閱功能,本文假設您已經安裝好RabbitMQ,開放了對應的埠,且安裝了php-amqplib。前置文章閱讀:《在CentOS7系統安裝與配置RabbitMQ》、《PHP與RabbitMQ訊息佇列》。
概念
我們先來了解幾個概念:
交換器(Exchanges)
RabbitMQ訊息傳遞模型的核心思想是,生產者不傳送任何資訊直接到佇列。事實上,生產者甚至不知道訊息是否會發送到任何佇列。生產者只能向交換器傳送訊息(也叫交換機,預設交換器使用""空字元標記)。交換器需要知道如何處理接收的訊息,將訊息推入到指定的佇列中,決定訊息是否入列和拋棄。如下圖,P表示訊息釋出者,X表示交換機,Q1和Q2表示不同的佇列。
交換型別
交換機有幾種型別:direct, topic, headers 和 fanout。
fanout:廣播訂閱,向所有的消費者釋出訊息。每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。
direct:訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列繫結到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發“dogA”,也不會轉發“dogB”等等。它是完全匹配、單播的模式。
topic:topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配不多不少一個單詞。
headers型別的交換器基本不用,本文忽略。
舉例:以下程式碼,釋出者向名叫msg的交換器釋出廣播訊息,全體消費者都能收到相同的訊息。
$channel->exchange_declare('msg', 'fanout', false, false, false);
繫結(Bindings)
交換器和佇列之間的對應關係稱為繫結,可以理解為,佇列對來自此交換器的訊息感興趣。
以下程式碼表示將佇列繫結到名叫article的交換器上。
$channel->queue_bind($queue_name, 'article');
路由鍵
繫結可以採取額外的routing_key引數。避免混淆和$channel::basic_publish引數我們要叫它繫結key。這就是我們如何用鍵建立繫結的原因:
$routerKey = 'abc'; $channel->queue_bind($queueName, $exchange, $routerKey);
訊息釋出
我們建立釋出者檔案publish_direct.php,指定交換機為article,型別為direct,我們只允許訂閱了對應型別小說文章的消費者才可以消費對應的小說文章訊息。我們將向消費者釋出四個型別的小說文章訊息:fantasy(玄幻),military(軍事),history(歷史),romance(言情)。
以下程式碼模擬了釋出者釋出100條隨機訊息:
<?php /** * @釋出訊息 * @Author: Helloweba * @publish_direct.php */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $exchange = 'article'; $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); for ($i = 0; $i < 100; $i++) { $cate = ['fantasy', 'military', 'history', 'romance']; $key = array_rand($cate); $arr = [ 'id' => 'message_' . $i, 'content' => 'helloweba '. $cate[$key] ]; $data = json_encode($arr); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, $cate[$key]); echo 'Send '.$cate[$key].' message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
訊息訂閱
現在我們建立訂閱者檔案subscribe_direct.php,指定交換機為article,路由鍵為fantasy,意為只訂閱玄幻小說類訊息,程式碼如下:
channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
接著再新建訂閱者檔案subscribe_direct_2.php,複製貼上subscribe_direct.php檔案的程式碼,並將路由鍵改為history,意為只訂閱歷史小說類訊息
模擬測試
好了,現在我們開啟兩個終端,分別執行兩個訂閱者程式:
php subscribe_direct.php //client1訂閱玄幻小說類訊息 php subscribe_direct_2.php //client2訂閱歷史小說類訊息
再另開啟一個終端,執行釋出者程式:
php publish_direct.php
現在你應該可以看到如圖效果:
client1,只訂閱玄幻類(fantasy)訊息:
client2,只訂閱歷史類(history)訊息:
publish_direct.php:
<?php /** * 釋出訊息 * @Author: Helloweba * @Date: 2020-01-01 20:24:22 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:29:46 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $exchange = 'article'; $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); //$channel->exchange_declare($exchange, 'fanout', false, false, false); $channel->exchange_declare($exchange, 'direct', false, false, false); for ($i = 0; $i < 100; $i++) { $cate = ['fantasy', 'military', 'history', 'romance']; $key = array_rand($cate); $arr = [ 'id' => 'message_' . $i, 'content' => 'helloweba '. $cate[$key] ]; $data = json_encode($arr); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, $cate[$key]); echo 'Send '.$cate[$key].' message: ' . $data . PHP_EOL; } $channel->close(); $connection->close();
subscribe_direct.php:
<?php /** * 訂閱訊息 * @Author: Helloweba * @Date: 2020-01-01 20:24:57 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:17:16 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $exchange = 'article'; $routerKey = 'fantasy'; //只消費玄幻類訊息 //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
subscribe_direct_2.php:
<?php /** * 訂閱訊息 * @Author: Helloweba * @Date: 2020-01-01 20:24:57 * @Last Modified by: Helloweba * @Last Modified time: 2020-01-05 20:18:25 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $exchange = 'article'; $routerKey = 'history'; //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $connection = new AMQPStreamConnection('localhost', 56720, 'helloweba', 'helloweba', 'test'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); //$channel->queue_bind($queue_name, $exchange); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模擬耗時執行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();