rabbitMQ五種模式使用方式以及springboot整合rabbitMQ的使用
MQ全稱為Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法。
為什麼使用MQ
在專案中,可將一些無需即時返回且耗時的操作提取出來,進行非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
開發中訊息佇列通常有如下應用場景:
1、任務非同步處理
將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用程式的響應時間。
2、應用程式解耦合
MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行解耦合。
MQ是訊息通訊的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP 與 JMS 區別
-
JMS是定義了統一的介面,來對訊息操作進行統一;AMQP是通過規定協議來統一資料互動的格式
-
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
-
JMS規定了兩種訊息模式;而AMQP的訊息模式更加豐富
市場上常見的訊息佇列有如下:
目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ
RabbitMQ:
使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將訊息直接傳送給佇列,訊息在傳送給客戶端時先在中心佇列排隊。對路由(Routing),負載均衡(Load balance)、資料持久化都有很好的支援。多用於進行企業級的ESB整合。
RabbitMQ提供了6種模式:簡單模式,work模式,Publish/Subscribe釋出與訂閱模式,Routing路由模式,Topics主題模式,RPC遠端呼叫模式(遠端呼叫,不太算MQ)
角色說明:
1、 超級管理員(administrator)
可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。
2、 監控者(monitoring)
可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁碟使用情況等)
3、 策略制定者(policymaker)
可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。
4、 普通管理者(management)
僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。
5、 其他
無法登陸管理控制檯,通常就是普通的生產者和消費者。
設定Virtual Hosts許可權:
user:使用者名稱
configure :一個正則表示式,使用者對符合該正則表示式的所有資源擁有 configure 操作的許可權
write:一個正則表示式,使用者對符合該正則表示式的所有資源擁有 write 操作的許可權
read:一個正則表示式,使用者對符合該正則表示式的所有資源擁有 read 操作的許可權
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
//建立連結工廠物件 //設定RabbitMQ服務主機地址,預設localhost //設定RabbitMQ服務埠,預設5672 //設定虛擬主機名字,預設/ //設定使用者連線名,預設guest //設定連結密碼,預設guest //建立連結 //建立頻道 //宣告佇列 //建立訊息 //訊息傳送 //關閉資源
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立連結工廠物件 ConnectionFactory connectionFactory = new ConnectionFactory(); //設定RabbitMQ服務主機地址,預設localhost connectionFactory.setHost("192.168.211.132"); //設定RabbitMQ服務埠,預設5672 connectionFactory.setPort(5672); //設定虛擬主機名字(在rabbitmq伺服器中,訊息佇列是放在虛擬主機中的,這是為了更好分類管理各種訊息佇列,一般會加/) //虛擬主機名字得先在伺服器中手動新增一個,否則會找不到虛擬主機 connectionFactory.setVirtualHost("/qianyi"); //設定使用者連線名,預設guest(你想用哪個rabbitmq伺服器中的賬戶就填哪個的賬號密碼) connectionFactory.setUsername("guest"); //設定連結密碼,預設guest connectionFactory.setPassword("guest"); //建立連結(在本身和rabbitmq伺服器之間建立連線,類似跟redis、mysql之間建立連線) Connection connection = connectionFactory.newConnection(); //建立頻道(在本身和指定的rabbitmq伺服器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); //宣告佇列(說明要在rabbitmq伺服器中指定的虛擬主機中的哪條訊息佇列) /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //建立訊息 String message = "hello!qianyi!"; //訊息傳送 /** * 訊息傳送 * 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage(不寫就填空串) * 引數2:路由key,簡單模式可以傳遞佇列名稱 * 引數3:訊息其它屬性(沒有填null) * 引數4:訊息內容(訊息內容是字串,需要轉換成位元組陣列才能傳輸) */ channel.basicPublish("", "qianyi1", null, message.getBytes()); //關閉資源(連線和頻道的) channel.close(); connection.close(); }
}
//建立連結工廠物件 //設定RabbitMQ服務主機地址,預設localhost //設定RabbitMQ服務埠,預設5672 //設定虛擬主機名字,預設/ //設定使用者連線名,預設guest //設定連結密碼,預設guest //建立連結 //建立頻道 //建立佇列 //建立消費者,並設定訊息處理 //訊息監聽 //關閉資源(不建議關閉,建議一直監聽訊息)
public static void main(String[] args) throws IOException, TimeoutException { //建立連結工廠物件 ConnectionFactory connectionFactory = new ConnectionFactory(); //設定RabbitMQ服務主機地址,預設localhost connectionFactory.setHost("192.168.211.132"); //設定RabbitMQ服務埠,預設5672 connectionFactory.setPort(5672); //設定虛擬主機名字(rabbitmq伺服器上建立的虛擬主機) connectionFactory.setVirtualHost("qianyi"); //設定使用者連線名,預設guest(指定用哪個使用者登入,guest是預設超級管理員) connectionFactory.setUsername("guest"); //設定連結密碼,預設guest connectionFactory.setPassword("guest"); //建立連結(連線到rabbitmq伺服器) Connection connection = connectionFactory.newConnection(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi1",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); }
}
P:生產者,也就是要傳送訊息的程式
C:消費者:訊息的接受者,會一直等待訊息到來。
queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。
在rabbitMQ中消費者是一定要到某個訊息佇列中去獲取訊息的
模式說明
Work Queues
與入門程式的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。
應用場景
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重複程式碼擷取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //建立頻道(在本身和指定的rabbitmq伺服器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); //宣告佇列(說明要在rabbitmq伺服器中指定的虛擬主機中的哪條訊息佇列) /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //建立訊息(因為是發給多個消費者,所以進行for迴圈) for (int i = 0; i <= 3; i++) { String message = "hello!qianyi!"+i; //訊息傳送 /** * 訊息傳送 * 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage(不寫就填空串) * 引數2:路由key,簡單模式可以傳遞佇列名稱 * 引數3:訊息其它屬性(沒有填null) * 引數4:訊息內容(訊息內容是字串,需要轉換成位元組陣列才能傳輸) */ channel.basicPublish("", "qianyi2", null, message.getBytes()); } //關閉資源(連線和頻道的) channel.close(); connection.close(); } }
com.xxx.work.ConsumeOne
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi2",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); } }
P:生產者,也就是要傳送訊息的程式
C:消費者:訊息的接受者,會一直等待訊息到來。
Queue:訊息佇列,圖中紅色部分
而在訂閱模型中,多了一個exchange(交換機)角色,而且過程略有變化:
P:生產者,也就是要傳送訊息的程式,但是不再發送到佇列中,而是發給X(交換機)
C:消費者,訊息的接受者,會一直等待訊息到來。
Queue:訊息佇列,接收訊息、快取訊息。
Exchange:交換機,圖中的X。一方面,接收生產者傳送的訊息。另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。Exchange有常見以下3種類型:
Fanout:廣播,將訊息交給所有繫結到交換機的佇列
Direct:定向,把訊息交給符合指定routing key 的佇列
Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列
1.每個消費者監聽自己的佇列。
2.生產者將訊息發給broker(代理人),由交換機將訊息轉發到繫結此交換機的每個佇列,每個繫結交換機的佇列都將接收到訊息
(1)生產者
生產者需要注意如下3點:
1.宣告交換機
2.宣告佇列
3.佇列需要繫結指定的交換機
生產者:申明一個交換機,然後繫結這個交換機所有的(根據需求)佇列,傳送訊息即可
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重複程式碼擷取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //建立頻道(在本身和指定的rabbitmq伺服器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 宣告交換機 * 引數1:交換機名稱 * 引數2:交換機型別,fanout、topic、direct、headers(以下用fanout型別,廣播模式,每個與交換機繫結的佇列都會接收到資訊) */ channel.exchangeDeclare("QY", BuiltinExchangeType.FANOUT); //宣告佇列(說明要在rabbitmq伺服器中指定的虛擬主機中的哪條訊息佇列) /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi3",true,false,false,null); channel.queueDeclare("qianyi4",true,false,false,null); //佇列繫結交換機 //引數1:需要繫結的佇列 //引數2:需要繫結的交換機 channel.queueBind("qianyi3","QY",""); channel.queueBind("qianyi4","QY",""); //建立訊息(中文會亂碼) String message = "釋出訂閱模式:歡迎光臨紅浪漫!"; //訊息傳送 /** * 訊息傳送 * 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage(不寫就填空串) * 引數2:路由key,簡單模式可以傳遞佇列名稱,釋出訂閱模式不傳遞佇列名稱 * 引數3:訊息其它屬性(沒有填null) * 引數4:訊息內容(訊息內容是字串,需要轉換成位元組陣列才能傳輸) */ channel.basicPublish("QY", "", null, message.getBytes()); //關閉資源(連線和頻道的) channel.close(); connection.close(); } }
消費者:消費者可以是多個,只要監聽的佇列跟交換機綁定了,那麼生產者傳送的內容這個消費者都能收到
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi3",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi3",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); } }
總結:生產者傳送的訊息先發給申明的交換機,交換機又綁定了一個或者多個佇列,那麼在這個模式下,消費者只需要監視跟交換機繫結的佇列,就可以獲取到生產者傳送的訊息。
如果兩個或者多個消費者監視同一個佇列,那麼又會出現這種情況:即生產者傳送一條訊息,能同時接收到資訊的只有一個消費者,無法做到上面的效果,多個消費者同時收到訊息
所以如果需要同時接收訊息的話,必須一個消費者監聽一條佇列,而該佇列必須跟交換機有繫結的關係
釋出訂閱模式與work佇列模式的區別:
1、work佇列模式不用定義交換機,而釋出/訂閱模式需要定義交換機。
2、釋出/訂閱模式的生產方是面向交換機發送訊息,work佇列模式的生產方是面向佇列傳送訊息(底層使用預設交換機)。
3、釋出/訂閱模式需要設定佇列和交換機的繫結,work佇列模式不需要設定,實際上work佇列模式會將佇列綁 定到預設的交換機
路由模式特點:
1.佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
2.訊息的傳送方在 向 Exchange傳送訊息時,也必須指定訊息的 RoutingKey。
3.Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routingkey與訊息的 Routing key完全一致,才會接收到訊息
P:生產者,向Exchange傳送訊息,交換機繫結佇列的時候,會指定一個routing key,給交換機發送訊息的時候,也要帶著指定的routing key,並且有幾個routing key就傳送幾次(一次只能指定一個routing key)
X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列
C1:消費者,其所在佇列指定了需要routing key 為 error 的訊息
C2:消費者,其所在佇列指定了需要routing key 為 info、error、warning 的訊息
建立訊息生產者,程式碼如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重複程式碼擷取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //建立頻道(在本身和指定的rabbitmq伺服器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 宣告交換機 * 引數1:交換機名稱 * 引數2:交換機型別,fanout、topic、direct、headers(以下用DIRECT型別,路由模式,交換機發送的訊息會根routing key傳送給匹配的佇列) */ channel.exchangeDeclare("QY1", BuiltinExchangeType.DIRECT); //宣告佇列(說明要在rabbitmq伺服器中指定的虛擬主機中的哪條訊息佇列) /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi5",true,false,false,null); channel.queueDeclare("qianyi6",true,false,false,null); //佇列繫結交換機 //引數1:需要繫結的佇列 //引數2:需要繫結的交換機 //引數3:需要繫結的routing key(路由key)在交換機給佇列傳送訊息的時候,會根據它傳送 channel.queueBind("qianyi5","QY1","rouingkey1"); channel.queueBind("qianyi6","QY1","rouingkey2"); //建立訊息(中文會亂碼) String message1 = "釋出訂閱模式:歡迎光臨紅浪漫!111"; String message2 = "釋出訂閱模式:歡迎光臨紅浪漫!222"; //訊息傳送 /** * 訊息傳送 * 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage(不寫就填空串) * 引數2:路由key,簡單模式可以傳遞佇列名稱,廣播模式不傳遞佇列名稱 * 引數3:訊息其它屬性(沒有填null) * 引數4:訊息內容(訊息內容是字串,需要轉換成位元組陣列才能傳輸) */ channel.basicPublish("QY1", "rouingkey1", null, message1.getBytes()); channel.basicPublish("QY1", "rouingkey2", null, message2.getBytes()); //關閉資源(連線和頻道的) channel.close(); connection.close(); } }
消費者:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi6",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi6",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); } }
總結:當在生產者中繫結交換機的時候,每繫結一個佇列,都會給該佇列指定一個routing key,然後在生產者向交換機發送訊息的時候,指定某個已經繫結的routing key,就會將該條訊息傳送到對應的佇列,比如A佇列綁定了routingkey1,生產者傳送訊息給佇列傳送訊息的時候,就會去佇列中找routingkey1的佇列,傳送過去,消費者只需要根據監聽的佇列就可以獲得該條訊息。
可以做到這樣:申明兩個佇列,分別指定routingkey,傳送訊息的時候也傳送兩條,一條訊息指定其中一個routingkey,也就將那條訊息傳送給了一個佇列,另一條訊息指定另一個routingkey,訊息也就傳送到了另一個佇列,兩個或者多個消費者只需要繫結不同的佇列就可以獲得兩個不同的訊息。
Routingkey
一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: qianyi.insert
萬用字元規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
圖解:
-
紅色Queue:繫結的是
usa.#
,因此凡是以usa.
開頭的routing key
都會被匹配到 -
黃色Queue:繫結的是
#.news
,因此凡是以.news
結尾的routing key
都會被匹配
使用topic型別的Exchange,傳送訊息的routing key有3種: item.insert
、item.update
、item.delete
:
建立TopicProducer實現訊息生產,程式碼如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重複程式碼擷取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //建立頻道(在本身和指定的rabbitmq伺服器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 宣告交換機 * 引數1:交換機名稱 * 引數2:交換機型別,fanout、topic、direct、headers(以下用DIRECT型別,路由模式,交換機發送的訊息會根routing key傳送給匹配的佇列) */ channel.exchangeDeclare("QY2", BuiltinExchangeType.TOPIC); //宣告佇列(說明要在rabbitmq伺服器中指定的虛擬主機中的哪條訊息佇列) /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi7",true,false,false,null); channel.queueDeclare("qianyi8",true,false,false,null); channel.queueDeclare("qianyi9",true,false,false,null); //佇列繫結交換機 //引數1:需要繫結的佇列 //引數2:需要繫結的交換機 //引數3:需要繫結的routing key(路由key)在交換機給佇列傳送訊息的時候,會根據它傳送 // (*表示後面一個單詞無論是什麼,只要交換機發送訊息的routingkey的值是item開頭的,它都能接收到) //下面操作給qianyi7佇列兩個routing key,只要傳送的訊息指定其中任何一個routingkey,qianyi7都會接收到訊息 //給qianyi8佇列用了萬用字元,只要傳送的訊息指定的routing key是以item開頭的,它都能收到 channel.queueBind("qianyi7","QY2","item.inset"); channel.queueBind("qianyi7","QY2","item.update"); channel.queueBind("qianyi8","QY2","item.*"); //建立訊息(中文會亂碼) String message1 = "釋出訂閱模式:歡迎光臨紅浪漫!111"; String message2 = "釋出訂閱模式:歡迎光臨紅浪漫!222"; String message3 = "釋出訂閱模式:歡迎光臨紅浪漫!333"; //訊息傳送 /** * 訊息傳送 * 引數1:交換機名稱,如果沒有指定則使用預設Default Exchage(不寫就填空串) * 引數2:路由key,簡單模式可以傳遞佇列名稱,廣播模式不傳遞佇列名稱 * 引數3:訊息其它屬性(沒有填null) * 引數4:訊息內容(訊息內容是字串,需要轉換成位元組陣列才能傳輸) */ channel.basicPublish("QY2", "item.inset", null, message1.getBytes()); channel.basicPublish("QY2", "item.update", null, message2.getBytes()); //這裡routing key值寫item.aaa為了驗證只要是以item開頭的佇列,都可以接收到這條訊息 channel.basicPublish("QY2", "item.aaa", null, message3.getBytes()); //關閉資源(連線和頻道的) channel.close(); connection.close(); } }
消費者1:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi7",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi7",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); } }
消費者2:
public class ConsumeTwo { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //建立頻道(建立連線rabbitmq伺服器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明佇列,指定到哪個佇列獲取訊息 /** * 宣告佇列 * 引數1:佇列名稱 * 引數2:是否定義持久化佇列 * 引數3:是否獨佔本次連線(其它連線是否能連線到本條佇列) * 引數4:是否在不使用的時候自動刪除佇列 * 引數5:佇列其它引數 * **/ channel.queueDeclare("qianyi8",true,false,false,null); //建立消費者,並設定訊息處理(DefaultConsumer:訊息消費者,引數傳入建立的頻道)然後再重寫handleDelivery方法,可以用lambab表示式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 訊息者標籤,在channel.basicConsume時候可以指定 * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送) * @param properties 屬性資訊 * @param body 訊息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機資訊 String exchange = envelope.getExchange(); //獲取訊息ID long deliveryTag = envelope.getDeliveryTag(); //獲取訊息資訊 String message = new String(body, "UTF-8"); //輸出獲得的訊息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //訊息監聽 /** * 訊息監聽 * 要監聽哪個佇列?當消費者收到訊息之後是否自動告訴rebbitmq伺服器已經收到?收到訊息之後,如何處理呢? * 引數1:佇列名稱 * 引數2:是否自動確認,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動確認 * 引數3:訊息接收到後回撥(傳入上面建立的消費者物件,這個消費者物件中對做了對收到的訊息處理) */ channel.basicConsume("qianyi8",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽訊息) //channel.close(); //connection.close(); } }
總結:以上程式碼驗證了:1、可以給一個佇列指定多個routing key,只要訊息傳送給多個routing key中的任何一個,該佇列都會收到訊息。
2、可以給routing key用萬用字元(*或者#)使用item.*,那麼只要傳送訊息的時候,指定的routing key是以item開頭都可以被該佇列收到。
所以,以上程式碼,qianyi7佇列收到了兩條訊息,一條是item.inset路由key接收的,一條是item.update路由key接收的,而qiani8則收到了3條資訊,因為每條資訊的路由key都符合item.*的規則。
模式總結
RabbitMQ工作模式: 1、簡單模式 HelloWorld 一個生產者、一個消費者,不需要設定交換機(使用預設的交換機)
2、工作佇列模式 Work Queue 一個生產者、多個消費者(競爭關係),不需要設定交換機(使用預設的交換機)
3、釋出訂閱模式 Publish/subscribe 需要設定型別為fanout的交換機,並且交換機和佇列進行繫結,當傳送訊息到交換機後,交換機會將訊息傳送到繫結的佇列
5、萬用字元模式 Topic 需要設定型別為topic的交換機,交換機和佇列進行繫結,並且指定萬用字元方式的routing key,當傳送訊息到交換機後,交換機會根據routing key將訊息傳送到對應的佇列
在Spring專案中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot專案中只需要引入對應的amqp啟動器依賴即可,方便的使用RabbitTemplate傳送訊息,使用註解接收訊息。
一般在開發過程中:
生產者工程:
-
application.yml檔案配置RabbitMQ相關資訊;
-
在生產者工程中編寫配置類,用於建立交換機和佇列,並進行繫結
-
注入RabbitTemplate物件,通過RabbitTemplate物件傳送訊息到交換機
消費者工程:
-
application.yml檔案配置RabbitMQ相關資訊
-
建立訊息處理類,用於接收佇列中的訊息並進行處理
建立生產者工程,新增依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version> <!--依賴--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
(1)application.yml配置檔案
建立application.yml,內容如下:
spring: rabbitmq: host: localhost port: 5672 virtual-host: /aaa #虛擬主機名 username: guest password: guest
繫結交換機和佇列
建立RabbitMQ佇列與交換機繫結的配置類RabbitMQConfig,程式碼如下:
@Configuration public class RabbitMQConfig { /*** * 宣告交換機 */ @Bean(name = "itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange("item_topic_exchange").durable(true).build(); } /*** * 宣告佇列 */ @Bean(name = "itemQueue") public Queue itemQueue(){ return QueueBuilder.durable("item_queue").build(); } /*** * 佇列繫結到交換機上 */ @Bean public Binding itemQueueExchange(@Qualifier("itemQueue")Queue queue, @Qualifier("itemTopicExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
搭建消費者工程
5.3.1. 建立工程
建立消費者工程,新增依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <!--依賴--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
啟動類:略
配置檔案與上相同,略
編寫訊息監聽器com.itheima.listener.MessageListener,程式碼如下:
@Component public class MessageListener { /** * 監聽某個佇列的訊息 * @param message 接收到的訊息 */ @RabbitListener(queues = "item_queue") public void myListener1(String message){ System.out.println("消費者接收到的訊息為:" + message); } }
+ confirm模式
生產者傳送訊息到交換機的時機
+ return模式
交換機轉發訊息給queue的時機
1.生產者傳送訊息到交換機
2.交換機根據routingkey 轉發訊息給佇列
3.消費者監控佇列,獲取佇列中資訊
4.消費成功刪除佇列中的訊息
實現:
先建立工程,新增依賴:(web依賴用於測試,test依賴無所謂,以及amqp的依賴,amqp是一種協議,裡面集成了rabbitMQ的相關需要依賴)
<parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.3.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2、建立啟動類,在啟動類裡面建立佇列、交換機、繫結的物件,並指定各自的名字(物件名隨意,可指定可不指定,下面程式碼指定了,在bean註解後面)
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } //建立佇列 @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是佇列名 return new Queue("Q_demo01"); } //建立交換機,直接使用directExchange(路由模式)它的父類也實現了Exchange介面 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交換機名 return new DirectExchange("E_demo01"); } //建立繫結物件,將交換機和佇列繫結 @Bean public Binding createBinDing(){ //將上面建立的佇列和交換機進行繫結,然後設定這個佇列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
3、建立配置檔案,主要配置該微服務的埠號、rabbitMQ伺服器的IP地址和埠號,以及rabbitMQ的使用者名稱和密碼、是否啟動confirm模式
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,預設關閉 server: port: 8881
4、建立controller模擬接收到前端資訊後,給rabbitMQ伺服器傳送訊息,這個訊息最終需要另一個微服務(消費者)接收,因為有confirm模式,所以在發訊息之前需要先設定回撥函式,當rabbitMQ中的交換機收到訊息就會呼叫的函式(方法)
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallback confirmCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收請求"); System.out.println("處理請求中……"); //設定回撥函式(當傳送訊息後,接收方會返回傳送發呼叫方法,這個方法呼叫就能知道訊息傳送結果) rabbitTemplate.setConfirmCallback(confirmCallback); //傳送訊息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
5、編寫這個回撥函式的具體方法,這個回撥函式實現rabbitTemplate.confarmCallback介面,重寫方法即可,具體實現看註釋(我這裡因為沒有下載原始碼,所以自動生成的程式碼,引數就變成了b、s這些不好讀,下載原始碼即可)
//需要交給spring核心容器管理 //回撥函式要實現rabbitTemplate中的confirmCallback介面 @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { /** * * @param correlationData 訊息資訊 * @param b 確認標識:true,MQ伺服器exchange表示已經確認收到訊息 false 表示沒有收到訊息 * @param s 如果沒有收到訊息,則指定為MQ伺服器exchange訊息沒有收到的原因,如果已經收到則指定為null */ if (b){ System.out.println("訊息收到,內容為:"+correlationData); }else { System.out.println("訊息未收到,原因為:"+s); } } }
然後就可以傳送請求到編寫的controller類了,controller類接收到請求後會發訊息到交換機上,交換機如果接收到訊息,就會呼叫回撥函式,如果傳送訊息時,故意填寫一個錯誤的交換機,並且這個錯誤的交換機是不存在的話,那麼當訊息一發送,沒有找到對應的交換機,在呼叫回撥函式的時候就會進入沒有接收訊息的判斷中,這樣就可以確定訊息到底有沒有傳送成功。
returncallback程式碼實現
如上,已經實現了訊息傳送到交換機上的內容,但是如果是,交換機發送成功,但是在路由轉發到佇列的時候,傳送錯誤,此時就需要用到returncallback模式了。接下來我們實現下。
實現步驟如下:
1.開啟returncallback模式
2.設定回撥函式
3.傳送訊息
配置yml開啟returncallback:在配置檔案中開啟:
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,預設關閉 publisher-returns: true #配置returns模式,預設關閉 server: port: 8881
編寫returns回撥函式:這個回撥函式跟confirm的回撥函式一脈相承,都差不多
@Component public class ReturnsCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { /** * * @param message 訊息資訊,因為message傳遞過來是位元組,所以需要轉換成字串 * @param i 退回的狀態碼 * @param s 退回的資訊 * @param s1 交換機 * @param s2 路由key */ System.out.println("退回的訊息是:"+new String(message.getBody())); System.out.println("退回的狀態碼是:"+i); System.out.println("退回的資訊是:"+s); System.out.println("退回的交換機是:"+s1); System.out.println("退回的路由key是:"+s2); } }
還需要在controller裡面加一行程式碼,就是傳送訊息之前設定returns的回撥函式:
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; //需要注入剛剛建立的confirm回撥函式 @Autowired private ConfirmCallback confirmCallback; //需要注入剛剛建立的returns回撥函式 @Autowired private ReturnsCallback returnsCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收請求"); System.out.println("處理請求中……"); //設定confirm回撥函式(當傳送訊息後,接收方會返回傳送發呼叫方法,這個方法呼叫就能知道訊息傳送結果) rabbitTemplate.setConfirmCallback(confirmCallback); //設定returns的回撥函式(當交換機收到訊息傳送給佇列後,,佇列就會呼叫這個回撥函) rabbitTemplate.setReturnCallback(returnsCallback); //傳送訊息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
總結起來也就是新增加了三個步驟,第一個在配置檔案中開啟returnsCallback,第二個寫一個returnsCallback的回撥函式,第三個在傳送訊息之前指定好回撥函式,這樣就完成了從生產者發訊息到交換機,交換機會呼叫回撥函式,從交換機發訊息到佇列,佇列會呼叫回撥函式,確保了這三個點發送訊息不會有問題,唯一的小區別就是,交換機接收到訊息,判定是否接收到的條件可以是錯誤內容,如果錯誤內容為null則說明交換機接收到資訊且沒有異常,佇列是否接收到訊息的判斷條件可以是狀態碼,如下圖,當傳送訊息時指定不存在的routting key(路由key),那麼列印的訊息就會如下:
+ returncallback模式,需要手動設定開啟
+ 該模式 指定 在路由的時候傳送錯誤的時候呼叫回撥函式,不影響訊息傳送到交換機
兩種模式的總結
但是一般情況下我們使用confirm即可,因為路由key 由開發人員指定,一般不會出現錯誤,並且從交換機到佇列,都是在rabbitMQ伺服器中進行的,除非伺服器掛掉,否則不會出問題。如果要保證訊息在交換機和routingkey的時候那麼需要結合兩者的方式來進行設定。
上邊我們學習了傳送方的可靠性投遞,但是在消費方也有可能出現問題,比如沒有接受訊息,比如接受到訊息之後,在程式碼執行過程中出現了異常,這種情況下我們需要額外的處理,那麼就需要手動進行確認簽收訊息。rabbtimq給我們提供了一個機制:ACK機制。
ACK機制:有三種方式
-
-
手動確認 acknowledge="manual"
-
根據異常情況來確認 acknowledge="auto"
其中自動確認是指:
當訊息一旦被Consumer接收到,則自動確認收到,並將相應 message 從 RabbitMQ 的訊息快取中移除。但是在實際業務處理中,很可能訊息接收到,業務處理出現異常,那麼該訊息就會丟失。
其中手動確認方式是指:
則需要在業務處理成功後,呼叫channel.basicAck(),手動簽收,如果出現異常,則呼叫channel.basicNack()等方法,讓其按照業務功能進行處理,比如:重新發送,比如拒絕簽收進入死信佇列等等。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,預設關閉 publisher-returns: true #配置returns模式,預設關閉 listener: simple: acknowledge-mode: manual #設定監聽端訊息ACK確認模式為手動模式,預設自動確認接收訊息,無論是否出異常 server: port: 8881
2.建立訊息監聽器監聽訊息:監聽佇列,接收訊息,然後用cry/catch來判定是否接收訊息,如果沒有異常,則接收訊息,並列印,如果有異常,則可以選擇將訊息返回給佇列或者丟棄訊息
@Component //指定需要監聽的佇列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道物件 他提供了ack/nack方法(簽收和拒絕簽收) * Message 訊息本生的封裝的物件 * String msg 訊息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收訊息 System.out.println("消費者接收到的訊息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收訊息出錯 //int i = 1 / 0; //簽收訊息 // 引數1 指定的是訊息的序號(快遞號) // 引數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕訊息 可以重回佇列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個引數 //引數3 標識是否重回佇列 true 是重回 false 就是不重回:丟棄訊息,如果重回佇列的話,異常沒有解決,就會進入死迴圈 //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量處理:basicReject,傳兩個引數,第二個引數是否批量 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
按以上程式碼,如果消費端沒有出現異常,則會正常接收訊息,如果出現了異常,說明這個消費端的實施業務邏輯失敗,則必須告訴交換機,任務失敗,交易取消,可以選擇將訊息返回給交換機,或者丟棄這個訊息,返回給交換機,那麼訊息還會存放在交換機,但是交換機又會重新將返回的訊息傳送給消費端,消費的又出現異常,再返回給交換機,形成死迴圈。
以下為各種情況程式碼結果演示:
1、配置檔案開啟ACK手動確認模式,但是在消費端沒有寫程式碼確認接收,也沒有拒絕接收,消費端程式碼如下:
@Component @RabbitListener(queues = "queue_demo01") public class MyRabbitListener { /*@RabbitHandler public void msg(String message) { System.out.println("消費Duang接收訊息:" + message); }*/ @RabbitHandler public void msg(Message message, Channel channel ,String msg) { System.out.println("消費Duang接收訊息:" + msg); } }
那麼執行的結果就會是這樣:
說明一直沒有被簽收,訊息一直會在rabbitMQ伺服器
2、配置檔案開啟ACK手動確認模式,消費端出現異常,訊息接收被拒絕後執行丟棄訊息操作,消費端程式碼如下:
@Component //指定需要監聽的佇列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道物件 他提供了ack/nack方法(簽收和拒絕簽收) * Message 訊息本生的封裝的物件 * String msg 訊息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收訊息 System.out.println("消費者接收到的訊息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收訊息出錯 int i = 1 / 0; //簽收訊息 // 引數1 指定的是訊息的序號(快遞號) // 引數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕訊息 可以重回佇列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個引數 //引數3 標識是否重回佇列 true 是重回 false 就是不重回:丟棄訊息,如果重回佇列的話,異常沒有解決,就會進入死迴圈 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //方式二:不批量處理:basicReject,傳兩個引數,第二個引數是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
訊息丟棄後,則不再出現:
3、配置檔案開啟ACK手動確認模式,消費端出現異常,拒絕接收訊息,然後將訊息返回給佇列,程式碼如下(第三個引數設定為重回佇列進行再次投遞):
@Component //指定需要監聽的佇列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道物件 他提供了ack/nack方法(簽收和拒絕簽收) * Message 訊息本生的封裝的物件 * String msg 訊息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收訊息 System.out.println("消費者接收到的訊息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收訊息出錯 int i = 1 / 0; //簽收訊息 // 引數1 指定的是訊息的序號(快遞號) // 引數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕訊息 可以重回佇列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個引數 //引數3 標識是否重回佇列 true 是重回 false 就是不重回:丟棄訊息,如果重回佇列的話,異常沒有解決,就會進入死迴圈 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量處理:basicReject,傳兩個引數,第二個引數是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
訊息返回佇列後,會再次給消費端投遞該訊息,異常不消失,死迴圈不停。
總結:
第一種:簽收
channel.basicAck()
第二種:拒絕簽收 批量處理
channel.basicNack()
第三種:拒絕簽收 不批量處理
channel.basicReject()
-
-
如果在消費端沒有出現異常,則呼叫channel.basicAck(deliveryTag,false);方法確認簽收訊息
-
如何保證訊息的高可靠性傳輸?
1.持久化(如果使用spring boot,則持久化的預設設定就是true,不需要額外進行設定)
• exchange要持久化
• queue要持久化
• message要持久化
2.生產方確認Confirm、Return
3.消費方確認Ack
如果併發量大的情況下,生產方不停的傳送訊息,可能處理不了那麼多訊息,此時訊息在佇列中堆積很多,當消費端啟動,瞬間就會湧入很多訊息,消費端有可能瞬間垮掉,這時我們可以在消費端進行限流操作,每秒鐘放行多少個訊息。這樣就可以進行併發量的控制,減輕系統的負載,提供系統的可用性,這種效果往往可以在秒殺和搶購中進行使用。在rabbitmq中也有限流的一些配置。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,預設關閉 publisher-returns: true #配置returns模式,預設關閉 listener: simple: acknowledge-mode: manual #設定監聽端訊息ACK確認模式為手動模式 prefetch: 1 #設定每一個消費端,可以同時處理的未確認的訊息最大數量 server: port: 8881
這個限流預設是250個。
TTL
RabbitMQ設定過期時間有兩種:
-
針對某一個佇列設定過期時間 ;佇列中的所有訊息在過期時間到之後,如果沒有被消費則被全部清除
-
針對某一個特定的訊息設定過期時間;佇列中的訊息設定過期時間之後,如果這個訊息沒有被訊息則被清除。
需要注意一點的是:
針對某一個特定的訊息設定過期時間時,一定是訊息在佇列中在隊頭的時候進行計算,如果某一個訊息A 設定過期時間5秒,訊息B在隊頭,訊息B沒有設定過期時間,B此時過了已經5秒鐘了還沒被消費。注意,此時A訊息並不會被刪除,因為它並沒有再隊頭。
一般在工作當中,單獨使用TTL的情況較少。後面會講到延時佇列。在這裡有用處。
設定過期佇列,只需要在建立佇列的時候指定一下就可以了:
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } /*建立定時過期佇列,使用構建者模式,durable("Q_demo01"):設定佇列名 withArgument("x-message",100)第一個引數後面講,第二個是過期時間,單位毫秒*/ @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是佇列名 return QueueBuilder.durable("Q_demo01").withArgument("x-message",100).build(); } //建立交換機,直接使用directExchange(路由模式)它的父類也實現了Exchange介面 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交換機名 return new DirectExchange("E_demo01"); } //建立繫結物件,將交換機和佇列繫結 @Bean public Binding createBinDing(){ //將上面建立的佇列和交換機進行繫結,然後設定這個佇列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
死信佇列的介紹
如下圖的過程:
成為死信的三種條件:
-
佇列中訊息的長度(數量)到達限制;
-
消費者拒接消費訊息,basicNack/basicReject,並且不把訊息重新放入原目標佇列,requeue=false;(丟棄)
-
原佇列存在訊息過期設定,訊息到達超時時間未被消費;(ddl設定的過期的時間到了)
死信的處理過程
DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。
當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上去,進而被路由到另一個佇列。
可以監聽這個佇列中的訊息做相應的處理。(例如客戶下訂單,進入支付頁面,這個時候商品庫存已經在資料庫中進行減數操作,如果客戶突然不執行支付操作,那麼就可以設定定時訊息,如果超過時間,客戶沒有進行支付,則將這個死信訊息放入死信交換機,傳送給與私信交換機繫結的佇列中,用另一個消費端接收這個死信訊息,這個消費端就是執行將庫存數量重新加回來的操作)
死信佇列的設定
剛才說到死信佇列也是一個正常的exchange.只需要設定一些引數即可。
給佇列設定引數: x-dead-letter-exchange 和 x-dead-letter-routing-key。
如上圖所示
1.建立queue1 正常佇列 用於接收死信佇列過期之後轉發過來的訊息
2.建立queue2 可以針對他進行引數設定 死信佇列
3.建立交換機 死信交換機
4.繫結正常佇列到交換機