RabbitMQ 在 PHP 下的簡單使用 (二) -- Topic Exchange 和 Fanout Exchange
阿新 • • 發佈:2019-03-16
http 需要 code 發送 tty hang amqp root 根據
Topic Exchange
此模式下交換機,在推送消息時, 會根據消息的主題詞和隊列的主題詞決定將消息推送到哪個隊列. 交換機只會為 Queue 分發符合其指定的主題的消息。
- 向交換機發送消息時,消息的 routing key 就是主題關鍵詞,主題詞不能隨意設置,必須由 "." 聯結多個主題詞 (如:log.error、log.warn) .
- 必須將隊列顯示的綁定到指定的交換機上.
- 為隊列指定隊列主題詞時,可以使用通配符: "#": 表示 0 或多個主題詞; "*": 表示 1 個主題詞.
示例
producer.php
header(‘Content-Type: text/html; charset=utf-8‘);// 連接設置 $conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘ ]; try { // RabbitMQ 連接實例 $con = new AMQPConnection($conConfig); // 發起連接 $con->connect(); // 判斷連接結果,true成功,false失敗 if(!$con->isConnected()) { echo ‘連接失敗‘;die; } // 新建通道 $channel = new AMQPChannel($con); // 在指定通道上新建交換機,如果新建的交換機與已存在交換機重名但屬性不同,會產生406錯誤。如果新建交換機與已存在交換機完全相同,不會有任何影響 $exchange = new AMQPExchange($channel); // 交換機名稱 $exchange->setName(‘test.topic‘); // 交換機類型:按主題分發消息$exchange->setType(‘topic‘); // 聲明交換機 $exchange->declareExchange(); // 發送消息到交換機,通過routing key指定消息的主題詞 $state = $exchange->publish(‘apple‘, ‘fruit.red‘); if($state) echo ‘Success‘ . PHP_EOL; else echo ‘Fail‘ . PHP_EOL; $state = $exchange->publish(‘cat‘, ‘animal.red‘); if($state) echo ‘Success‘ . PHP_EOL; else echo ‘Fail‘ . PHP_EOL; $con->disconnect(); } catch(Exception $e) { echo $e->getMessage(); }
consumer.php
header(‘Content-Type: text/html; charset=utf-8‘); // 連接設置 $conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘ ]; try { $connect = new AMQPConnection($conConfig); $connect->connect(); if(!$connect->isConnected()) { echo ‘連接失敗‘;die; } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName(‘test.topic‘); $exchange->setType(‘topic‘); $exchange->declareExchange(); // 創建兩個隊列, 一個關聯red主題詞, 一個關聯animal主題詞 $queueRed = new AMQPQueue($channel); $queueAnimal = new AMQPQueue($channel); // 隊列名稱 $queueRed->setName(‘red.queue‘); $queueAnimal->setName(‘animal.queue‘); // 聲明隊列 $queueRed->declareQueue(); $queueAnimal->declareQueue(); // 為隊列綁定主題詞, *: 匹配0或多個任意主題詞, #: 1個任意主題詞. 綁定主題詞時, 主題詞出現的位置也要匹配 $queueRed->bind(‘test.topic‘, "*.red"); $queueAnimal->bind(‘test.topic‘, ‘animal.#‘); echo ‘red queue: ‘ . PHP_EOL; while($msgEnvelope = $queueRed->get(AMQP_AUTOACK)) { $msg = $msgEnvelope->getBody(); echo $msg . PHP_EOL; } echo ‘animal queue: ‘ . PHP_EOL; while($msgEnvelope = $queueAnimal->get(AMQP_AUTOACK)) { $msg = $msgEnvelope->getBody(); echo $msg . PHP_EOL; } $connect->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }
運行結果:
red queue 獲得了所有包含 red 主題詞的消息, animal queue 僅獲得包含 animal 主題的消息.
Fanout Exchange
該模式下的交換機是廣播模式, 交換機會向所有綁定的隊列分發消息, 不需要設置交換機和隊列的 routing key. 即使設置了, 也會被忽略.
示例
producer.php
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 // 連接設置 3 $conConfig = [ 4 ‘host‘ => ‘127.0.0.1‘, 5 ‘port‘ => 5672, 6 ‘login‘ => ‘root‘, 7 ‘password‘ => ‘root‘, 8 ‘vhost‘ => ‘/‘ 9 ]; 10 11 try 12 { 13 // RabbitMQ 連接實例 14 $con = new AMQPConnection($conConfig); 15 // 發起連接 16 $con->connect(); 17 // 判斷連接結果,true成功,false失敗 18 if(!$con->isConnected()) 19 { 20 echo ‘連接失敗‘;die; 21 } 22 // 新建通道 23 $channel = new AMQPChannel($con); 24 // 在指定通道上新建交換機,如果新建的交換機與已存在交換機重名但屬性不同,會產生406錯誤。如果新建交換機與已存在交換機完全相同,不會有任何影響 25 $exchange = new AMQPExchange($channel); 26 // 交換機名稱 27 $exchange->setName(‘test.fanout‘); 28 // 交換機類型:廣播模式 29 $exchange->setType(‘fanout‘); 30 // 聲明交換機 31 $exchange->declareExchange(); 32 33 for($i = 0; $i < 3; $i++) 34 { 35 $msg = ‘消息‘; 36 // 發送消息到交換機,Fanout模式下不需要指定routing key。即使指定也會被忽略 37 $state = $exchange->publish($msg . $i); 38 if($state) 39 { 40 echo ‘Success‘ . PHP_EOL; 41 }else 42 { 43 echo ‘Fail‘ . PHP_EOL; 44 } 45 } 46 47 $con->disconnect(); 48 } 49 catch(Exception $e) 50 { 51 echo $e->getMessage(); 52 }
consumer.php
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 // 連接設置 3 $conConfig = [ 4 ‘host‘ => ‘127.0.0.1‘, 5 ‘port‘ => 5672, 6 ‘login‘ => ‘root‘, 7 ‘password‘ => ‘root‘, 8 ‘vhost‘ => ‘/‘ 9 ]; 10 11 try 12 { 13 $connect = new AMQPConnection($conConfig); 14 $connect->connect(); 15 if(!$connect->isConnected()) 16 { 17 echo ‘連接失敗‘;die; 18 } 19 20 $channel = new AMQPChannel($connect); 21 22 $exchange = new AMQPExchange($channel); 23 $exchange->setName(‘test.fanout‘); 24 $exchange->setType(‘fanout‘); 25 $exchange->declareExchange(); 26 27 // 創建隊列 28 $queue1 = new AMQPQueue($channel); 29 $queue2 = new AMQPQueue($channel); 30 31 // 隊列名稱 32 $queue1->setName(‘queue1‘); 33 $queue2->setName(‘queue2‘); 34 // 聲明隊列 35 $queue1->declareQueue(); 36 $queue2->declareQueue(); 37 // 綁定隊列到交換機。Fanout模式下不需要指定routing key,即使指定也會被忽略 38 $queue1->bind(‘test.fanout‘); 39 echo ‘隊列1: ‘ . PHP_EOL; 40 while($msgEnvelope = $queue1->get(AMQP_AUTOACK)) 41 { 42 $msg = $msgEnvelope->getBody(); 43 echo $msg . PHP_EOL; 44 } 45 46 47 echo PHP_EOL . ‘隊列2: ‘ . PHP_EOL; 48 $queue2->bind(‘test.fanout‘); 49 while($msgEnvelope = $queue2->get(AMQP_AUTOACK)) 50 { 51 $msg = $msgEnvelope->getBody(); 52 echo $msg . PHP_EOL; 53 } 54 $connect->disconnect(); 55 }catch(Exception $e) 56 { 57 echo $e->getMessage(); 58 }
運行結果:
兩個隊列獲得相同的消息.
RabbitMQ 在 PHP 下的簡單使用 (二) -- Topic Exchange 和 Fanout Exchange