1. 程式人生 > >Rabbitmq學習(一) Rabbitmq初探

Rabbitmq學習(一) Rabbitmq初探

Rabbitmq學習(一) Rabbitmq初探

理論
定義
訊息佇列:在訊息的傳輸過程中儲存訊息的的容器。

這是一個較為經典的消費-生產者模型,說起來比較抽象,打個比方:A執行緒需要給B執行緒傳送訊息(A、B執行緒不一定是在同一臺機器上的),A執行緒先把訊息傳送到訊息佇列伺服器上,然後B執行緒去讀取或是訂閱訊息伺服器上訊息佇列中的訊息,執行緒A和B之間並沒有進行直接通訊。MQ伺服器在中間起到中繼的作用。

適用的應用場景
比較適合非同步傳輸,這裡解釋一下什麼是非同步和同步。

非同步:傳送方不關心訊息有沒有傳送成功,只發送訊息,不去獲取訊息是否傳送成功。

同步:傳送方關心訊息是否傳送成功,傳送訊息後,會等待接收方返回狀態碼,根據狀態碼來判斷是否傳送成功,然後執行相對於的動作。

下邊以Http中的同步和非同步為例:

如:普通的B/S架構客戶端和伺服器端之間的通訊就是同步的,即提交請求 ---> 等待伺服器處理完畢返回訊息 ---> 拿到伺服器返回的訊息,處理完畢。

如:Ajax技術就是非同步的,請求通過事件觸發 ---> 伺服器處理(瀏覽器不用等待,仍可以做其他的事情) ---> 處理完畢。

有人可能會好奇說應用場景怎麼說到了同步和非同步,那說明你還不是很理解技術和應用場景之間的緊密聯絡。


工作過程

生產者客戶端:
客戶端連線到RabbitMQ伺服器上,開啟一個訊息通道(channel);
客戶端宣告一個訊息交換機(exchange),並設定相關屬性。
客戶端宣告一個訊息佇列(queue),並設定相關屬性。
客戶端使用routing key在訊息交換機(exchange)和訊息佇列(queue)中建立好繫結關係。
客戶端投遞訊息都訊息交換機(exchange)上
客戶端關閉訊息通道(channel)以及和伺服器的連線。
伺服器端:
exchange接收到訊息後,根據訊息的key(這個key的產生規則暫時沒研究,有知道的小夥伴可以留言告訴我)和以及設定的binding,進行訊息路由,將訊息投遞到一個或多個訊息佇列中。

安裝
由於rabbitMq需要erlang語言的支援,在安裝rabbitMq之前需要安裝erlang,執行命令:

sudo apt-get install erlang
Rabbitmq學習(一) Rabbitmq初探

這樣就安裝完了。
接下來,安裝rabbitMq:
sudo apt-get install rabbitmq-server

安裝完之後啟動rabbitMQ

Rabbitmq學習(一) Rabbitmq初探

建立使用者
sudo rabbitmqctl add_user lsl 123456
Rabbitmq學習(一) Rabbitmq初探

將使用者設定為管理員(只有管理員才能遠端登入)

sudo rabbitmqctl set_user_tags 使用者名稱 administrator

同時為使用者設定讀寫等許可權
sudo rabbitmqctl set_permissions -p / lsl ".*" ".*" ".*"

跟著訪問 http://192.168.136.130:15672/
就能看到配置頁面了,這也說明已經成功安裝rabbitmq
Rabbitmq學習(一) Rabbitmq初探

實戰消費者和生產者(PHP版本
生產者:生產訊息,傳送訊息。類似工廠。
消費者:接受訊息,使用訊息。類似顧客。
佇列:儲存訊息。類似倉庫、中轉站。佇列可以儲存很多的訊息,因為它基本上是一個無限制的緩衝區,前提是你的機器有足夠的儲存空間。多個生產者可以將訊息傳送到同一個佇列中,多個消費者也可以只從同一個佇列接收資料。這就是佇列的特性。

下面寫一個demo來實現rabbitmq的消費者和生產者
send.php

<?php
//配置資訊 
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'lsl',
    'password' => '123456',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名 
//$q_name = 'q_linvo'; //無需佇列名 
$k_route = 'key_1'; //路由key 

//建立連線和channel 
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//建立交換機物件    
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
date_default_timezone_set("Asia/Shanghai");
//傳送訊息 
//$channel->startTransaction(); //開始事務  
for($i=0; $i<5; ++$i){
    sleep(2);//每個兩秒傳送一條訊息
    //訊息內容 
    $message = "HelloWorld!".date("h:i:sa");
    echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事務 

$conn->disconnect();
?>

rec.php

<?php
//配置資訊 
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'lsl',
    'password' => '123456',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名 
$q_name = 'q_linvo'; //佇列名 
$k_route = 'key_1'; //路由key 

//建立連線和channel 
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//建立交換機    
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct型別  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";

//建立佇列    
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declare()."\n";

//繫結交換機與佇列,並指定路由鍵 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收訊息 
echo "Message:\n";
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答  
}
$conn->disconnect();

先讓消費者接收訊息
Rabbitmq學習(一) Rabbitmq初探

再呼叫生產者傳送訊息
Rabbitmq學習(一) Rabbitmq初探

再檢視消費者收到的訊息
Rabbitmq學習(一) Rabbitmq初探

假設消費者掛掉了,看看訊息是怎麼樣的
Rabbitmq學習(一) Rabbitmq初探
看一下之前部署的,你會發現linvo這個佇列裡有5條訊息,這意味著沒有消費者去讀取它,把訊息堆積在佇列裡了

當你開啟消費者,頁面的Total值就馬上變為0了,這意味著訊息已經被接收。
這樣就模擬了佇列對訊息的處理。