Spring Boot (25) RabbitMQ消息隊列
MQ全程(Message Queue)又名消息隊列,是一種異步通訊的中間件。可以理解為郵局,發送者將消息投遞到郵局,然後郵局幫我們發送給具體的接收者,具體發送過程和時間與我們無關,常見的MQ又kafka、activemq、zeromq、rabbitmq等等。
RabbitMQ
RabbitMQ是一個遵循AMQP協議,由面向高並發的erlang語言開發而成,用在實時的對可靠性要求比較高的消息傳遞上,支持多種語言客戶端,支持延遲隊列。
基礎概念
Broker:消息隊列的服務器實體
Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列
Binding:綁定,它主要是把exchange和queue按照路由規則綁定起來
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞
vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離
producer:消息生產者,投遞消息的程序
consumer:消息消費者,接收消息的程序
channel:消息通道,在客戶端的每個連接裏,可以建立多個channel,每個channel代表一個會話任務
常見應用場景
1.郵箱發送:用戶註冊後投遞消息到rabbitmq中,由消息的消費方異步的發送郵件,提升系統響應速度。
2.流量削鋒:一般在秒殺活動中應用廣泛,秒殺會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。用於控制活動人數,將超過此一定閥值的訂單直接丟棄。緩解端時間的高流量壓垮應用。
3.訂單超時:利用rabbitmq的延遲隊列,可以很簡單的實現訂單超功能,比如用戶在下單後30分鐘未支付取消訂單。
導入依賴
在pom.xml中添加spring-boot-starter-amqp的依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
屬性配置
在application.yml中配置rabbitmq的相關信息,這裏配置了手動的ACK開關
spring: rabbitmq: username: david password: 123456 host: localhost port: 5672 virtual-host: / listener: simple: acknowledge-mode: manual #手動ACK 不開啟自動ACK模式,目的是防止報錯後為正確處理消息丟失 默認為none
定義隊列
package com.spring.boot.utils; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String DEFAULT_BOOK_QUEUE = "dev.book.register.default.queue"; public static final String MANUAL_BOOK_QUEUE = "dev.book.register.manual.queue"; @Bean public Queue defaultBookQueue(){ //參數1 隊列名,參數2 是否持久化處理 return new Queue(DEFAULT_BOOK_QUEUE,true); } @Bean Queue manualBookQueue(){ return new Queue(MANUAL_BOOK_QUEUE,true); } }
實體類
package com.spring.boot.bean; import java.io.Serializable; public class Book implements Serializable{ private Integer id; private String name; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
控制器
新建一個bookController,用於消息發送
package com.spring.boot.controller; import com.spring.boot.bean.Book; import com.spring.boot.utils.RabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/books") public class BookController { //spring boot 2.x版本推薦構造器註入 而不是屬性註入 private final RabbitTemplate rabbitTemplate; @Autowired public BookController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @GetMapping("/defaultMessage") public void defaultMessage() { Book book = new Book(); book.setId(1); book.setName("hello RabbitMQ"); this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); } }
消息消費者
默認情況下 spring-boot-data-amqp是自動ACK機制,就意味著MQ會在消息消費完畢後自動幫我們去ACK,這樣依賴就存在這樣一個問題:如果報錯了,消息不會丟失,會無限循環消費,很容易把磁盤空間耗完,雖然可以配置消費的次數但這種做法也不太好。目前比較推薦的就是我們手動ACK然後將消費錯誤的消息轉移到其他的消息隊列中,做補償處理
package com.spring.boot.handler; import com.rabbitmq.client.Channel; import com.spring.boot.bean.Book; import com.spring.boot.utils.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class BookHandler { @RabbitListener(queues={RabbitConfig.DEFAULT_BOOK_QUEUE}) public void listenerAutoACK(Book book, Message message,Channel channel) throws IOException { final long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println("ListenerAutoAck 監聽到的消息:" + book.toString()); // TODO 通知MQ 已經被消費完成 可以ACK了 channel.basicAck(deliveryTag,false); } catch (IOException e) { //TODO 處理失敗,重新壓入MQ channel.basicRecover(); e.printStackTrace(); } } @RabbitListener(queues={RabbitConfig.MANUAL_BOOK_QUEUE}) public void listenerManualACK(Book book,Message message,Channel channel){ System.out.println("listenerManualACK監聽到的消息:"+book.toString()); try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { //如果報錯了 可以進行容錯處理,比如轉移當前消息進入其他隊列 e.printStackTrace(); } } }
測試:啟動項目 輸入路徑 http://localhost:8088/books/defaultMessage
Spring Boot (25) RabbitMQ消息隊列