1. 程式人生 > >RabbitMQ 訊息佇列 - topic 模式分發訊息

RabbitMQ 訊息佇列 - topic 模式分發訊息

推薦閱讀
https://blog.csdn.net/column/details/15500.html

topic 模式

根據 Binding 指定的 RoutingKey, Exchange 對 key 進行模式匹配後投遞到相應的 Queue, 模式匹配時符號 "#" 匹配一個或多個詞, 符號 "*" 匹配正好一個詞, 而且單詞與單詞之間必須要用 "." 符號進行分隔


p_topic.php

<?php
//配置資訊
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672'
, 'login' => 'admin', 'password' => 'admin', 'vhost'=>'/' ); $exchangeName = 'test_topic'; //交換機名 $routeKey1 = 'test.key.one'; //路由key1 $routeKey2 = 'test.key.two'; //路由key2 $routeKey3 = 'test.key.three'; //路由key3 $message1 = 'route_key = '.$routeKey1; $message2 = 'route_key = '.$routeKey2
; $message3 = 'route_key = '.$routeKey3; //建立連線和channel $conn = new AMQPConnection($conn_args); $conn->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($conn); //建立交換機物件 $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); $exchange
->setFlags(AMQP_DURABLE); $exchange->declareExchange(); //傳送訊息 echo "Send Message:".$exchange->publish($message1, $routeKey1)."\n"; echo "Send Message:".$exchange->publish($message2, $routeKey2)."\n"; echo "Send Message:".$exchange->publish($message3, $routeKey3)."\n"; $conn->disconnect(); ?>

c_topic_1.php

<?php
//配置資訊
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'admin',
    'password' => 'admin',
    'vhost'=>'/'
);
$exchangeName = 'test_topic'; //交換機名
$queueName = 'test_topic_1'; //佇列名
$routeKey = 'test.#'; //路由key

//建立連線和channel
$conn = new AMQPConnection($conn_args);
$conn->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($conn);

//建立交換機物件
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

echo "Message:\n";
while(True){
    $queue->consume('processMessage');
    //自動ACK應答
    //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();

/*
 * 消費回撥函式
 * 處理訊息
 */
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
?>

c_topic_2.php

<?php
//配置資訊
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'admin',
    'password' => 'admin',
    'vhost'=>'/'
);
$exchangeName = 'test_topic'; //交換機名
$queueName = 'test_topic_2'; //佇列名
$routeKey = 'test.*.one'; //路由key

//建立連線和channel
$conn = new AMQPConnection($conn_args);
$conn->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($conn);

//建立交換機物件
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

echo "Message:\n";
while(True){
    $queue->consume('processMessage');
    //自動ACK應答
    //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();

/*
 * 消費回撥函式
 * 處理訊息
 */
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
?>

分別執行 c_topic_1.php c_topic_2.php 等待消費訊息, 然後執行 p_topic.php 生產訊息

# php c_topic_1.php
Message:
route_key = test.key.one
route_key = test.key.two
route_key = test.key.three
# php c_topic_2.php
Message:
route_key = test.key.one
# php p_topic.php
Send Message:1
Send Message:1
Send Message:1