1. 程式人生 > >rabbitmq 主題交換機java 實現

rabbitmq 主題交換機java 實現

主題交換機

扇形(fanout)exchange 會把所有的訊息傳送到繫結的queue,只是做一個廣播。
直連(direct)exchange 把訊息傳送到繫結的queue 的 routing key與訊息的routing key相同的queue。
儘管直連交換機能夠改善我們的系統,但是它也有它的限制 —— 沒辦法基於多個標準執行路由操作。

在日誌系統中我們不會單獨訂閱只基於級別(error,info,warning)的日誌,還會希望訂閱裝置或者不同服務(web ,service,email等服務)的日誌。這樣的話就要求有很大的靈活性,這樣就可以訂閱郵件服務的error 日誌,也可以訂閱所有的error 日誌。

傳送到主題交換機(topic exchange)的訊息不可以攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是一個由.分隔開的詞語列表。這些單詞隨便是什麼都可以,但是最好是跟攜帶它們的訊息有關係的詞彙。以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數可以隨意,但是不要超過255位元組。
繫結鍵也必須擁有同樣的格式。主題交換機背後的邏輯跟直連交換機很相似 —— 一個攜帶著特定路由鍵的訊息會被主題交換機投遞給繫結鍵與之想匹配的佇列。但是它的繫結鍵和路由鍵有兩個特殊應用方式:

* (星號) 用來表示一個單詞.
# (井號) 用來表示任意數量(零個或多個)單詞。

主題交換機是很強大的,它可以表現出跟其他交換機類似的行為
當一個佇列的繫結鍵為 "#"(井號) 的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息。
當 * (星號) 和 # (井號) 這兩個特殊字元都未在繫結鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。

示例

將主題交換機應用到我們的日誌系統中。在開始工作前,我們假設日誌的路由鍵由兩個單片語成,路由鍵看起來是這樣的:service.severity程式碼跟上一篇教程差不多
假設有 order,user,email 服務,日誌級別有error,info,warning。
生產者:服務和日誌級別的排列組合作為routing key(如 order.email,order.info,order.warning,user.error,user.info .....),分別傳送3條日誌。
消費者:定義消費者,A、接收所有error 級別日誌( *.error);B、接收 order 服務的日誌(order.*),C、接收user服務和info級別的日誌(user.* *.info),D、接收所有的日誌(#)


程式碼
程式碼和路由(http://blog.csdn.net/convict_eva/article/details/52312435)的程式碼沒有多大區別,只是把exchange 的名稱和型別修改為logs_topic 和 topic,
傳送訊息的routingKey值為:order.error ,order.info,order.warning 等等
訊息接收繫結的路由時根據上面“消費者”定義的設定routing key 的值。
程式碼結構如下:

Send 生產者傳送訊息:

//級別
private static String[] LOG_LEVELS={"error","info","warning","other"
}; //服務 private static String[] SERVERS = {"order","user","email"};

傳送訊息

for(String server : SERVERS){
    for(String level:LOG_LEVELS){
        String routingKey = server+"."+level;//組合路由鍵,如 order.error
String message = "";
        for(int i=0;i<3;i++){//每一種路由鍵傳送3條訊息
message="hello.."+i+"___"+routingKey;
            //傳送訊息指定routing_key
channel.basicPublish(RabbitmqConfigure.EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

接收訊息,佇列和routing key的繫結修改:

        A:channel.queueBind(queueName, RabbitmqConfigure.EXCHANGE_NAME,"*.error");
        B:channel.queueBind(queueName, RabbitmqConfigure.EXCHANGE_NAME,"order.*");
        C:channel.queueBind(queueName, RabbitmqConfigure.EXCHANGE_NAME,"user.*");
               channel.queueBind(queueName, RabbitmqConfigure.EXCHANGE_NAME,"*.info");
        D:channel.queueBind(queueName, RabbitmqConfigure.EXCHANGE_NAME,"#");


測試
先執行RecvA,RecvB,RecvC,RecvD  再執行Send 傳送訊息。

傳送訊息:

    

        A、接收所有error 級別日誌( *.error)

            

        B、接收 order 服務的日誌(order.*)

            

        C、接收user服務和info級別的日誌(user.* *.info)

            

        D、接收所有的日誌(#)

            

不指定傳送訊息的routing key( routing key 設定為空字串,channel.basicPublish(RabbitmqConfigure.EXCHANGE_NAME, "", null, "路由鍵為空字串".getBytes()); ),繫結路由鍵為 “*” 的佇列到訊息,繫結為“#” 的佇列可以收到。

繫結鍵為 #.* 的佇列會獲取到一個名為..的路由鍵的訊息,它會取到一個路由鍵為單個單詞的訊息

rabbitmq官網:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

rabbitmq中文:http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html

code: http://download.csdn.net/detail/convict_eva/9612865