1. 程式人生 > >RabbitMQ服務之入門篇

RabbitMQ服務之入門篇

RabbitMQ是一種訊息中介軟體,能夠很好的處理來自客戶端的非同步訊息傳送及請求,將訊息傳送放入到服務端的佇列池中,而接收端可以根據RabbitMQ配置的轉發機制接收和過濾服務端轉發來的訊息。RabbitMQ可以根據指定的訊息轉發規則進行訊息的轉發、緩衝和持久化操作,這也是其根身立命的地方,但是其誕生的主要目的是為了均衡執行緒耗時操作的壓力,前提是這些操作要滿足沒有要求即時反應,因為其不適合用在要求即時反應的需求,此時可以考慮使用快取中介軟體Redis、Memcache等,另外,RabbitMQ主要用在多伺服器間或單伺服器的子系統間進行通訊,是分散式系統標準的配置。

l   如何安裝

l   基本原理

l   入門例子

一、如何安裝

在Mac中,安裝RabbitMQ服務環境,需要到官網下載最新的.tar.xz檔案並解壓,若要啟動RabbitMQ服務或是使用相關的便捷命令,我們需要進入sbin目錄下,那麼啟動服務如下:

$sudo ./rabbitmq-server start

啟動服務之後,等待便是RabbitMQ佇列接收和轉發訊息了。如果要停止RabbitMQ服務,可以使用如下命令操作:

$sudo ./rabbitmqctl stop_app  ---停止服務

$sudo ./rabbitmqctl start_app  ---開始服務

二、基本原理

RabbitMQ的原理是根據其轉發器或交換機規則的特性來說的,目前RabbitMQ有四種轉發器型別:fanout、direct、topic及headers,前三種類型轉發器比較常用,而headers用的比較少,原因有兩個,一個是它的用法比較麻煩,另外就是前三者基本可以滿足所有實際需求,可以取代它的存在,不管怎樣,在這裡都會介紹下這四種轉發器的原理,具體如下:

1、fanout  exchange

說明:

fanout轉發器,是幾種轉發器中轉發訊息最快的一種,其路由規則會將訊息轉發給與轉發器繫結的每一個佇列中,也就是輪循轉發相同訊息給佇列。

2、direct  exchange

說明:

direct轉發器,會根據當前傳送和接受端協商的統一的routing key來完全匹配轉發訊息,也就是轉發器傳送標有routing key標誌的路由資訊,只有接收端的binding key與routing key與之相同,才會接收到資訊。

3、topic  exchange


說明:

topic轉發器,相對於direct轉發器,topic可以轉發符合多個條件的訊息,也就是傳送端傳送訊息,而接受端可以靈活配置接收訊息的路由規則,例如:msg.#和msg.*,前者能夠接收msg.log.info和msg.log型別訊息,而後者則能接收到msg.*型別訊息,所以#號代表一個或多個單詞匹配,而*則代表一個單詞匹配了,實際上就是正常的規則過濾機制。

4、headers  exchange


說明:

headers轉發器,也是用的比較少的轉發器,原因請檢視第一部分介紹。此種轉發器,忽略了路由routing key規則,使用了健-值對形式匹配規則,此種轉發器規定,在接受端必須使用x-match,它目前有兩種型別:all和any,前者代表所有的鍵-值都滿足後,才能收到資訊,而後者則滿足任意個就可以收到消,這個會在後續文章介紹,這裡只需瞭解即可。

三、入門例子

這裡我們以HelloWorld!程式為例子,來介紹RabbitMQ的使用。專案型別為maven管理的java專案,在介紹RabbitMQ使用時,會封裝一個通用的功能,隨著陸續文章的介紹,會完善該封裝工具的功能,後續不再說明。

1、準備

因為接下來介紹的傳送端和接收訊息端的服務連結是相同的,所以我們建議封裝為BaseConnector.java,供接收端和傳送端繼承使用;傳送和接收訊息的載體MessageInfo.java也是需要的,並且是需要被序列化,另外,在啟動程式前,需要先啟動機器的RabbitMQ服務,否則拒絕訪問連結,具體如下:

A、BaseConnector.java

publicclass BaseConnector {

    protectedChannel channel;

protected Connection connection;

protected   String queueName;

public BaseConnector(String queueName)throwsIOException, TimeoutException {

this. queueName = queueName;

       ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1 ");

       connection = factory.newConnection();  //建立連線

       channel = connection.createChannel();  //建立頻道

       channel.queueDeclare(queueName, false,false,false,null);//宣告佇列

    }

    protectedvoid close() {  //關閉頻道及連結

        try {

           channel.close();

           connection.close();

       }catch (IOException e) {

           e.printStackTrace();

       }catch (TimeoutException e) {

           e.printStackTrace();

       }

    }

}

