SpringBoot與RabbitMQ的整合
1.RabbitMQ的介紹
介紹RabbitMQ之前先說一下AMQP協議:
AMQP,即Advanced Message Queuing Protocol(高階訊息佇列協議),一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制。Erlang中的實現有RabbitMQ等。
AMQP 其實和Http一樣 都是一種協議, 只不過 Http是針對網路傳輸的, 而AMQP是基於訊息佇列的。
AMQP 協議中的基本概念:
1)Broker
2)Virtual host: 出於多租戶和安全因素設計的,把AMQP的基本元件劃分到一個虛擬的分組中,類似於網路中的namespace概念。當多個不同的使用者使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個使用者在自己的vhost建立exchange/queue等。
3)Connection: publisher/consumer和broker之間的TCP連線。斷開連線的操作只會在client端進行,Broker不會斷開連線,除非出現網路故障或broker服務出現問題。
4)Channel:如果每一次訪問RabbitMQ都建立一個Connection,在訊息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了作業系統建立TCP connection的開銷。
5)Exchange:message到達broker的第一站,根據分發規則
6)Queue: 訊息最終被送到這裡等待consumer取走。一個message可以被同時拷貝到多個queue中。
7)Binding: exchange和queue之間的虛擬連線,binding中可以包含routing key。Binding資訊被儲存到exchange中的查詢表中,用於message的分發依據。
Exchange的型別:
1)direct :這種型別的交換機的路由規則是根據一個routingKey的標識,交換機通過一個routingKey與佇列繫結 ,在生產者生產訊息的時候 指定一個routingKey 當繫結的佇列的routingKey 與生產者傳送的一樣 那麼交換機會吧這個訊息傳送給對應的佇列。
2)fanout:這種型別的交換機路由規則很簡單,只要與他綁定了的佇列, 他就會把訊息傳送給對應佇列(與routingKey沒關係)。
3)topic:這種型別的交換機路由規則也是和routingKey有關 只不過 topic他可以根據:*,#( *代表過濾一個單詞,#代表過濾後面所有單詞, 用.隔開)來識別routingKey 我打個比方 假設 我繫結的routingKey 有佇列A和B A的routingKey是:*.user B的routingKey是: #.user,那麼我生產一條訊息routingKey 為: error.user 那麼此時 2個佇列都能接受到, 如果改為 topic.error.user 那麼這時候 只有B能接受到了。
RabbitMQ:是一個開源的 基於AMQP協議實現的一個完整的企業級訊息中介軟體,服務端語言由Erlang(面向併發程式設計)語言編寫 對於高併發的處理有著天然的優勢,客戶端支援非常多的語言。
RabbitMQ如同redis一樣 他也是採用c/s架構 由服務端 與客戶端組成.
注意:在安裝RabbitMQ的時候,首先要下載Erlang安裝包,而且版本要對應。
2.RabbitMQ與SpringBoot的整合
1)首先引入SpringBoot和RabbitMQ的依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2)寫一個配置類,把我們的RabbitMQ的配置資訊配置好
public class RabbitmqConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost",5672); //我這裡直接在構造方法傳入了 // connectionFactory.setHost(); // connectionFactory.setPort(); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("testhost"); //是否開啟訊息確認機制 //connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { //注意 這個ConnectionFactory 是使用javaconfig方式配置連線的時候才需要傳入的 如果是yml配置的連線的話是不需要的 RabbitTemplate template = new RabbitTemplate(connectionFactory); return template; } }
配置完連線之後 我們就可以開始傳送訊息和接收訊息了,當然我們也可以在yml檔案中配置這些資訊:
3)編寫我們傳送訊息的工具類,並把它交給spring管理,在類裡面注入RabbitTemplate來進行訊息傳送。
@Component public class RabbitmqMessageSend { @Autowired RabbitTemplate rabbitTemplate; public void testSend() { //引數介紹: 交換機名字,路由建, 訊息內容 rabbitTemplate.convertAndSend("directExchange", "direct.key", "hello"); } }
4)然後我們模擬傳送一次訊息(通過瀏覽器呼叫一個controller,在裡面使用傳送訊息工具)。
@RestController public class OrderController { @Autowired RabbitmqMessageSend rabbitmqMessageSend; @RequestMapping("/order.do") public Object order(){ rabbitmqMessageSend.testSend(); return null; } }
啟動專案,通過瀏覽器訪問後臺呼叫傳送訊息的方法。
訪問後,我們去rabbitmq的控制檯檢視是否有訊息生成
到這裡,我們傳送訊息的功能就完成了。
5)然後我們在編寫一個消費訊息的功能:
引入依賴,建立RabbitMQ的配置類,這些都和傳送訊息功能一致
然後我們編寫消費訊息的工具類,主要使用@RabbitListener這個註解來進行訊息消費:
@Component public class ConsumerUtil { @RabbitListener(queues = "testQueue") public void get(String message) throws Exception{ System.out.println(message); } }
然後我們啟動專案,發現列印臺已經消費了訊息
這時候我們再去RabbitMQ的控制檯,發現訊息佇列已經為0了
到這裡,我們消費訊息的功能也完成了。
至此,最簡單的RabbitMQ和SpringBoot的整合就完成了。