rabbitmq 通過fanout模式將訊息推送到多個佇列
阿新 • • 發佈:2021-01-10
- 使用場景:
有時我們會遇到這樣的情況,多個功能模組都希望得到完整的訊息資料。例如一個log的訊息,一個我們希望輸出在螢幕上實時監控,另外一個使用者持久化日誌。這時就可以使用fanout模式。fanout模式模式不像direct模式通過routingkey來進行匹配,而是會把訊息傳送到所以的已經繫結的佇列中。
- 消費者
<?php // 生產者 p_fanout.php //配置資訊 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; //建立連線和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟 $exchange->setFlags(AMQP_DURABLE); // 宣告交換機 $exchange->declareExchange(); // 向伺服器佇列推送10條訊息 for ($i = 0; $i < 10; $i++) { $msg = 'hello world ' . $i; $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]); }
- 消費者c1_fanout
<?php // 消費者 c1_fanout.php //配置資訊 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; $queueName = 'q_fanout_1'; $routeKey = ''; //建立連線和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟 $exchange->setFlags(AMQP_DURABLE); // 宣告交換機 $exchange->declareExchange(); // 建立訊息佇列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 設定永續性 $queue->setFlags(AMQP_DURABLE); // 宣告訊息佇列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收訊息並處理回撥 // $queue->consume('receive'); //阻塞模式接收訊息 echo "Message:\n"; while(True){ $queue->consume('receive'); //$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } // 處理回撥的方法 function receive($envelop, $queue){ echo $envelop->getBody() . "\n"; // ACK 通知生產者任務完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
- 消費者c2_fanout
<?php // 消費者 c2_fanout.php //配置資訊 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; $queueName = 'q_fanout_2'; $routeKey = ''; //建立連線和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟 $exchange->setFlags(AMQP_DURABLE); // 宣告交換機 $exchange->declareExchange(); // 建立訊息佇列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 設定永續性 $queue->setFlags(AMQP_DURABLE); // 宣告訊息佇列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收訊息並處理回撥 // $queue->consume('receive'); //阻塞模式接收訊息 echo "Message:\n"; while(True){ $queue->consume('receive'); //$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } // 處理回撥的方法 function receive($envelop, $queue){ echo $envelop->getBody() . "\n"; // ACK 通知生產者任務完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
- 執行結果
- 三個檔案的執行順序
先啟動兩個消費者,最後啟動生產者,原因是fanout模式下,生產者不會建立訊息佇列,如果消費者沒有建立,則訊息沒有佇列可放。