1. 程式人生 > >Spring消息之AMQP.

Spring消息之AMQP.

分享 ati 字符 void c99 lint AS arraylist 不同類

一、AMQP 概述

AMQP(Advanced Message Queuing Protocol),高級消息隊列協議。

簡單回憶一下JMS的消息模型,可能會有助於理解AMQP的消息模型。在JMS中,有三個主要的參與者:消息的生產者、消息的消費者以及在生產者和消費者之間傳遞消息的通道(隊列或主題)。在JMS中,通道有助於解耦消息的生產者和消費者,但是這兩者依然會與通道相耦合。與之不同的是,AMQP的生產者並不會直接將消息發布到隊列中。AMQP在消息的生產者以及傳遞信息的隊列之間引入了一種間接的機制:Exchange。如下圖:

技術分享圖片

哈哈,筆主從今天開始也要學著自己畫圖了。

來看看 AMQP 消息的通信過程。首先,生產者把消息發給 Exchange,並帶有一個 routing key。其次,Exchange 和 隊列 之間 通過 binging 通信,binging 上也有 一個 routing key,AMQP定義了四種不同類型的Exchange,每一種都有不同的路由算法,根據Exchange的算法不同,它可能會使用消息的routing key或參數,並與 binding 的routing key或參數進行對比,來決定是否要將信息放到隊列中。然後,消費者從每個隊列中取出消息。

Exchange 的路由算法:

  • Direct:如果 消息的routing key 與 binding的routing key 直接匹配的話,消息將會路由到該隊列上;
  • Topic:如果 消息的routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上;
  • Headers:如果 消息參數表中的頭信息和值 都與 bingding參數表中 相匹配,消息將會路由到該隊列上;
  • Fanout:不管消息的routing key和參數表的頭信息/值是什麽,消息將會路由到所有隊列上。

AMQP 與 JMS 的區別:

1、AMQP為消息定義了線路層(wire-level protocol)的協議,而JMS所定義的是API規範。JMS的API協議能夠確保所有的實現都能通過通用的API來使用,但是並不能保證某個JMS實現所發送的消息能夠被另外不同的JMS實現所使用。而AMQP的線路層協議規範了消息的格式,消息在生產者和消費者間傳送的時候會遵循這個格式。這樣AMQP在互相協作方面就要優於JMS——它不僅能跨不同的AMQP實現,還能跨語言和平臺。

2、JMS 支持TextMessage、MapMessage 等復雜的消息類型;而AMQP 僅支持 byte[] 消息類型(復雜的類型可序列化後發送),個人認為這也是它能夠跨平臺和跨語言使用的原因之一。

3、由於Exchange 提供的路由算法,AMQP可以提供多樣化的路由方式來傳遞消息到消息隊列,而 JMS 僅支持 隊列 和 主題/訂閱 方式兩種。

二、Spring 集成 RabbitMQ

RabbitMQ是一個流行的開源消息代理,它實現了AMQP。Spring AMQP為RabbitMQ提供了支持,包括RabbitMQ連接工廠、模板以及Spring配置命名空間。

首先,需要安裝 RabbitMQ,我們可以在 http://www.rabbitmq.com/download.html 上找到安裝指南,具體怎麽安裝,不是這篇博文的重點,請筆友們自行解決。

接下來,讓我們一起來看看,Spring 和 RabbitMQ 的集成:

1、pom 依賴

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.0.3.RELEASE</version>
    </dependency>

2、連接工廠 和 admin

技術分享圖片
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.userName}"
                               password="${rabbitmq.password}"/>
    
    <rabbit:admin connection-factory="connectionFactory"/>
View Code

admin 元素會自動創建一個RabbitMQ管理組件,它會自動創建隊列、Exchange以及binding

3、聲明隊列、Exchange以及binding

聲明隊列:

技術分享圖片
    <rabbit:queue name="queue1"/>
    <rabbit:queue name="queue2"/>
    <rabbit:queue name="queue3"/>
    <rabbit:queue name="queue4"/>
    <rabbit:queue name="queue5"/>
    <rabbit:queue name="queue6"/>
View Code

聲明 Exchange 以及 binding:

direct-exchange:

技術分享圖片
    <rabbit:direct-exchange name="directExchange">
        <rabbit:bindings>
            <rabbit:binding key="queue1" queue="queue1"/>
            <rabbit:binding key="queue2" queue="queue2"/>
            <rabbit:binding key="queue3" queue="queue3"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
View Code

如果消息的routing key 與 routing key 直接匹配的話,消息將會路由到該隊列上。

topic-exchange

技術分享圖片
    <rabbit:topic-exchange name="topicExchange">
        <rabbit:bindings>
            <rabbit:binding pattern="routing.*" queue="queue2"/>
            <rabbit:binding pattern="routing.*" queue="queue3"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
