RabbitMQ服務之入門篇
RabbitMQ是一種訊息中介軟體,能夠很好的處理來自客戶端的非同步訊息傳送及請求,將訊息傳送放入到服務端的佇列池中,而接收端可以根據RabbitMQ配置的轉發機制接收和過濾服務端轉發來的訊息。RabbitMQ可以根據指定的訊息轉發規則進行訊息的轉發、緩衝和持久化操作,這也是其根身立命的地方,但是其誕生的主要目的是為了均衡執行緒耗時操作的壓力,前提是這些操作要滿足沒有要求即時反應,因為其不適合用在要求即時反應的需求,此時可以考慮使用快取中介軟體Redis、Memcache等,另外,RabbitMQ主要用在多伺服器間或單伺服器的子系統間進行通訊,是分散式系統標準的配置。
l 如何安裝
l 基本原理
l 入門例子
一、如何安裝
在Mac中,安裝RabbitMQ服務環境,需要到官網下載最新的.tar.xz檔案並解壓,若要啟動RabbitMQ服務或是使用相關的便捷命令,我們需要進入sbin目錄下,那麼啟動服務如下:
$sudo ./rabbitmq-server start
啟動服務之後,等待便是RabbitMQ佇列接收和轉發訊息了。如果要停止RabbitMQ服務,可以使用如下命令操作:
$sudo ./rabbitmqctl stop_app ---停止服務
$sudo ./rabbitmqctl start_app ---開始服務
二、基本原理
RabbitMQ的原理是根據其轉發器或交換機規則的特性來說的,目前RabbitMQ有四種轉發器型別:fanout、direct、topic及headers,前三種類型轉發器比較常用,而headers用的比較少,原因有兩個,一個是它的用法比較麻煩,另外就是前三者基本可以滿足所有實際需求,可以取代它的存在,不管怎樣,在這裡都會介紹下這四種轉發器的原理,具體如下:
1、fanout exchange
說明:
fanout轉發器,是幾種轉發器中轉發訊息最快的一種,其路由規則會將訊息轉發給與轉發器繫結的每一個佇列中,也就是輪循轉發相同訊息給佇列。
2、direct exchange
說明:
direct轉發器,會根據當前傳送和接受端協商的統一的routing key來完全匹配轉發訊息,也就是轉發器傳送標有routing key標誌的路由資訊,只有接收端的binding key與routing key與之相同,才會接收到資訊。
3、topic exchange
說明:
topic轉發器,相對於direct轉發器,topic可以轉發符合多個條件的訊息,也就是傳送端傳送訊息,而接受端可以靈活配置接收訊息的路由規則,例如:msg.#和msg.*,前者能夠接收msg.log.info和msg.log型別訊息,而後者則能接收到msg.*型別訊息,所以#號代表一個或多個單詞匹配,而*則代表一個單詞匹配了,實際上就是正常的規則過濾機制。
4、headers exchange
說明:
headers轉發器,也是用的比較少的轉發器,原因請檢視第一部分介紹。此種轉發器,忽略了路由routing key規則,使用了健-值對形式匹配規則,此種轉發器規定,在接受端必須使用x-match,它目前有兩種型別:all和any,前者代表所有的鍵-值都滿足後,才能收到資訊,而後者則滿足任意個就可以收到消,這個會在後續文章介紹,這裡只需瞭解即可。
三、入門例子
這裡我們以HelloWorld!程式為例子,來介紹RabbitMQ的使用。專案型別為maven管理的java專案,在介紹RabbitMQ使用時,會封裝一個通用的功能,隨著陸續文章的介紹,會完善該封裝工具的功能,後續不再說明。
1、準備
因為接下來介紹的傳送端和接收訊息端的服務連結是相同的,所以我們建議封裝為BaseConnector.java,供接收端和傳送端繼承使用;傳送和接收訊息的載體MessageInfo.java也是需要的,並且是需要被序列化,另外,在啟動程式前,需要先啟動機器的RabbitMQ服務,否則拒絕訪問連結,具體如下:
A、BaseConnector.java
publicclass BaseConnector {
protectedChannel channel;
protected Connection connection;
protected String queueName;
public BaseConnector(String queueName)throwsIOException, TimeoutException {
this. queueName = queueName;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1 ");
connection = factory.newConnection(); //建立連線
channel = connection.createChannel(); //建立頻道
channel.queueDeclare(queueName, false,false,false,null);//宣告佇列
}
protectedvoid close() { //關閉頻道及連結
try {
channel.close();
connection.close();
}catch (IOException e) {
e.printStackTrace();
}catch (TimeoutException e) {
e.printStackTrace();
}
}
}
B、MessageInfo.java
publicclassMessageInfoimplements Serializable {
privatestatic final long serialVersionUID = 1L;
privateString channel; //訊息渠道
privateString content; //訊息內容
privateint hashCode; //非同步執行緒標誌
publicString getChannel() {
returnchannel;
}
publicvoid setChannel(String channel) {
this.channel= channel;
}
publicString getContent() {
returncontent;
}
publicvoid setContent(String content) {
this.content= content;
}
publicint getHashCode() {
returnhashCode;
}
publicvoid setHashCode(int hashCode) {
this.hashCode= hashCode;
}
@Override
publicString toString() {
return"MessageInfo [channel=" + channel + ", content=" + content
+", hashCode=" + hashCode
+"]";
}
}
C、啟動RabbitMQ服務
具體如何啟動,請檢視文章第一部分介紹。
2、傳送端
PublisherHandler.java:
publicclassPublisherHandlerextends BaseConnector {
publicPublisherHandler(String queueName)throws IOException,TimeoutException {
super(queueName);
}
publicvoid sendMessage(MessageInfo messageInfo) {
try{
channel.basicPublish("",queueName,null, SerializationUtils.serialize(messageInfo));
}catch (Exception e) {
System.out.println("RabbitMQSend Message Error:"+e.getMessage());
}
}
/**
* 關閉頻道及連結
*/
publicvoid close() {
super.close();
}
}
3、接收端
接受端的實現,這裡考慮到多執行緒的情況,也是實際使用中常遇到的環境,所以它的實現,我們需要實現RabbitMQ介面Consumer,以及Runnable介面,具體如下:
publicclass ReceiverHandlerextendsBaseConnectorimplements Runnable,Consumer {
privateint hashCode = 0;
publicReceiverHandler(String queueName)throws IOException, TimeoutException {
super(queueName);
}
publicvoid receiveMessage() {
hashCode = Thread.currentThread().hashCode(); //區分不同工作程序的輸出
try{
System.out.println(hashCode+ " [*] Waiting for messages. To exit press CTRL+C");
Stringop_result = channel.basicConsume(queueName, true,this);
if("".equals(op_result)){
System.out.println("BasicConsumeConfig Consumer Queue Error!");
}
}catch (IOException e) {
System.out.println("Consumer Delivery Error,Msg info:" + e.getMessage());
}catch (Exception e) {
System.out.println("Error Is Opening,Msg info:" + e.getMessage());
}
}
@Override
publicvoid handleCancel(String arg0)throws IOException {
debug("===handleCancel==="+arg0);
}
@Override
publicvoid handleCancelOk(String arg0) {
debug("===handleCancelOk==="+arg0);
}
@Override
publicvoid handleConsumeOk(String arg0) {
debug("===handleConsumeOk==="+arg0);
}
@Override
publicvoid handleDelivery(String consumerTag, Envelope env,
BasicPropertiesprops, byte[] body) throws IOException {
MessageInfomessageInfo = (MessageInfo) SerializationUtils.deserialize(body);
messageInfo.setHashCode(hashCode);
System.out.println("message-info:"+msgInfo.toString());
}
@Override
publicvoid handleRecoverOk(String arg0) {
debug("===handleRecoverOk==="+arg0);
}
@Override
publicvoid handleShutdownSignal(String arg0, ShutdownSignalException arg1) {
debug("===handleShutdownSignal==="+arg0+"===ShutdownSignalException==="+arg1.getMessage());
}
@Override
publicvoid run() {
receiveMessage();
}
}
4、執行入口
publicstatic void main(String[] args){
PublisherHandlerpublisher = null;
ReceiverHandlerreceiver = null;
try{
receiver= new ReceiverHandler("mq_hello"); //接收者
ThreadreceiverThread = new Thread(receiver);
receiverThread.start();
publisher= new PublisherHandler("mq_hello"); //傳送者
MessageInfomsgInfo = new MessageInfo();
msgInfo.setChannel("hello");
msgInfo.setContent("HelloWorld!");
publisher.sendMessage(msgInfo);
}catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
publisher.close();
}
}
執行結果:
如上圖,我們可以清楚看到,HelloWorld!程式已經成功執行,得到了我們預期的結果顯示。好了,訊息佇列RabbitMQ入門篇就介紹到這裡,由於作者水平有限,如有問題請在評論發言或QQ群(245389109(新))討論,謝謝。