B、MessageInfo.java

publicclassMessageInfoimplements Serializable {

    privatestatic final long serialVersionUID = 1L;

    privateString channel; //訊息渠道

    privateString content; //訊息內容

    privateint hashCode;  //非同步執行緒標誌

    publicString getChannel() {

       returnchannel;

    }

    publicvoid setChannel(String channel) {

       this.channel= channel;

    }

    publicString getContent() {

       returncontent;

    }

    publicvoid setContent(String content) {

       this.content= content;

    }

    publicint getHashCode() {

       returnhashCode;

    }

    publicvoid setHashCode(int hashCode) {

       this.hashCode= hashCode;

    }

    @Override

    publicString toString() {

       return"MessageInfo [channel=" + channel + ", content=" + content

              +", hashCode=" + hashCode

              +"]";

    }

}

C、啟動RabbitMQ服務

具體如何啟動,請檢視文章第一部分介紹。

2、傳送端

PublisherHandler.java:

publicclassPublisherHandlerextends BaseConnector {

    publicPublisherHandler(String queueName)throws IOException,TimeoutException {

       super(queueName);

    }

    publicvoid sendMessage(MessageInfo messageInfo) {

       try{

              channel.basicPublish("",queueName,null, SerializationUtils.serialize(messageInfo));

       }catch (Exception e) {

           System.out.println("RabbitMQSend Message Error:"+e.getMessage());

       }

    }

    /**

     * 關閉頻道及連結

     */

    publicvoid close() {

       super.close();

    }

}

3、接收端

接受端的實現,這裡考慮到多執行緒的情況,也是實際使用中常遇到的環境,所以它的實現,我們需要實現RabbitMQ介面Consumer,以及Runnable介面,具體如下:

publicclass ReceiverHandlerextendsBaseConnectorimplements Runnable,Consumer {

    privateint hashCode = 0;

    publicReceiverHandler(String queueName)throws IOException, TimeoutException {

       super(queueName);

    }

    publicvoid receiveMessage() {

       hashCode = Thread.currentThread().hashCode(); //區分不同工作程序的輸出

       try{

           System.out.println(hashCode+ " [*] Waiting for messages. To exit press CTRL+C");

           Stringop_result = channel.basicConsume(queueName, true,this);  

           if("".equals(op_result)){

              System.out.println("BasicConsumeConfig Consumer Queue Error!");

           }

       }catch (IOException e) {

           System.out.println("Consumer Delivery Error,Msg info:" + e.getMessage());

       }catch (Exception e) {

           System.out.println("Error Is Opening,Msg info:" + e.getMessage());

       }

    }

    @Override

    publicvoid handleCancel(String arg0)throws IOException {

       debug("===handleCancel==="+arg0);

    }

    @Override

    publicvoid handleCancelOk(String arg0) {

       debug("===handleCancelOk==="+arg0);

    }

    @Override

    publicvoid handleConsumeOk(String arg0) {

       debug("===handleConsumeOk==="+arg0);

    }

    @Override

    publicvoid handleDelivery(String consumerTag, Envelope env,

           BasicPropertiesprops, byte[] body) throws IOException {

       MessageInfomessageInfo = (MessageInfo) SerializationUtils.deserialize(body);

       messageInfo.setHashCode(hashCode);

System.out.println("message-info:"+msgInfo.toString());

    }

    @Override

    publicvoid handleRecoverOk(String arg0) {

       debug("===handleRecoverOk==="+arg0);

    }

    @Override

    publicvoid handleShutdownSignal(String arg0, ShutdownSignalException arg1) {

       debug("===handleShutdownSignal==="+arg0+"===ShutdownSignalException==="+arg1.getMessage());

    }

    @Override

    publicvoid run() {

       receiveMessage();

    }

}

4、執行入口

publicstatic void main(String[] args){  

       PublisherHandlerpublisher = null;

       ReceiverHandlerreceiver = null;

       try{

           receiver= new ReceiverHandler("mq_hello");  //接收者

           ThreadreceiverThread = new Thread(receiver);

           receiverThread.start();

           publisher= new PublisherHandler("mq_hello");    //傳送者

           MessageInfomsgInfo = new MessageInfo();

           msgInfo.setChannel("hello");

           msgInfo.setContent("HelloWorld!");

           publisher.sendMessage(msgInfo);

       }catch (IOException | TimeoutException e) {

           e.printStackTrace();

       }finally {

           publisher.close();

       }

}

執行結果:

如上圖,我們可以清楚看到,HelloWorld!程式已經成功執行,得到了我們預期的結果顯示。好了,訊息佇列RabbitMQ入門篇就介紹到這裡,由於作者水平有限,如有問題請在評論發言或QQ群(245389109(新)討論,謝謝。