View Code

消息的 routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上。

這個通配符匹配特別坑,賊坑!我本來寫了個 "routing*" ,自以為能匹配 "routingrrr" 這樣的字符,不行!然後我又寫了個"routing?"、"rounting.",預想著能不能匹配單個任意字符,不行!

終於我得出了一個結論,只能使用 "*"(匹配 0 個或任意多個)通配符,並且,並且!"*" 前面一定要有 個 "." ! 太可怕了,不知道我總結的對不對哈!

headers-exchange

技術分享圖片
    <rabbit:headers-exchange name="headersExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue4" key="betty" value="rubble"   />
            <rabbit:binding queue="queue5" key="barney" value="rubble"   />
        </rabbit:bindings>
    </rabbit:headers-exchange>
View Code

消息參數表中的頭信息和值都與bingding參數表中相匹配,消息將會路由到該隊列上。

這個用法比較少用,也比較難用,原因是因為它僅支持 發送 byte[] 的消息類型。

fanout-exchange

技術分享圖片
    <rabbit:fanout-exchange name="fanoutExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue5"/>
            <rabbit:binding queue="queue6"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
View Code

這個是最簡單粗暴的匹配規則,不管消息的routing key和參數表的頭信息/值是什麽,消息將會路由到所有隊列上。

4、發送和接收消息

還是Spring的那一套,Spring 為我們提供了一個模板 bean(rabbitTemplate) 來發送和接收消息。其中,像前文提到的 jmsTemplate 那樣,rabbitTemplate 也為我們 提供了 convertAndSend() 方法來自動轉換和發送消息,提供了receiveAndConvret() 方法來接收和自動轉換成對象(消息和對象之間默認的消息轉換器是SimpleMessageConverter,它適用於String、Serializable實例以及字節數組)。另外,rabbitTemplate 也照常提供了 send() 和 receive() 方法來發送和接收消息,不過貌似僅支持發送字節數組...

配置 rabbitTemplate:

技術分享圖片
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="directExchange"
                     routing-key="queue1"/>
View Code

下面僅演示 通配符路由方式 和 header 路由方式 發送和接收消息。其他具體詳細的內容可參考我下面附上的源碼:

通配符路由方式:

技術分享圖片
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class TopicExchange {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void convertAndSend(){
        List<String> list = new ArrayList<>();
        list.add("java");
        list.add("python");
        list.add("c++");
        rabbitTemplate.convertAndSend("topicExchange","routing.123", list);
    }


    @Test
    public void receiveAndConvert(){
        List<String> queue2List =(List) rabbitTemplate.receiveAndConvert("queue2");
        printList(queue2List);

        System.out.println("----------------華麗的分隔符-----------------");

        List<String> queue3List =(List) rabbitTemplate.receiveAndConvert("queue3");
        printList(queue3List);

    }


    private <E> void printList(List<E> list){
        if (list != null && list.size() > 0){
            for (Object o : list){
                System.out.println("-----------------"+ o +"---------------");
            }
        }
    }
}
View Code

header 路由方式:

技術分享圖片
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class HeadersExchangeTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void convertAndSend(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("betty", "rubble");
        messageProperties.setHeader("fred", "flintstone");
        messageProperties.setHeader("barney", "rubble");

        String str = new String("Hello RabbitMQ");
        Message message = new Message(str.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("headersExchange","",message);
    }

    @Test
    public void receiveAndConvert(){

        Message queue4 = rabbitTemplate.receive("queue4");
        System.out.println("第一個輸出:" + new String(queue4.getBody()));
        Message queue5 = rabbitTemplate.receive("queue5");
        System.out.println("第三個輸出:" + new String(queue5.getBody()));

    }

}
View Code

5、定義消息驅動的AMQP POJO

用 receive()和 receiveAndConvert()方法都會立即返回,如果隊列中沒有等待的消息時,將會得到 null。Spring AMQP提供了消息驅動POJO的支持,也就是相當於一個監聽器,監聽某些隊列,當消息到達指定隊列的時候,可以立即調用方法處理該消息。

listener-container 配置:

    <rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3" type="direct">
        <rabbit:listener ref="handlerListener" method="handler" queue-names="queue5,queue6"/>
    </rabbit:listener-container>

其中,ref 指定Spring bean 的 id,method 指定 該bean中處理隊列中消息的方法,queue-names 指定要監聽哪些隊列,隊列之間用 "," 分隔。

三、結語

祝大家五一節快樂!

演示源碼下載鏈接:https://github.com/JMCuixy/SpringMessageRabbitMQ

參考資料:《Spring 實戰第四版》

Spring-amqp文檔

Spring消息之AMQP.