RabbitMQ教程之php-amqplib(六)主題
主題 (topics)
using php-amqplib
在上一節教程中,我們改進了我們的日誌記錄系統。我們使用可以選擇性接收資訊的 direct
型別交換機,而不是使用只能進行虛擬廣播的 fanout
型別交換機。
雖然使用 direct
型別交換改進了我們的系統,但它仍有限制 - 它不能基於對多個標準進行路由選擇。
在我們的日誌系統中,我們可能不僅要根據日誌的嚴重性訂閱日誌,還可以基於發出日誌的源進行訂閱。你也許在 unix syslog 中瞭解了這個概念(不理解也沒關係,接著往下看,自然就明白了)。
這將給我們的系統帶來很大的靈活性 - 我們可能要監聽來自 cron
的嚴重錯誤,也要監聽 kern
為了在我們的系統中實現這樣的功能,我們需要學習一個更復雜的 topic exchange
。
Topic exchange
傳送到 topic exchange 的訊息不能任意命名一個 routing key
- 它必須是由一個.
劃分單詞列表。這些單詞可以是任意的,但它們通常指定與訊息相關聯的一些功能。這裡有幾個有效的 routing key
: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
, routing key
中可以包含多個單詞,最多可以達到255個位元組。
binding key
也必須是相同的形式。topic exchange
direct exchange
- 使用特定routing key1
傳送的訊息將被傳遞到與binding key
繫結的所有佇列。但是,binding key
有兩個重要的特殊情況:
- *(星號) 可以代替一個單詞
- #(雜湊) 可以匹配0個或多個單詞
在下面的例子中簡單解釋一下(類似於正則):
在本例中,我們將傳送所有描述動物的訊息。訊息將使用由三個單詞(兩個點)組成的routing key
傳送。其中第一個單詞描述 速度,第二個描述顏色,第三個描述種類:”..”。
我們接著建立三個繫結:Q1 binding key “*.orange.*”, Q2 binding key “*.*.rabbit” “lazy.#”。
這三個繫結可以解釋為:
Q1 對所有橙色的動物感興趣。
Q2 想要獲取有關兔子的一切訊息,以及所有惰性動物的一切。
一條 routing key
為 “quick.orange.rabbit” 的訊息將傳遞上面的到兩個對列。routing key
為 “lazy.orange.elephant” 的訊息也將傳遞上面的到兩個對列。另外 “lazy.pink.rabbit” 訊息將只會被傳遞到Q2一次, 即使它匹配了兩個 binding key
。”quick.brown.fox” 不匹配任何 binding key
, 所以它將被丟棄。
如果我們不遵守以上的規則傳送 routing key
為一個或者四個單詞的訊息會發生什麼? 比如,”orange” 或者 “quick.orange.male.rabbit”。那麼我們將丟失這些訊息,因為它們不匹配任何 binding key
。
另一方面,”lazy.orange.male.rabbit” 即使它有四個單詞,但它能匹配最後一個繫結,並且將被傳遞到Q2中。
注意事項
topic exchange 很強大,並且表現得和其他型別交換一樣
當佇列與 “#” 繫結時,它將接收所有訊息而不管routing key
的值,如同fanout exchange
當不使用 “*” 和 “#” 特殊字元時,topic exchange
如同direct exchange
整合程式碼
我們將在日誌記錄系統中使用主題交換。我們將假設 routing key
中有兩個單詞開始工作,比如:”.”。
程式碼幾乎和上一節一樣。
emit_log_topic.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent ",$routing_key,':',$data," \n";
$channel->close();
$connection->close();
?>
receive_logs_topic.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
收聽所有日誌:
php receive_logs_topic.php "#"
收聽所有來自 kern
的日誌:
php receive_logs_topic.php "kern.*"
只收聽 “critical” 型別的日誌:
php receive_logs_topic.php "*.critical"
你也可以建立多個繫結:
php receive_logs_topic.php "kern.*" "*.critical"
發出 routing key
為 “kern.critical”型別的日誌
php emit_log_topic.php "kern.critical" "A critical kernel error"
至此你可以盡情鼓弄這些程式。需要注意的是,這些程式碼沒有對 binding key
和 routing key
賦預設值,除此之外你可能需要路由不止兩個單詞。
接下來,我們需要弄明白怎樣做一個往返資訊的遠端程式:遠端呼叫