Rabbitmq學習(一) Rabbitmq初探
Rabbitmq學習(一) Rabbitmq初探
理論
定義
訊息佇列:在訊息的傳輸過程中儲存訊息的的容器。
這是一個較為經典的消費-生產者模型,說起來比較抽象,打個比方:A執行緒需要給B執行緒傳送訊息(A、B執行緒不一定是在同一臺機器上的),A執行緒先把訊息傳送到訊息佇列伺服器上,然後B執行緒去讀取或是訂閱訊息伺服器上訊息佇列中的訊息,執行緒A和B之間並沒有進行直接通訊。MQ伺服器在中間起到中繼的作用。
適用的應用場景
比較適合非同步傳輸,這裡解釋一下什麼是非同步和同步。
非同步:傳送方不關心訊息有沒有傳送成功,只發送訊息,不去獲取訊息是否傳送成功。
同步:傳送方關心訊息是否傳送成功,傳送訊息後,會等待接收方返回狀態碼,根據狀態碼來判斷是否傳送成功,然後執行相對於的動作。
下邊以Http中的同步和非同步為例:
如:普通的B/S架構客戶端和伺服器端之間的通訊就是同步的,即提交請求 ---> 等待伺服器處理完畢返回訊息 ---> 拿到伺服器返回的訊息,處理完畢。
如:Ajax技術就是非同步的,請求通過事件觸發 ---> 伺服器處理(瀏覽器不用等待,仍可以做其他的事情) ---> 處理完畢。
有人可能會好奇說應用場景怎麼說到了同步和非同步,那說明你還不是很理解技術和應用場景之間的緊密聯絡。
工作過程
生產者客戶端:
客戶端連線到RabbitMQ伺服器上,開啟一個訊息通道(channel);
客戶端宣告一個訊息交換機(exchange),並設定相關屬性。
客戶端宣告一個訊息佇列(queue),並設定相關屬性。
客戶端使用routing key在訊息交換機(exchange)和訊息佇列(queue)中建立好繫結關係。
客戶端投遞訊息都訊息交換機(exchange)上
客戶端關閉訊息通道(channel)以及和伺服器的連線。
伺服器端:
exchange接收到訊息後,根據訊息的key(這個key的產生規則暫時沒研究,有知道的小夥伴可以留言告訴我)和以及設定的binding,進行訊息路由,將訊息投遞到一個或多個訊息佇列中。
安裝
由於rabbitMq需要erlang語言的支援,在安裝rabbitMq之前需要安裝erlang,執行命令:
sudo apt-get install erlang
這樣就安裝完了。
接下來,安裝rabbitMq:sudo apt-get install rabbitmq-server
安裝完之後啟動rabbitMQ
建立使用者sudo rabbitmqctl add_user lsl 123456
將使用者設定為管理員(只有管理員才能遠端登入)
sudo rabbitmqctl set_user_tags 使用者名稱 administrator
同時為使用者設定讀寫等許可權sudo rabbitmqctl set_permissions -p / lsl ".*" ".*" ".*"
跟著訪問 http://192.168.136.130:15672/
就能看到配置頁面了,這也說明已經成功安裝rabbitmq
實戰消費者和生產者(PHP版本)
生產者:生產訊息,傳送訊息。類似工廠。
消費者:接受訊息,使用訊息。類似顧客。
佇列:儲存訊息。類似倉庫、中轉站。佇列可以儲存很多的訊息,因為它基本上是一個無限制的緩衝區,前提是你的機器有足夠的儲存空間。多個生產者可以將訊息傳送到同一個佇列中,多個消費者也可以只從同一個佇列接收資料。這就是佇列的特性。
下面寫一個demo來實現rabbitmq的消費者和生產者
send.php
<?php
//配置資訊
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'lsl',
'password' => '123456',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名
//$q_name = 'q_linvo'; //無需佇列名
$k_route = 'key_1'; //路由key
//建立連線和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//建立交換機物件
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
date_default_timezone_set("Asia/Shanghai");
//傳送訊息
//$channel->startTransaction(); //開始事務
for($i=0; $i<5; ++$i){
sleep(2);//每個兩秒傳送一條訊息
//訊息內容
$message = "HelloWorld!".date("h:i:sa");
echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事務
$conn->disconnect();
?>
rec.php
<?php
//配置資訊
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'lsl',
'password' => '123456',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機名
$q_name = 'q_linvo'; //佇列名
$k_route = 'key_1'; //路由key
//建立連線和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//建立交換機
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct型別
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";
//建立佇列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";
//繫結交換機與佇列,並指定路由鍵
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";
//阻塞模式接收訊息
echo "Message:\n";
while(True){
$q->consume('processMessage');
//$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
}
$conn->disconnect();
先讓消費者接收訊息
再呼叫生產者傳送訊息
再檢視消費者收到的訊息
假設消費者掛掉了,看看訊息是怎麼樣的
看一下之前部署的,你會發現linvo這個佇列裡有5條訊息,這意味著沒有消費者去讀取它,把訊息堆積在佇列裡了
當你開啟消費者,頁面的Total值就馬上變為0了,這意味著訊息已經被接收。
這樣就模擬了佇列對訊息的處理。