1. 程式人生 > >PHP使用RabbitMQ

PHP使用RabbitMQ

rod ask pass tran test color 模式匹配 echo setname

基本概念

Broker:簡單來說就是消息隊列服務器實體。

  Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。   Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。   Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。   Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。   vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。   producer:消息生產者,就是投遞消息的程序。   consumer:消息消費者,就是接受消息的程序。   channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。 消息持久 1) 將交換機置為可持久; 2) 將通道置為可持久 3) 消息發送時設置可持久。 當我們“生產”了一條可持久化的消息,嘗試中斷MQ服務,啟動消費者獲取消息,消息依然能夠恢復。相反,則拋出異常。

使用過程

消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。

  (2)客戶端聲明一個exchange,並設置相關屬性。   (3)客戶端聲明一個queue,並設置相關屬性。   (4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。   (5)客戶端投遞消息到exchange。 exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。 exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.

”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。 RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:   (1)exchange持久化,在聲明時指定durable => 1   (2)queue持久化,在聲明時指定durable => 1   (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那麽它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

基本例子

amqp_task.php

 ‘192.168.6.30‘,
    ‘port‘=>‘5672‘,
    ‘login‘=>‘dpjia‘,
    ‘password‘=>‘dpjia‘,
    ‘vhost‘=>‘/‘
);
//創建連接
$conn = new AMQPConnection($conn_args);
if($conn->connect()){
    echo "連接成功\n";
}else{
    die("連接失敗\n");
}
//創建channel
$channel = new AMQPChannel($conn);
//創建exchange交換機
$ex_name = ‘ex_dpjia‘;
$ex = new AMQPExchange($channel);
$ex->setName($ex_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
echo "Exchange status:".$ex->declareExchange()."\n";
$message = json_encode(array(‘Hello World3!‘,‘php3‘,‘c++3:‘));

$routing_key = ‘‘;
for($i=0;$i<10;$i++){
    if($routing_key == ‘key‘){
        $routing_key = ‘key2‘;
    }else{
        $routing_key = ‘key‘;
    }
    $ex->publish($message,$routing_key);
}

$qu_name = ‘qu_dpjia3‘;
$qu = new AMQPQueue($channel);
$qu->setName($qu_name);
$qu->setFlags(AMQP_DURABLE);
echo ‘queue status:‘.$qu->declare()."\n";

echo ‘queue bind: ‘.$qu->bind($ex_name,‘route.key‘)."\n";

$channel->startTransaction();
echo "send: ".$ex->publish($message, ‘route.key‘); //將你的消息通過制定routingKey發送
$channel->commitTransaction();
$conn->disconnect();
//發送消息
//echo "Send Message:".$ex->publish("TEST MESSAGE,key_1 by xust" . date(‘H:i:s‘, time()), ‘key_1‘)."\n";
//echo "Send Message:".$ex->publish("TEST MESSAGE,key_2 by xust" . date(‘H:i:s‘, time()), ‘key_2‘)."\n";
?>

amqp_worker.php

 ‘192.168.6.30‘,
        ‘port‘=>‘5672‘,
        ‘login‘=>‘dpjia‘,
        ‘password‘=>‘dpjia‘,
        ‘vhost‘=>‘/‘
);
//創建連接
$conn = new AMQPConnection($conn_args);
if($conn->connect()){
        echo "連接成功\n";
}else{
        die("連接失敗\n");
}
$ex_name = ‘ex_dpjia‘;
$qu_name = ‘qu_dpjia‘;

$channel = new AMQPChannel($conn);
/*
$ex = new AMQPExchange($channel);
$ex->setName($ex_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
echo "Exchange status:".$ex->declare()."\n";
*/
//創建隊列 queue

$qu = new AMQPQueue($channel);
$qu->setName($qu_name);
$qu->setFlags(AMQP_DURABLE);
echo $qu->declare();
//綁定交換機和隊列
$qu->bind($ex_name,‘roukey2‘);

//阻塞模式接收消息
//echo "Message:\n";
//$qu->consume(‘processMessage‘,AMQP_AUTOACK); //自動ACK應答
$msg = $qu->get(AMQP_AUTOACK);
if($msg){
        var_dump($msg->getBody());
}

$conn->disconnect();

//消息回調函數 processMessage

function processMessage($envelope,$queue){
        var_dump($envelope->getRoutingKey);
        $msg = $envelope->getBody();
        echo $msg."\n";
}

?>

PHP使用RabbitMQ