1. 程式人生 > >RabbitMQ 在 PHP 下的簡單使用 (二) -- Topic Exchange 和 Fanout Exchange

RabbitMQ 在 PHP 下的簡單使用 (二) -- Topic Exchange 和 Fanout Exchange

http 需要 code 發送 tty hang amqp root 根據

Topic Exchange

此模式下交換機,在推送消息時, 會根據消息的主題詞和隊列的主題詞決定將消息推送到哪個隊列. 交換機只會為 Queue 分發符合其指定的主題的消息。

  1. 向交換機發送消息時,消息的 routing key 就是主題關鍵詞,主題詞不能隨意設置,必須由 "." 聯結多個主題詞 (如:log.error、log.warn) .
  2. 必須將隊列顯示的綁定到指定的交換機上.
  3. 為隊列指定隊列主題詞時,可以使用通配符: "#": 表示 0 或多個主題詞; "*": 表示 1 個主題詞.

示例

producer.php

header(‘Content-Type: text/html; charset=utf-8‘);
// 連接設置 $conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘ ]; try { // RabbitMQ 連接實例 $con = new AMQPConnection($conConfig); // 發起連接 $con->connect(); // 判斷連接結果,true成功,false失敗 if
(!$con->isConnected()) { echo ‘連接失敗‘;die; } // 新建通道 $channel = new AMQPChannel($con); // 在指定通道上新建交換機,如果新建的交換機與已存在交換機重名但屬性不同,會產生406錯誤。如果新建交換機與已存在交換機完全相同,不會有任何影響 $exchange = new AMQPExchange($channel); // 交換機名稱 $exchange->setName(‘test.topic‘); // 交換機類型:按主題分發消息
$exchange->setType(‘topic‘); // 聲明交換機 $exchange->declareExchange(); // 發送消息到交換機,通過routing key指定消息的主題詞 $state = $exchange->publish(‘apple‘, ‘fruit.red‘); if($state) echo ‘Success‘ . PHP_EOL; else echo ‘Fail‘ . PHP_EOL; $state = $exchange->publish(‘cat‘, ‘animal.red‘); if($state) echo ‘Success‘ . PHP_EOL; else echo ‘Fail‘ . PHP_EOL; $con->disconnect(); } catch(Exception $e) { echo $e->getMessage(); }

consumer.php

header(‘Content-Type: text/html; charset=utf-8‘);
// 連接設置
$conConfig = [
    ‘host‘ => ‘127.0.0.1‘,
    ‘port‘ => 5672,
    ‘login‘ => ‘root‘,
    ‘password‘ => ‘root‘,
    ‘vhost‘ => ‘/‘
];

