基於springboot整合的rabbitmq
概述
RabbitMQ是對高級消息隊列協議(Advanced Message Queueing Protocol, AMQP)的實現,RabbitMQ是消息傳輸的中間者,可以把它當做是一個消息代理,你把消息傳送給它,它再把消息發送給具體的接收人。 這就像是郵局一樣,你把郵件放入郵箱當中,郵件員會把郵件發送給你的收件人。不同的是RabbitMQ是接受,存儲和轉發二進制數據塊——消息。詳細
代碼下載:http://www.demodashi.com/demo/15078.html
RabbitMQ官方解釋:
消息系統允許軟件、應用相互連接和擴展。這些應用可以相互鏈接起來組成一個更大的應用,或者將用戶設備和數據
進行連接。消息系統通過將消息的發送和接收分離來實現應用程序的異步和解偶。
我們白話文的理解就是:是一個消息代理 - 一個消息系統的媒介。它可以為你的應用提供一個通用的消息發送和接收平臺,並且保證消息在傳輸過程中的安全。
一、RabbitMQ模型簡介
AMQP 的工作過程如下圖:消息(message)被發布者(publisher)發送給交換機(exchange),交換機常常被比喻成郵局或者郵箱。然後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
二、RabbitMQ 交換機:
Name | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
1. 默認交換機:
default exchange實際上是一個由消息代理預先聲明好的沒有名字(名字為空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
2.Direct 直連交換機:
直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。直連交換機用來處理消息的單播路由(unicast routing)(盡管它也可以處理多播路由)。
3.fanout扇形交換機:
扇型交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這所有的N個隊列。
4.topic 主題交換機:
主題交換機(topic exchanges)通過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機經常用來實現各種分發/訂閱模式及其變種。主題交換機通常用來實現消息的多播路由(multicast routing)。
5.head交換機:
有時消息的路由操作會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是為此而生的。頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。
二、隊列
AMQP中的隊列(queue)跟其他消息隊列或任務隊列中的隊列是很相似的:它們存儲著即將被應用消費掉的消息。隊列跟交換機共享某些屬性,但是隊列也有一些另外的屬性。
-
Name
-
Durable(消息代理重啟後,隊列依舊存在)
-
Exclusive(只被一個連接(connection)使用,而且當連接關閉後隊列即被刪除)
-
Auto-delete(當最後一個消費者退訂後即被刪除)
-
Arguments(一些消息代理用他來完成類似與TTL的某些額外功能)
隊列在聲明(declare)後才能被使用。如果一個隊列尚不存在,聲明一個隊列會創建它。如果聲明的隊列已經存在,並且屬性完全相同,那麽此次聲明不會對原有隊列產生任何影響。如果聲明中的屬性與已存在隊列的屬性有差異,那麽一個錯誤代碼為406的通道級異常就會被拋出。
1.隊列名稱
隊列的名字可以由應用(application)來取,也可以讓消息代理(broker)直接生成一個。隊列的名字可以是最多255字節的一個utf-8字符串。若希望AMQP消息代理生成隊列名,需要給隊列的name參數賦值一個空字符串:在同一個通道(channel)的後續的方法(method)中,我們可以使用空字符串來表示之前生成的隊列名稱。之所以之後的方法可以獲取正確的隊列名是因為通道可以默默地記住消息代理最後一次生成的隊列名稱。
以"amq."開始的隊列名稱被預留做消息代理內部使用。如果試圖在隊列聲明時打破這一規則的話,一個通道級的403 (ACCESS_REFUSED)錯誤會被拋出。
2.隊列持久化
持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啟的時候,它依舊存在。沒有被持久化的隊列稱作暫存隊列(Transient queues)。並不是所有的場景和案例都需要將隊列持久化。
持久化的隊列並不會使得路由到它的消息也具有持久性。倘若消息代理掛掉了,重新啟動,那麽在重啟的過程中持久化隊列會被重新聲明,無論怎樣,只有經過持久化的消息才能被重新恢復。
3.綁定
綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。如果要指示交換機“E”將消息路由給隊列“Q”,那麽“Q”就需要與“E”進行綁定。綁定操作需要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在於從發送給交換機的眾多消息中選擇出某些消息,將其路由給綁定的隊列。
打個比方:
-
隊列(queue)是我們想要去的位於紐約的目的地
-
交換機(exchange)是JFK機場
-
綁定(binding)就是JFK機場到目的地的路線。能夠到達目的地的路線可以是一條或者多條
擁有了交換機這個中間層,很多由發布者直接到隊列難以實現的路由方案能夠得以實現,並且避免了應用開發者的許多重復勞動。
如果AMQP的消息無法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷毀或者返還給發布者。如何處理取決於發布者設置的消息屬性。
4.消費者
消息如果只是存儲在隊列裏是沒有任何用處的。被應用消費掉,消息的價值才能夠體現。在AMQP 模型中,有兩種途徑可以達到此目的:
-
將消息投遞給應用 ("push API")
-
應用根據需要主動獲取消息 ("pull API")
使用push API,應用(application)需要明確表示出它在某個特定隊列裏所感興趣的,想要消費的消息。如是,我們可以說應用註冊了一個消費者,或者說訂閱了一個隊列。一個隊列可以註冊多個消費者,也可以註冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
每個消費者(訂閱者)都有一個叫做消費者標簽的標識符。它可以被用來退訂消息。消費者標簽實際上是一個字符串。
5.消息確認
消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。而且網絡原因也有可能引起各種問題。這就給我們出了個難題,AMQP代理在什麽時候刪除消息才是正確的?AMQP 0-9-1 規範給我們兩種建議:
-
當消息代理(broker)將消息發送給應用後立即刪除。(使用AMQP方法:basic.deliver或basic.get-ok)
-
待應用(application)發送一個確認回執(acknowledgement)後再刪除消息。(使用AMQP方法:basic.ack)
前者被稱作自動確認模式(automatic acknowledgement model),後者被稱作顯式確認模式(explicit acknowledgement model)。在顯式模式下,由消費者應用來選擇什麽時候發送確認回執(acknowledgement)。應用可以在收到消息後立即發送,或將未處理的消息存儲後發送,或等到消息被處理完畢後再發送確認回執(例如,成功獲取一個網頁內容並將其存儲之後)。
如果一個消費者在尚未發送確認回執的情況下掛掉了,那AMQP代理會將消息重新投遞給另一個消費者。如果當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,然後再次嘗試投遞。
6.拒絕消息
當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用可以向消息代理表明,本條消息由於“拒絕消息(Rejecting Messages)”的原因處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用可以告訴消息代理如何處理這條消息——銷毀它或者重新放入隊列。當此隊列只有一個消費者時,請確認不要由於拒絕消息並且選擇了重新放入隊列的行為而引起消息在同一個消費者身上無限循環的情況發生。
Negative Acknowledgements
在AMQP中,basic.reject方法用來執行拒絕消息的操作。但basic.reject有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。但是如果你使用的是RabbitMQ,那麽你可以使用被稱作negative acknowledgements(也叫nacks)的AMQP 0-9-1擴展來解決這個問題。更多的信息請參考幫助頁面
7.預取消息
在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每個消費者一次可以接受多少條消息是非常有用的。這可以在試圖批量發布消息的時候起到簡單的負載均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生產應用每分鐘才發送一條消息,這說明處理工作尚在運行。)
註意,RabbitMQ只支持通道級的預取計數,而不是連接級的或者基於大小的預取。
8.消息屬性和有效載荷(消息主體)
AMQP模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以至於AMQP明確的定義了它們,並且應用開發者們無需費心思思考這些屬性名字所代表的具體含義。例如:
-
Content type(內容類型)
-
Content encoding(內容編碼)
-
Routing key(路由鍵)
-
Delivery mode (persistent or not)
投遞模式(持久化 或 非持久化) -
Message priority(消息優先權)
-
Message publishing timestamp(消息發布的時間戳)
-
Expiration period(消息有效期)
-
Publisher application id(發布應用的ID)
有些屬性是被AMQP代理所使用的,但是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱作消息頭(headers)。他們跟HTTP協議的X-Headers很相似。消息屬性需要在消息被發布的時候定義。
AMQP的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被AMQP代理當作不透明的字節數組來對待。消息代理不會檢查或者修改有效載荷。消息可以只包含屬性而不攜帶有效載荷。它通常會使用類似JSON這種序列化的格式數據,為了節省,協議緩沖器和MessagePack將結構化數據序列化,以便以消息的有效載荷的形式發布。AMQP及其同行者們通常使用"content-type" 和 "content-encoding" 這兩個字段來與消息溝通進行有效載荷的辨識工作,但這僅僅是基於約定而已。
消息能夠以持久化的方式發布,AMQP代理會將此消息存儲在磁盤上。如果服務器重啟,系統會確認收到的持久化消息未丟失。簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具有持久化性質:它完全取決與消息本身的持久模式(persistence mode)。將消息以持久化方式發布時,會對性能造成一定的影響(就像數據庫操作一樣,健壯性的存在必定造成一些性能犧牲)。
9.消息確認
由於網絡的不確定性和應用失敗的可能性,處理確認回執(acknowledgement)就變的十分重要。有時我們確認消費者收到消息就可以了,有時確認回執意味著消息已被驗證並且處理完畢,例如對某些數據已經驗證完畢並且進行了數據存儲或者索引操作。
這種情形很常見,所以 AMQP 內置了一個功能叫做 消息確認(message acknowledgements),消費者用它來確認消息已經被接收或者處理。如果一個應用崩潰掉(此時連接會斷掉,所以AMQP代理亦會得知),而且消息的確認回執功能已經被開啟,但是消息代理尚未獲得確認回執,那麽消息會被從新放入隊列(並且在還有還有其他消費者存在於此隊列的前提下,立即投遞給另外一個消費者)。
協議內置的消息確認功能將幫助開發者建立強大的軟件。
三、準備工作(windows10環境下的RabbitMQ安裝步驟)
第一步:下載並安裝erlang
-
原因:RabbitMQ服務端代碼是使用並發式語言Erlang編寫的,安裝Rabbit MQ的前提是安裝Erlang。
-
下載地址:http://www.erlang.org/downloads
-
安裝完事兒後要記得配置一下系統的環境變量。
此電腦-->鼠標右鍵“屬性”-->高級系統設置-->環境變量-->“新建”系統環境變量
變量名:ERLANG_HOME
變量值就是剛才erlang的安裝地址,點擊確定。
然後雙擊系統變量path
點擊“新建”,將%ERLANG_HOME%\bin加入到path中。
最後windows鍵+R鍵,輸入cmd,再輸入erl,看到版本號就說明erlang安裝成功了。
第二步:下載並安裝RabbitMQ
下載地址:http://www.rabbitmq.com/download.html
雙擊下載後的.exe文件,安裝過程與erlang的安裝過程相同。
RabbitMQ安裝好後接下來安裝RabbitMQ-Plugins。打開命令行cd,輸入RabbitMQ的sbin目錄。
我的目錄是:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin
然後在後面輸入rabbitmq-plugins enable rabbitmq_management命令進行安裝
打開sbin目錄,雙擊rabbitmq-server.bat
等幾秒鐘看到這個界面後,訪問http://localhost:15672
然後可以看到如下界面
默認用戶名和密碼都是guest,登陸即可。
四、程序實現
1.創建rabbitmqconfig配置文件類:
package com.zxh.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @EnableRabbit @Configuration public class RabbitMqConfig { public static final String EXCHANGE = "spring.boot.direct"; public static final String ROUTINGKEY_FAIL = "spring.boot.routingKey.failure"; public static final String ROUTINGKEY = "spring.boot.routingKey"; public static final String QUEUE_NAME = "spring.demo"; public static final String QUEUE_NAME_FAIL = "spring.demo.failure"; //RabbitMQ的配置信息 @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private Integer port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; //建立一個連接容器,類型數據庫的連接池 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true);// 確認機制 // connectionFactory.setPublisherReturns(true); //發布確認,template要求CachingConnectionFactory的publisherConfirms屬性設置為true return connectionFactory; } // RabbitMQ的使用入口 @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必須是prototype類型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(this.connectionFactory()); template.setMessageConverter(this.jsonMessageConverter()); template.setMandatory(true); return template; } /** * 交換機 * 針對消費者配置 * FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 * HeadersExchange :通過添加屬性key-value匹配 * DirectExchange:按照routingkey分發到指定隊列 * DirectExchange:多關鍵字匹配 */ @Bean public DirectExchange exchange() { return new DirectExchange(EXCHANGE); } /** * 隊列 * * @return */ @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); //隊列持久 } @Bean public Queue queueFail() { return new Queue(QUEUE_NAME_FAIL, true); //隊列持久 } /** * 綁定 * * @return */ @Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY); } @Bean public Binding bindingFail(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queueFail()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY_FAIL); } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // @Bean // public CharacterEncodingFilter characterEncodingFilter() { // CharacterEncodingFilter filter = new CharacterEncodingFilter(); // filter.setEncoding("UTF-8"); // filter.setForceEncoding(true); // return filter; // } }
2.生產者推送消息
package com.zxh.service; import com.zxh.config.RabbitMqConfig; import com.zxh.pojo.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import java.util.List; import java.util.UUID; @Service public class UserService { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate template; /** * 增加用戶 * */ public boolean addPerson(User user) throws Exception { Assert.notNull(user, "添加對象信息不能為空"); Assert.hasText(user.getUserId(), "添加對象信息用戶編號不能為空"); Assert.notNull(user.getAge(), "添加對象信息年齡不能為空"); template.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, user.toString()); // template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { // @Override // public void confirm(CorrelationData correlationData, boolean ack, String cause) { // if (!ack) { // logger.info("send message failed: " + cause); //+ correlationData.toString()); // throw new RuntimeException("send error " + cause); // } else { // logger.info("send to broke ok" + correlationData.getId()); // } // } // }); return true; } private Message buildMessage(User user) throws Exception { Message message = MessageBuilder.withBody(user.toString().getBytes()) .setMessageId(UUID.randomUUID().toString()).setContentType("application/json").build(); return message; } }
3.消費者訂閱消息
package com.zxh.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class UserTopicRecive { @RabbitListener(queues="spring.demo") public void process(String user) throws InterruptedException { System.out.println("TopicRecive1接受的消息: "+user); } }
五、程序演示
六、項目結構圖
七、小結 - RabbitMQ的工作流程介紹
1、建立信息。Publisher定義需要發送消息的結構和內容。
2、建立Conection和Channel。由Publisher和Consumer創建連接,連接到Broker的物理節點上,同時建立Channel。Channel是建立在Connection之上的,一個Connection可以建立多個Channel。Publisher連接Virtual Host 建立Channel,Consumer連接到相應的Queue上建立Channel。
3、聲明交換機和隊列。聲明一個消息交換機(Exchange)和隊列(Queue),並設置相關屬性。
4、發送消息。由Publisher發送消息到Broker中的Exchange中
5、路由轉發。RabbitMQ收到消息後,根據消息指定的Exchange(交換機) 來查找Binding(綁定) 然後根據規則(Routing Key)分發到不同的Queue。這裏就是說使用Routing Key在消息交換機(Exchange)和消息隊列(Queue)中建立好綁定關系,然後將消息發送到綁定的隊列中去。
6、消息接收。Consumer監聽相應的Queue,一旦Queue中有可以消費的消息,Queue就將消息發送給Consumer端。
7、消息確認。當Consumer完成某一條消息的處理之後,需要發送一條ACK消息給對應的Queue。
Consumer收到消息時需要顯式的向RabbitMQ Broker發送basic.ack消息或者Consumer訂閱消息時設置auto_ack參數為true。
在通信過程中,隊列對ACK的處理有以下幾種情況:
如果Consumer接收了消息,發送ack,RabbitMQ會刪除隊列中這個消息,發送另一條消息給Consumer。
如果Consumer接收了消息, 但在發送ack之前斷開Channel,RabbitMQ會認為這條消息沒有被deliver(遞送),如果有其他的Channel,會該消息將被發送給另外的Channel。如果沒有,當在Consumer再次連接的時候,這條消息會被redeliver(重新遞送)。
如果consumer接收了消息,但是忘記了ack,RabbitMQ不會重復發送消息。
新版RabbitMQ還支持Consumer reject某條(類)消息,可以通過設置requeue參數中的reject為true達到目地,那麽Consumer將會把消息發送給下一個註冊的Consumer。
8、關閉消息通道(channel)以及和服務器的連接。
代碼下載:http://www.demodashi.com/demo/15078.html
註:本文著作權歸作者,由demo大師發表,拒絕轉載,轉載需要作者授權
基於springboot整合的rabbitmq