1. 程式人生 > 實用技巧 >rabbitmq 通過fanout模式將訊息推送到多個佇列

rabbitmq 通過fanout模式將訊息推送到多個佇列

  • 使用場景:

有時我們會遇到這樣的情況,多個功能模組都希望得到完整的訊息資料。例如一個log的訊息,一個我們希望輸出在螢幕上實時監控,另外一個使用者持久化日誌。這時就可以使用fanout模式。fanout模式模式不像direct模式通過routingkey來進行匹配,而是會把訊息傳送到所以的已經繫結的佇列中。

  • 消費者
<?php
// 生產者 p_fanout.php
//配置資訊
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_fanout';
 
//建立連線和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟
$exchange->setFlags(AMQP_DURABLE);

// 宣告交換機
$exchange->declareExchange();

// 向伺服器佇列推送10條訊息
for ($i = 0; $i < 10; $i++) {
    $msg = 'hello world ' . $i;
    $exchange->publish($msg, 
    $routeKey, 
    AMQP_NOPARAM, 
    ['delivery_mode' => 2]);
}

  • 消費者c1_fanout
<?php
// 消費者 c1_fanout.php
//配置資訊
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_fanout';
$queueName    = 'q_fanout_1';
$routeKey     = '';
 
//建立連線和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟
$exchange->setFlags(AMQP_DURABLE);

// 宣告交換機
$exchange->declareExchange();

// 建立訊息佇列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 設定永續性
$queue->setFlags(AMQP_DURABLE);

// 宣告訊息佇列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收訊息並處理回撥
// $queue->consume('receive');

//阻塞模式接收訊息
echo "Message:\n";
while(True){
    $queue->consume('receive');
    //$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}

// 處理回撥的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // ACK 通知生產者任務完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  • 消費者c2_fanout
<?php
// 消費者 c2_fanout.php
//配置資訊
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_fanout';
$queueName    = 'q_fanout_2';
$routeKey     = '';
 
//建立連線和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟
$exchange->setFlags(AMQP_DURABLE);

// 宣告交換機
$exchange->declareExchange();

// 建立訊息佇列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 設定永續性
$queue->setFlags(AMQP_DURABLE);

// 宣告訊息佇列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收訊息並處理回撥
// $queue->consume('receive');
//阻塞模式接收訊息
echo "Message:\n";
while(True){
    $queue->consume('receive');
    //$queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}

// 處理回撥的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // ACK 通知生產者任務完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  • 執行結果

  •  三個檔案的執行順序

  先啟動兩個消費者,最後啟動生產者,原因是fanout模式下,生產者不會建立訊息佇列,如果消費者沒有建立,則訊息沒有佇列可放。