1. 程式人生 > >電商之梳理rabbitmq相關知識---通訊--訊息佇列

電商之梳理rabbitmq相關知識---通訊--訊息佇列

rabbitmq 編輯
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ。
中文名 訊息佇列 外文名 Message Queue 簡 稱 MQ
目錄
1 MQ特點
2 使用場景
3 含義
4 安裝
5 客戶端
6 消費者端
7 幾個概念
8 訊息持久
9 入門介紹
這裡寫圖片描述


▪ 基本概念
▪ 應用實際
▪ 叢集配置
MQ特點編輯
MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。
使用場景編輯
在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
含義編輯
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。他遵循Mozilla Public License開源協議。
安裝編輯
4.1)安裝ERLANG
首先,因為RabbitMQ由ERLANG實現,下載ERLANG 原始碼。
解壓原始碼至ERLANG至資料夾E
RLANGYuminstalltkYuminstalltclYuminstallunixODBC
ERLANG.編譯ERLANG
./configure –prefix=/usr/local/erlang
./make
./make install
並將erlang bin目錄加至PATH
4.2)安裝RabbitMQ
下載RabbitMQ ,解壓至$RMQ。
啟動RabbitMQ
./bin/rabbitmq-server
客戶端編輯
import com.rabbitmq.client.Channel;
  import com.rabbitmq.client.Connection;
  import com.rabbitmq.client.ConnectionFactory;
public class Send {
  private final static String QUEUE_NAME = “hello”;
public static void main(String[] args) throws.IOException{
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(“localhost”);
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  String message = “Hello World!”;
  channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes());
  System.out.println(” [x] Sent ‘” + message + “’”);
  channel.close();
  connection.close();
  }
  }
消費者端編輯
public class RabbitMQRecv {
  private final static String QUEUE_NAME = “hello”;
public static void main(String avg[]) throws.IOException,
  java.lang.InterruptedException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(“localhost”);
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  System.out.println(” [*] Waiting for messages. To exit press CTRL+C”);
  QueueingConsumer consumer = new QueueingConsumer(channel);
  channel.basicConsume(QUEUE_NAME, true, consumer);
  while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(” [x] Received ‘” + message + “’”);
  }
  }
  }
幾個概念編輯
Exchange:交換機,決定了訊息路由規則;
Queue:訊息佇列;
Channel:進行訊息讀寫的通道;
Bind:綁定了Queue和Exchange,意即為符合什麼樣路由規則的訊息,將會放置入哪一個訊息佇列;
訊息持久編輯
1) 將交換機置為可持久;
2) 將通道置為可持久
3) 訊息傳送時設定可持久。
當我們“生產”了一條可持久化的訊息,嘗試中斷MQ服務,啟動消費者獲取訊息,訊息依然能夠恢復。相反,則丟擲異常。
入門介紹編輯
基本概念
RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹。
RabbitMQ的結構圖如下:

幾個概念說明:
Broker:簡單來說就是訊息佇列伺服器實體。
  Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
  Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
  Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
  vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
  producer:訊息生產者,就是投遞訊息的程式。
  consumer:訊息消費者,就是接受訊息的程式。
  channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
訊息佇列的使用過程大概如下:
(1)客戶端連線到訊息佇列伺服器,開啟一個channel。
  (2)客戶端宣告一個exchange,並設定相關屬性。
  (3)客戶端宣告一個queue,並設定相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間建立好繫結關係。
  (5)客戶端投遞訊息到exchange。
exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。
exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。
RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:
  (1)exchange持久化,在宣告時指定durable => 1
  (2)queue持久化,在宣告時指定durable => 1
  (3)訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。
