1. 程式人生 > >初識RabbitMQ,附RabbitMQ+PHP演示例項

初識RabbitMQ,附RabbitMQ+PHP演示例項

RabbitMQ是一個在AMQP基礎上實現的企業級訊息系統。何謂訊息系統,就是訊息佇列系統,訊息佇列是“”消費-生產者模型“”的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。

what?消費-生產者模型?對,沒錯!就是大學作業系統課程裡面的“消費者-生產者模式”,記得當時被這個問題坑的不輕啊。

在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步操作,而這種非同步處理的方式大大的節省了伺服器的請求時間,從而提高了系統的吞吐量。而且不影響伺服器做其他相應,不獨佔伺服器資源。

如:註冊使用者這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機簡訊碼等)。它們作為消費者,等待使用者輸入資料,在前臺資料提交之後會經過分解併發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取資料時候有可能一次不能處理完,那麼它們各自有一個請求佇列,那就是記憶體緩衝區了。做這項工作的框架叫做訊息佇列。

又比如:電商系統中的訂單處理系統,傳統處理模式是:下訂單的時候,訂單系統可能會呼叫庫存系統的介面,這樣兩個系統之間存在一個嚴重依賴關係,如果庫存系統宕機,那麼整個流程都會受到影響。現在大多公司的處理方法是:引入訊息佇列,下完訂單,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功。

對庫存系統來說,採用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作。這樣實現了兩個系統間的解耦。

即使在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。

 給一張結構圖:

幾個概念說明: Broker:簡單來說就是訊息佇列伺服器實體。
  Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
  Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
  Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
  vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
  producer:訊息生產者,就是投遞訊息的程式。
  consumer:訊息消費者,就是接受訊息的程式。
  channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。 訊息佇列的使用過程大概如下: (1)客戶端連線到訊息佇列伺服器,開啟一個channel。
  (2)客戶端宣告一個exchange,並設定相關屬性。
  (3)客戶端宣告一個queue,並設定相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間建立好繫結關係。
  (5)客戶端投遞訊息到exchange。 exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。 exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。 RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:
  (1)exchange持久化,在宣告時指定durable => 1
  (2)queue持久化,在宣告時指定durable => 1
  (3)訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。

好了,講了這麼多基本講清楚了RabbitMQ的應用場景和好處,下面我們在windows平臺上練一把手,更直觀的來看看RabbitMQ到底是什麼?

那麼我蠻來安裝RabbitMQ+PHP環境:

1.安裝RabbitMQ

 安裝RabbitMQ之前首先要安裝Erlang語言開發包,下載地址:http://www.erlang.org/download/otp_win32_R15B.exe 預設安裝即可

   配置環境變數 ERLANG_HOME C:\Program Files (x86)\erl5.9 

      新增到PATH  %ERLANG_HOME%\bin;

 下載安裝RabbitMQ,下載地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe 

      配置環境變數 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-2.8.0

      新增到PATH %RABBITMQ_SERVER%\sbin;

  然後到dos裡面切換到RabbitMQ目錄下,執行rabbitmq-plugins.bat enable rabbitmq_management, 安裝完成之後以管理員身份啟動 rabbitmq:輸入命令:

  rabbitmq-service.bat stop   rabbitmq-service.bat install   rabbitmq-service.bat start 然後,瀏覽器中輸入:127.0.0.1:15672,使用者名稱密碼是guest ,如果能登陸就說明安裝成功。

 

2.接下來要安裝php的amqp擴充套件

先用phpinfo()檢視php版本資訊,及,資訊

最後根據上面的資訊去下載相應的amqp版本:http://pecl.php.net/package/amqp

據上面資訊我們的是32位非執行緒安全版本

加壓後:

將php_amqp.dll複製到php/ext,同時在php.ini中新增如下程式碼:

[amqp]  

extension=php_amqp.dll 

然後將rabbitmq.1.dll複製到php根目錄C:/xampp/php/,同時修改apache配置檔案httpd.conf,新增如下程式碼:

# rabbitmq

LoadFile  "C:/xampp/php/rabbitmq.1.dll"  

最後重啟看看是否已經載入了amqp模組:

---------------------------------到這裡為止,安裝已經結束-------------------------------------------------

RabbitMQ+PHP展示例項

新建rabbit_consumer.php作為消費者

<?php 
//配置資訊 
$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   
$e_name = 'e_linvo'; //交換機名 
$q_name = 'q_linvo'; //佇列名 
$k_route = 'key_1'; //路由key 
 
//建立連線和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn);   
 
//建立交換機    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct型別  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";   
   
//建立佇列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declare()."\n";   
 
//繫結交換機與佇列,並指定路由鍵 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; 
 
//阻塞模式接收訊息 
echo "Message:\n";   
while(True){ 
    $q->consume('processMessage');   
    //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  
} 
$conn->disconnect();   
 
/**
 * 消費回撥函式
 * 處理訊息
 */ 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo $msg."\n"; //處理訊息 
    $queue->ack($envelope->getDeliveryTag()); //手動傳送ACK應答 
}
?>
View Code

 新建rabbit_publisher.php作為生產者

<?php
//配置資訊 
$conn_args = array( 
    'host' => '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   
$e_name = 'e_linvo'; //交換機名 
//$q_name = 'q_linvo'; //無需佇列名 
$k_route = 'key_1'; //路由key 
 
//建立連線和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn);   
 

 
//建立交換機物件    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name);   
date_default_timezone_set("Asia/Shanghai");
//傳送訊息 
//$channel->startTransaction(); //開始事務  
for($i=0; $i<5; ++$i){ 
    sleep(1);//休眠1秒
    //訊息內容 
    $message = "TEST MESSAGE!".date("h:i:sa");   
    echo "Send Message:".$ex->publish($message, $k_route)."\n";  
} 
//$channel->commitTransaction(); //提交事務 
 
$conn->disconnect();
?>
View Code

測試一下:

先起一個視窗同樣切換到php目錄,輸入:php c:/xampp/htdocs/RabbitMQ/rabbit_consumer.php

執行消費者

然後再起一個dos視窗,切換到php根目錄,輸入以下命令:php c:/xampp/htdocs/RabbitMQ/rabbit_publisher.php

執行生產者

消費者接收到訊息

 

 這樣就模擬了佇列對訊息的處理,希望我們通過這篇文章對RabbitMQ的認識都能有一定的提升。

 另外給出官網的php使用指南:http://www.rabbitmq.com/tutorials/tutorial-one-php.html