PHP使用RabbitMQ
基本概念
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。 Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。 Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。 Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。 vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。 producer:消息生產者,就是投遞消息的程序。 consumer:消息消費者,就是接受消息的程序。 channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。 消息持久 1) 將交換機置為可持久; 2) 將通道置為可持久 3) 消息發送時設置可持久。 當我們“生產”了一條可持久化的消息,嘗試中斷MQ服務,啟動消費者獲取消息,消息依然能夠恢復。相反,則拋出異常。
使用過程
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。 (3)客戶端聲明一個queue,並設置相關屬性。 (4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。 (5)客戶端投遞消息到exchange。 exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。 exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.
基本例子
amqp_task.php
‘192.168.6.30‘,
‘port‘=>‘5672‘,
‘login‘=>‘dpjia‘,
‘password‘=>‘dpjia‘,
‘vhost‘=>‘/‘
);
//創建連接
$conn = new AMQPConnection($conn_args);
if($conn->connect()){
echo "連接成功\n";
}else{
die("連接失敗\n");
}
//創建channel
$channel = new AMQPChannel($conn);
//創建exchange交換機
$ex_name = ‘ex_dpjia‘;
$ex = new AMQPExchange($channel);
$ex->setName($ex_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
echo "Exchange status:".$ex->declareExchange()."\n";
$message = json_encode(array(‘Hello World3!‘,‘php3‘,‘c++3:‘));
$routing_key = ‘‘;
for($i=0;$i<10;$i++){
if($routing_key == ‘key‘){
$routing_key = ‘key2‘;
}else{
$routing_key = ‘key‘;
}
$ex->publish($message,$routing_key);
}
$qu_name = ‘qu_dpjia3‘;
$qu = new AMQPQueue($channel);
$qu->setName($qu_name);
$qu->setFlags(AMQP_DURABLE);
echo ‘queue status:‘.$qu->declare()."\n";
echo ‘queue bind: ‘.$qu->bind($ex_name,‘route.key‘)."\n";
$channel->startTransaction();
echo "send: ".$ex->publish($message, ‘route.key‘); //將你的消息通過制定routingKey發送
$channel->commitTransaction();
$conn->disconnect();
//發送消息
//echo "Send Message:".$ex->publish("TEST MESSAGE,key_1 by xust" . date(‘H:i:s‘, time()), ‘key_1‘)."\n";
//echo "Send Message:".$ex->publish("TEST MESSAGE,key_2 by xust" . date(‘H:i:s‘, time()), ‘key_2‘)."\n";
?>
amqp_worker.php
‘192.168.6.30‘,
‘port‘=>‘5672‘,
‘login‘=>‘dpjia‘,
‘password‘=>‘dpjia‘,
‘vhost‘=>‘/‘
);
//創建連接
$conn = new AMQPConnection($conn_args);
if($conn->connect()){
echo "連接成功\n";
}else{
die("連接失敗\n");
}
$ex_name = ‘ex_dpjia‘;
$qu_name = ‘qu_dpjia‘;
$channel = new AMQPChannel($conn);
/*
$ex = new AMQPExchange($channel);
$ex->setName($ex_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
echo "Exchange status:".$ex->declare()."\n";
*/
//創建隊列 queue
$qu = new AMQPQueue($channel);
$qu->setName($qu_name);
$qu->setFlags(AMQP_DURABLE);
echo $qu->declare();
//綁定交換機和隊列
$qu->bind($ex_name,‘roukey2‘);
//阻塞模式接收消息
//echo "Message:\n";
//$qu->consume(‘processMessage‘,AMQP_AUTOACK); //自動ACK應答
$msg = $qu->get(AMQP_AUTOACK);
if($msg){
var_dump($msg->getBody());
}
$conn->disconnect();
//消息回調函數 processMessage
function processMessage($envelope,$queue){
var_dump($envelope->getRoutingKey);
$msg = $envelope->getBody();
echo $msg."\n";
}
?>
PHP使用RabbitMQ