應用實際
使用Linux伺服器(ubuntu 9.10 64位),安裝RabbitMQ非常方便。
先執行如下命令安裝erlang:
apt-get install erlang-nox
下載RabbitMQ的安裝包,如下安裝:
dpkg -i rabbitmq-server_2.6.1-1_all.deb
安裝完後,使用
/etc/init.d/rabbitmq-server start|stop|restart
來啟動、停止、重啟rabbitmq。
在正式應用之前,先在RabbitMQ裡建立一個vhost,加一個使用者,並設定該使用者的許可權。
使用rabbitmqctl客戶端工具,在根目錄下建立”/pyhtest”這個vhost:
rabbitmqctl add_vhost /pyhtest
建立一個使用者名稱”pyh”,設定密碼”pyh1234″:
rabbitmqctl add_user pyh pyh1234
設定pyh使用者對/pyhtest這個vhost擁有全部許可權:
rabbitmqctl set_permissions -p /pyhtest pyh “.” “.” “.*”
後面三個”*”代表pyh使用者擁有對/pyhtest的配置、寫、讀全部許可權
設定好後,開始程式設計,用Perl寫一個訊息投遞程式(producer):

!/usr/bin/perl

  use strict;
  use Net::RabbitMQ;
  use UUID::Tiny;
my channel = 1000; # channel ID,可以隨意指定,只要不衝突  
  my
queuename = “pyh_queue”; # 佇列名
  my exchange = “pyh_exchange”; # 交換機名  
  my
routing_key = “test”; # routing key
my mq = Net::RabbitMQ->new(); # 建立一個RabbitMQ物件mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連線
  mq>channelopen(channel); # 開啟一個channel
  mq>exchangedeclare(channel, exchange, {durable => 1}); # 宣告一個持久化的交換機mq->queue_declare(channel,queuename, {durable => 1}); # 宣告一個持久化的佇列
  mq>queuebind(channel, queuename,exchange, routing_key); # 使用routing key在交換機和佇列間建立繫結  
for (my
i=0;i<10000000;i++) { # 迴圈1000萬次
  my string = create_UUID_as_string(UUID_V1); # 產生一條UUID作為訊息主體mq->publish(channel,routing_key, string, { exchange => $exchange }, { delivery_mode => 2 }); # 將訊息結合key以持久化模式投遞到交換機  
  }
mq->disconnect(); # 斷開連線
訊息接受程式(consumer)大概如下:

!/usr/bin/perl

  use strict;
  use Net::RabbitMQ;
my channel=1001;myqueuename = “pyh_queue”;
  my mq=Net::RabbitMQ>new();mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ });
  mq>channelopen(channel);
while (1) {
  my hashref=mq->get(channel,queuename);
  last unless defined hashref;printhashref->{message_count}, “: “, hashref->{body},”\n”;  
  }
mq->disconnect();
consumer連線後只要指定佇列就可獲取到訊息。
上述程式共投遞1000萬條訊息,每條訊息36位元組(UUID),開啟持久化,共耗時17分多鐘(包括產生UUID的時間),每秒投遞訊息約9500條。測試機器是8G記憶體、8核志強CPU。
投遞完後,在/var/lib/rabbitmq/mnesia/[email protected]${hostname}/msg_store_persistent目錄,產生2G多的持久化訊息資料。在執行consumer程式後,這些資料都會消失,因為訊息已經被消費了。
叢集配置
單機環境下的叢集配置:
首先啟動兩個例項,rab和rab2,埠分別為9991和9992
RABBITMQ_NODE_PORT=9991 RABBITMQ_NODENAME=rab rabbitmq-server -detached
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
二:停止rab2,並將其加入cluster叢集中
rabbitmqctl -n rab2 stop_app
rabbitmqctl -n rab2 join_cluster [email protected](@rab,這裡的rab表示主機名)
重新啟動rab2
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
檢視叢集的狀態
rabbitmqctl cluster_status -n rab
顯示如下資訊表示叢集配置正常:
Cluster status of node [email protected]
  [{nodes,[{disc,[[email protected],[email protected]]}]},
  {running_nodes,[[email protected]]},
  {partitions,[]}]
  …done.