(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息
前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過routingkey來模糊匹配。
在topic模式下支援兩個特殊字元的匹配。
* (星號) 代表任意 一個單詞 # (井號) 0個或者多個單詞
注意:上面說的是單詞不是字元。
如下圖所示,RabbitMQ direct模式通過RoutingKey來精準匹配,RoutingKey為red的投遞到Queue1,RoutingKey為black和white的投遞到Queue2。
我們可以假設一個場景,我們要做一個日誌模組來收集處理不同的日誌,日誌區分包含三個維度的標準:模組、日誌緊急程度、日誌重要程度。模組分為:red、black、white;緊急程度分為:critical、normal;把重要程度分為:medium、low、high在RoutingKey欄位中我們把這三個維度通過兩個“.“連線起來。
現在我們需要對black模組,緊急程度為critical,重要程度為high的日誌分配到佇列1列印到螢幕;對所以模組重要程度為high的日誌和white緊急程度為critical的日誌傳送到佇列2持久化到硬碟。如下示例:
RoutingKey為“black.critical.high”的日誌會投遞到queue1和queue2,。
RoutingKey為“red.critical.high”的日誌會只投遞到queue2。
RoutingKey為“white.critical.high”的日誌會投遞到queue2,並且雖然queue2的兩個匹配規則都符合但只會向queue2投遞一份。
新建topic.php用來發布三種routingkey的訊息。
<?php
/*
* topic 模式
* create by superrd
*/
$exchangeName = 'extopic';
$routeKey1 = "black.critical.high";
$routeKey2 = "red.critical.high" ;
$routeKey3 = "white.critical.high";
$message1 = 'black-critical-high!';
$message2 = 'red-critical-high!';
$message3 = 'white-critical-high!';
$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
try {
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$exchange->publish($message1,$routeKey1);
var_dump("[x] Sent ".$message1);
$exchange->publish($message2,$routeKey2);
var_dump("[x] Sent ".$message2);
$exchange->publish($message3,$routeKey3);
var_dump("[x] Sent ".$message3);
} catch (AMQPConnectionException $e) {
var_dump($e);
exit();
}
$connection->disconnect();
q1.php用來監聽queue1佇列:
<?php
/*
* topic 模式
* create by superrd
*/
$queueName = 'queue1';
$exchangeName = 'extopic';
$routeKey = "black.critical.high";
$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
//阻塞模式接收訊息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自動ACK應答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
echo $msg."\n"; //處理訊息
$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
q2.php用來監聽queue2佇列:
<?php
/*
* topic 模式
* create by superrd
*/
$queueName = 'queue2';
$exchangeName = 'extopic';
$routeKey1 = "#.high";
$routeKey2 = "white.critical.*";
$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey1);
$queue->bind($exchangeName, $routeKey2);
//阻塞模式接收訊息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自動ACK應答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
echo $msg."\n"; //處理訊息
$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
先執行q1.php和q2.php指令碼保持訂閱狀態。然後執行topic.php指令碼釋出訊息。q1和q2收到的訊息如下:
如上截圖,驗證了我們之前的結論。
另外還有一些特殊情況例如:
- 如果binding_key 是 “#” - 它會接收所有的Message,不管routing_key是什麼,就像是fanout
exchange。 - 如果 “*” and “#” 沒有被使用,那麼topic exchange就變成了direct exchange。
RabbitMQ技術交流QQ群:327034977(新增時請備註RabbitMQ)