1. 程式人生 > >新手入門教程-------Spring Boot中整合RabbitMQ

新手入門教程-------Spring Boot中整合RabbitMQ

AMQP:是Advanced Message Queuing Protocol的簡稱,高階訊息佇列協議,是一個面向訊息中介軟體的開放式標準應用層協議。

 

定義了以下特性:

  • 訊息方向
  • 訊息佇列
  • 訊息路由(包括:點到點和釋出-訂閱模式)
  • 可靠性
  • 安全性

RabitMQ:是以AMQP協議實現的一種中介軟體產品。RabbitMq是一個開源的AMQP實現,伺服器端用erlang語言編寫,支援多種客戶端。

常用概念:
通常由三個概念:發訊息者、佇列、收訊息者,RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發訊息者和 佇列之間, 加入了交換器 (Exchange

). 這樣發訊息者和佇列就沒有直接聯絡, 轉而變成發訊息者把訊息給交換器, 交換器根據排程策略再把訊息再給佇列。

 

首先,先安裝Erland,通過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接開啟並完成安裝。

接著,

  1. 安裝RabbitMq,通過官方下載頁面https://www.rabbitmq.com/download.html獲取exe安裝包。
  2. 下載完成後,直接執行安裝程式。
  3. RabbitMQ Server安裝完成之後,會自動的註冊為服務,並以預設配置啟動起來。

環境搭建可以參考:http://blog.didispace.com/spring-boot-rabbitmq/

RabbitMQ管理:

我們可以直接通過配置檔案的訪問進行管理,也可以通過Web的訪問進行管理。下面我們將介紹如何通過Web進行管理。

在sbin資料夾開啟CMD,執行rabbitmq-plugins enable rabbitmq_management命令,開啟Web管理外掛,這樣我們就可以通過瀏覽器來進行管理了。

重啟mq的命令是:rabbitmq-server restart

 

 

  • 開啟瀏覽器並訪問:http://localhost:15672/,並使用預設使用者guest登入,密碼也為guest。我們可以看到如下圖的管理頁面:

 

  • 點選Admin標籤,在這裡可以進行使用者的管理。例如新增使用者,記得要給使用者設定許可權,不然可能會導致下面工程連線不上。

 

利用springboot來進行整合:

1. 編寫配置檔案類:

在com.example包中增加類,名稱為HelloRabbitConfig,並修改程式碼為

  1. package com.example;  
  2.   
  3. import org.springframework.amqp.core.Queue;  
  4. import org.springframework.context.annotation.Bean;  
  5. import org.springframework.context.annotation.Configuration;  
  6.   
  7. @Configuration  
  8. public class HelloRabbitConfig {  
  9.   
  10.     @Bean  
  11.     public Queue helloQueue() {  
  12.         return new Queue("hello");  
  13.     }  
  14. }  

2. 編寫傳送訊息類:

在com.example包中增加類,名稱為HelloSender,並修改程式碼為:

  1. package com.example;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import org.slf4j.Logger;  
  6. import org.slf4j.LoggerFactory;  
  7. import org.springframework.amqp.core.AmqpTemplate;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.stereotype.Component;  
  10.   
  11. @Component  
  12. public class HelloSender {  
  13.   
  14.     protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);   
  15.       
  16.     @Autowired  
  17.     private AmqpTemplate rabbitTemplate;  
  18.   
  19.     public String send(String name) {  
  20.         String context = "hello "+name+" --" + new Date();  
  21.         logger.debug("HelloSender: " + context);  
  22.         this.rabbitTemplate.convertAndSend("hello", context);  
  23.         return context;  
  24.     }  
  25. }  

3.編寫接收訊息類:

在com.example包中增加類,名稱為HelloReceiver,並修改程式碼為

  1. package com.example;  
  2.   
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. @RabbitListener(queues = "hello")  
  11. public class HelloReceiver {  
  12.     protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);  
  13.   
  14.     @RabbitHandler  
  15.     public void process(String hello) {  
  16.         logger.debug("HelloReceiver : " + hello);  
  17.     }  
  18. }  

4. 編寫RestController 類,呼叫傳送訊息:

在com.example包中增加類,名稱為HelloController,並修改程式碼為

  1. package com.example;  
  2.   
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.beans.factory.annotation.Autowired;  
  6. import org.springframework.web.bind.annotation.PathVariable;  
  7. import org.springframework.web.bind.annotation.RequestMapping;  
  8. import org.springframework.web.bind.annotation.RestController;  
  9.   
  10. @RestController   
  11. public class HelloController {  
  12.     protected static Logger logger=LoggerFactory.getLogger(HelloController.class);   
  13.       
  14.     @Autowired  
  15.         private HelloSender helloSender;  
  16.       
  17.     @RequestMapping("/send/{name}")  
  18.     public String helloworld(@PathVariable String name) {  
  19.         return helloSender.send(name);  
  20.     }  
  21. }  

5. 執行測試

在sbin資料夾CMD呼叫rabbitmq-server restart之後,在工程所在目錄開啟命令列,執行mvn spring-boot:run,最後在瀏覽器輸入http://localhost:8080/send/上帝

參考:https://blog.csdn.net/u012930316/article/details/76853778

 

 

工程解析:

1.配置檔案:

在配置檔案中,僅僅配置了一個名為hello的佇列,以後傳送,接收都與這個佇列有關

2. 傳送訊息:

2.1傳送訊息使用模板機制,用springboot自動注入rabbitmqTemplate,它封裝好了連結,管道,轉換等

2.2訊息轉換和傳送

void convertAndSend(String routingKey, Object message) throws AmqpException;

它的註釋是:Convert a Java object to an Amqp {@link Message} and send it to a default exchange with a specific routing key.

將一個Java物件轉換為Amqp訊息,然後用預設的減緩及指定路由鍵傳送資訊。

 

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
exchange:交換機名稱

routingKey:路由關鍵字

object:傳送的訊息內容

correlationData:訊息ID

 

3. 接收訊息

3.1監聽訊息

在類上宣告@RabbitListener(queues = "hello"),表示監聽hello佇列

3.2訊息處理控制代碼

在接收處理訊息的方法上宣告@RabbitHandler

Annotation that marks a method to be the target of a Rabbit message  listener within a class that is annotated with {@link RabbitListener}.

訊息消費者:

消費者負責宣告交換機(生產者也可以宣告),佇列,以及兩者的繫結操作。

交換機:

/**
     * 針對消費者配置
        FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
        HeadersExchange :通過新增屬性key-value匹配
        DirectExchange:按照routingkey分發到指定佇列
        TopicExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

 

佇列:


@Bean
    public Queue queue() {
        return new Queue("spring-boot-queue", true); //佇列持久
 
    }

 

繫結:binding:

 

  @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    }

 


 

 

 

整體:整個工程比較簡單,沒有什麼特殊的配置,僅僅三個類檔案即可實現訊息的傳送和接受。