try
{
    $connect = new AMQPConnection($conConfig);
    $connect->connect();
    if(!$connect->isConnected())
    {
        echo ‘連接失敗‘;die;
    }
    
    $channel = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    $exchange->setName(‘test.topic‘);
    $exchange->setType(‘topic‘);
    $exchange->declareExchange();
    
    // 創建兩個隊列, 一個關聯red主題詞, 一個關聯animal主題詞
    $queueRed = new AMQPQueue($channel);
    $queueAnimal = new AMQPQueue($channel);
    // 隊列名稱
    $queueRed->setName(‘red.queue‘);
    $queueAnimal->setName(‘animal.queue‘);
    // 聲明隊列
    $queueRed->declareQueue();
    $queueAnimal->declareQueue();
    // 為隊列綁定主題詞, *: 匹配0或多個任意主題詞, #: 1個任意主題詞. 綁定主題詞時, 主題詞出現的位置也要匹配
    $queueRed->bind(‘test.topic‘, "*.red");
    $queueAnimal->bind(‘test.topic‘, ‘animal.#‘);
    
    echo ‘red queue: ‘ . PHP_EOL;
    while($msgEnvelope = $queueRed->get(AMQP_AUTOACK))
    {
        $msg = $msgEnvelope->getBody();
        echo $msg . PHP_EOL;
    }
    
    echo ‘animal queue: ‘ . PHP_EOL;
    while($msgEnvelope = $queueAnimal->get(AMQP_AUTOACK))
    {
        $msg = $msgEnvelope->getBody();
        echo $msg . PHP_EOL;
    }
    
    $connect->disconnect();
}catch(Exception $e)
{
    echo $e->getMessage();
}

運行結果:

技術分享圖片

red queue 獲得了所有包含 red 主題詞的消息, animal queue 僅獲得包含 animal 主題的消息.

Fanout Exchange

該模式下的交換機是廣播模式, 交換機會向所有綁定的隊列分發消息, 不需要設置交換機和隊列的 routing key. 即使設置了, 也會被忽略.

示例

producer.php

 1 header(‘Content-Type: text/html; charset=utf-8‘);
 2 // 連接設置
 3 $conConfig = [
 4     ‘host‘ => ‘127.0.0.1‘,
 5     ‘port‘ => 5672,
 6     ‘login‘ => ‘root‘,
 7     ‘password‘ => ‘root‘,
 8     ‘vhost‘ => ‘/‘
 9 ];
10 
11 try
12 {
13     // RabbitMQ 連接實例
14     $con = new AMQPConnection($conConfig);
15     // 發起連接
16     $con->connect();
17     // 判斷連接結果,true成功,false失敗
18     if(!$con->isConnected())
19     {
20         echo ‘連接失敗‘;die;
21     }
22     // 新建通道
23     $channel = new AMQPChannel($con);
24     // 在指定通道上新建交換機,如果新建的交換機與已存在交換機重名但屬性不同,會產生406錯誤。如果新建交換機與已存在交換機完全相同,不會有任何影響
25     $exchange = new AMQPExchange($channel);
26     // 交換機名稱
27     $exchange->setName(‘test.fanout‘);
28     // 交換機類型:廣播模式
29     $exchange->setType(‘fanout‘);
30     // 聲明交換機
31     $exchange->declareExchange();
32     
33     for($i = 0; $i < 3; $i++)
34     {
35         $msg = ‘消息‘;
36         // 發送消息到交換機,Fanout模式下不需要指定routing key。即使指定也會被忽略
37         $state = $exchange->publish($msg . $i);
38         if($state)
39         {
40             echo ‘Success‘ . PHP_EOL;
41         }else
42         {
43             echo ‘Fail‘ . PHP_EOL;
44         }
45     }
46     
47     $con->disconnect();
48 }
49 catch(Exception $e)
50 {
51     echo $e->getMessage();
52 }

consumer.php

 1 header(‘Content-Type: text/html; charset=utf-8‘);
 2 // 連接設置
 3 $conConfig = [
 4     ‘host‘ => ‘127.0.0.1‘,
 5     ‘port‘ => 5672,
 6     ‘login‘ => ‘root‘,
 7     ‘password‘ => ‘root‘,
 8     ‘vhost‘ => ‘/‘
 9 ];
10 
11 try
12 {
13     $connect = new AMQPConnection($conConfig);
14     $connect->connect();
15     if(!$connect->isConnected())
16     {
17         echo ‘連接失敗‘;die;
18     }
19     
20     $channel = new AMQPChannel($connect);
21     
22     $exchange = new AMQPExchange($channel);
23     $exchange->setName(‘test.fanout‘);
24     $exchange->setType(‘fanout‘);
25     $exchange->declareExchange();
26     
27     // 創建隊列
28     $queue1 = new AMQPQueue($channel);
29     $queue2 = new AMQPQueue($channel);
30     
31     // 隊列名稱
32     $queue1->setName(‘queue1‘);
33     $queue2->setName(‘queue2‘);
34     // 聲明隊列
35     $queue1->declareQueue();
36     $queue2->declareQueue();
37     // 綁定隊列到交換機。Fanout模式下不需要指定routing key,即使指定也會被忽略
38     $queue1->bind(‘test.fanout‘);
39     echo ‘隊列1: ‘ . PHP_EOL;
40     while($msgEnvelope = $queue1->get(AMQP_AUTOACK))
41     {
42         $msg = $msgEnvelope->getBody();
43         echo $msg . PHP_EOL;
44     }
45     
46     
47     echo PHP_EOL . ‘隊列2: ‘ . PHP_EOL;
48     $queue2->bind(‘test.fanout‘);
49     while($msgEnvelope = $queue2->get(AMQP_AUTOACK))
50     {
51         $msg = $msgEnvelope->getBody();
52         echo $msg . PHP_EOL;
53     }
54     $connect->disconnect();
55 }catch(Exception $e)
56 {
57     echo $e->getMessage();
58 }

運行結果:

技術分享圖片

兩個隊列獲得相同的消息.

RabbitMQ 在 PHP 下的簡單使用 (二) -- Topic Exchange 和 Fanout Exchange