1. 程式人生 > 程式設計 >SpringBoot+RabbitMQ方式收發訊息的實現示例

SpringBoot+RabbitMQ方式收發訊息的實現示例

本篇會和SpringBoot做整合,採用自動配置的方式進行開發,我們只需要宣告RabbitMQ地址就可以了,關於各種建立連線關閉連線的事都由Spring幫我們了~

交給Spring幫我們管理連線可以讓我們專注於業務邏輯,就像宣告式事務一樣易用,方便又高效。

祝有好收穫,先贊後看,快樂無限。

本文程式碼:

https://gitee.com/he-erduo/spring-boot-learning-demo

https://github.com/he-erduo/spring-boot-learning-demo

1. 環境配置

第一節我們先來搞一下環境的配置,上一篇中我們已經引入了自動配置的包,我們既然使用了自動配置的方式,那RabbitMQ的連線資訊我們直接放在配置檔案中就行了,就像我們需要用到JDBC連線的時候去配置一下DataSource一樣。

SpringBoot+RabbitMQ方式收發訊息的實現示例

如圖所示,我們只需要指明一下連線的IP+埠號和使用者名稱密碼就行了,這裡我用的是預設的使用者名稱與密碼,不寫的話預設也都是guest,埠號也是預設5672。

主要我們需要看一下手動確認訊息的配置,需要配置成manual才是手動確認,日後還會有其他的配置項,眼下我們配置這一個就可以了。

接下來我們要配置一個Queue,上一篇中我們往一個名叫erduo的佇列中傳送訊息,當時是我們手動定義的此佇列,這裡我們也需要手動配置,宣告一個Bean就可以了。

@Configuration 
public class RabbitmqConfig { 
  @Bean 
  public Queue erduo() { 
    // 其三個引數:durable exclusive autoDelete 
    // 一般只設置一下持久化即可 
    return new Queue("erduo",true); 
  }  
} 

就這麼簡單宣告一下就可以了,當然了RabbitMQ畢竟是一個獨立的元件,如果你在RabbitMQ中通過其他方式已經建立過一個名叫erduo的隊列了,你這裡也可以不宣告,這裡起到的一個效果就是如果你沒有這個佇列,會按照你宣告的方式幫你建立這個佇列。

配置完環境之後,我們就可以以SpringBoot的方式來編寫生產者和消費者了。

2. 生產者與RabbitTemplate

和上一篇的節奏一樣,我們先來編寫生產者,不過這次我要引入一個新的工具:RabbitTemplate。

聽它的這個名字就知道,又是一個拿來即用的工具類,Spring家族這點就很舒服,什麼東西都給你封裝一遍,讓你用起來更方便更順手。

RabbitTemplate實現了標準AmqpTemplate介面,功能大致可以分為傳送訊息和接受訊息。

我們這裡是在生產者中來用,主要就是使用它的傳送訊息功能:send和convertAndSend方法。

// 傳送訊息到預設的Exchange,使用預設的routing key 
void send(Message message) throws AmqpException; 
// 使用指定的routing key傳送訊息到預設的exchange 
void send(String routingKey,Message message) throws AmqpException; 
// 使用指定的routing key傳送訊息到指定的exchange 
void send(String exchange,String routingKey,Message message) throws AmqpException; 

send方法是傳送byte陣列的資料的模式,這裡代表訊息內容的物件是Message物件,它的構造方法就是傳入byte陣列資料,所以我們需要把我們的資料轉成byte陣列然後構造成一個Message物件再進行傳送。

// Object型別,可以傳入POJO 
void convertAndSend(Object message) throws AmqpException; 
void convertAndSend(String routingKey,Object message) throws AmqpException; 
void convertAndSend(String exchange,Object message) throws AmqpException;

convertAndSend方法是可以傳入POJO物件作為引數,底層是有一個MessageConverter幫我們自動將資料轉換成byte型別或String或序列化型別。

所以這裡支援的傳入物件也只有三種:byte型別,String型別和實現了Serializable介面的POJO。

介紹完了,我們可以看一下程式碼:

@Slf4j 
@Component("rabbitProduce") 
public class RabbitProduce { 
  @Autowired 
  private RabbitTemplate rabbitTemplate; 
  public void send() { 
    String message = "Hello 我是作者和耳朵,歡迎關注我。" + LocalDateTime.now().toString(); 
    System.out.println("Message content : " + message); 
    // 指定訊息型別 
    MessageProperties props = MessagePropertiesBuilder.newInstance() 
        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); 
    rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); 
    System.out.println("訊息傳送完畢。"); 
  } 
  public void convertAndSend() { 
    User user = new User(); 
    System.out.println("Message content : " + user); 
    rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); 
    System.out.println("訊息傳送完畢。"); 
  } 
} 

這裡我特意寫明瞭兩個例子,一個用來測試send,另一個用來測試convertAndSend。

send方法裡我們看下來和之前的程式碼是幾乎一樣的,定義一個訊息,然後直接send,但是這個構造訊息的構造方法可能比我們想的要多一個引數,我們原來說的只要把資料轉成二進位制陣列放進去即可,現在看來還要多放一個引數了。

MessageProperties,是的我們需要多放一個MessageProperties物件,從他的名字我們也可以看出它的功能就是附帶一些引數,但是某些引數是少不了的,不帶不行。

比如我的程式碼這裡就是設定了一下訊息的型別,訊息的型別有很多種可以是二進位制型別,文字型別,或者序列化型別,JSON型別,我這裡設定的就是文字型別,指定型別是必須的,也可以為我們拿到訊息之後要將訊息轉換成什麼樣的物件提供一個參考。

convertAndSend方法就要簡單太多,這裡我放了一個User物件拿來測試用,直接指定佇列然後放入這個物件即可。

Tips:User必須實現Serializable介面,不然的話呼叫此方法的時候會丟擲IllegalArgumentException異常。

程式碼完成之後我們就可以呼叫了,這裡我寫一個測試類進行呼叫:

@SpringBootTest 
public class RabbitProduceTest { 
  @Autowired 
  private RabbitProduce rabbitProduce; 
  @Test 
  public void sendSimpleMessage() { 
    rabbitProduce.send(); 
    rabbitProduce.convertAndSend(); 
  } 
} 

效果如下圖~

SpringBoot+RabbitMQ方式收發訊息的實現示例

同時在控制檯使用命令rabbitmqctl.bat list_queues檢視佇列-erduo現在的情況:

如此一來,我們的生產者測試就算完成了,現在訊息佇列裡兩條訊息了,而且訊息型別肯定不一樣,一個是我們設定的文字型別,一個是自動設定的序列化型別。

3. 消費者與RabbitListener

既然佇列裡面已經有訊息了,接下來我們就要看我們該如何通過新的方式拿到訊息並消費與確認了。

消費者這裡我們要用到@RabbitListener來幫我們拿到指定佇列訊息,它的用法很簡單也很複雜,我們可以先來說簡單的方式,直接放到方法上,指定監聽的佇列就行了。

@Slf4j 
@Component("rabbitConsumer") 
public class RabbitConsumer { 
  @RabbitListener(queues = Producer.QUEUE_NAME) 
  public void onMessage(Message message,Channel channel) throws Exception { 
    System.out.println("Message content : " + message); 
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
    System.out.println("訊息已確認"); 
  } 
} 

這段程式碼就代表onMessage方法會處理erduo(Producer.QUEUE_NAME是常量字串"erduo")佇列中的訊息。

我們可以看到這個方法裡面有兩個引數,Message和Channel,如果用不到Channel可以不寫此引數,但是Message訊息一定是要的,它代表了訊息本身。

我們可以想想,我們的程式從RabbitMQ之中拉回一條條訊息之後,要以怎麼樣的方式展示給我們呢?

沒錯,就是封裝為一個個Message物件,這裡面放入了一條訊息的所有資訊,資料結構是什麼樣一會我一run你就能看到了。

同時這裡我們使用Channel做一個訊息確認的操作,這裡的DeliveryTag代表的是這個訊息在佇列中的序號,這個資訊存放在MessageProperties中。

4. SpringBoot 啟動!

編寫完生產者和消費者,同時已經執行過生產者往訊息佇列裡面放了兩條資訊,接下來我們可以直接啟動訊息,檢視消費情況:

SpringBoot+RabbitMQ方式收發訊息的實現示例

在我紅色框線標記的地方可以看到,因為我們有了消費者所以專案啟動後先和RabbitMQ建立了一個連線進行監聽佇列。

隨後就開始消費我們佇列中的兩條訊息:

第一條資訊是contentType=text/plain型別,所以直接就在控制檯上打印出了具體內容。

第二條資訊是contentType=application/x-java-serialized-object,在列印的時候只打印了一個記憶體地址+位元組大小。

不管怎麼說,資料我們是拿到了,也就是代表我們的消費是沒有問題的,同時也都進行了訊息確認操作,從資料上看,整個訊息可以分為兩部分:body和MessageProperties。

我們可以單獨使用一個註解拿到這個body的內容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME) 
public void onMessage(@Payload String body,Channel channel) throws Exception { 
  System.out.println("Message content : " + body); 
 } 

也可以單獨使用一個註解拿到MessageProperties的headers屬性,headers屬性在截圖裡也可以看到,只不過是個空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME) 
public void onMessage(@Payload String body,@Headers Map<String,Object> headers) throws Exception { 
  System.out.println("Message content : " + body); 
  System.out.println("Message headers : " + headers); 
} 

這兩個註解都算是擴充套件知識,我還是更喜歡直接拿到全部,全都要!!!

上面我們已經完成了訊息的傳送與消費,整個過程我們可以再次回想一下,一切都和我畫的這張圖上一樣的軌跡:

SpringBoot+RabbitMQ方式收發訊息的實現示例

只不過我們一直沒有指定Exchage一直使用的預設路由,希望大家好好記住這張圖。

5. @RabbitListener與@RabbitHandler

下面再來補一些知識點,有關@RabbitListener與@RabbitHandler。

@RabbitListener上面我們已經簡單的進行了使用,稍微擴充套件一下它其實是可以監聽多個佇列的,就像這樣:

@RabbitListener(queues = { "queue1","queue2" }) 
public void onMessage(Message message,Channel channel) throws Exception { 
  System.out.println("Message content : " + message); 
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false) 
   System.out.println("訊息已確認"); 
} 

還有一些其他的特性如繫結之類的,這裡不再贅述因為太硬編碼了一般用不上。

下面來說說這節要主要講的一個特性:@RabbitListener和@RabbitHandler的搭配使用。

前面我們沒有提到,@RabbitListener註解其實是可以註解在類上的,這個註解在類上標誌著這個類監聽某個佇列或某些佇列。

這兩個註解的搭配使用就要讓@RabbitListener註解在類上,然後用@RabbitHandler註解在方法上,根據方法引數的不同自動識別並去消費,寫個例子給大家看一看更直觀一些。

@Slf4j 
@Component("rabbitConsumer") 
@RabbitListener(queues = Producer.QUEUE_NAME) 
public class RabbitConsumer { 
  @RabbitHandler 
  public void onMessage(@Payload String message){ 
    System.out.println("Message content : " + message); 
  } 
  @RabbitHandler 
  public void onMessage(@Payload User user) { 
    System.out.println("Message content : " + user); 
  } 
} 

大家可以看看這個例子,我們先用@RabbitListener監聽erduo佇列中的訊息,然後使用@RabbitHandler註解了兩個方法。

第一個方法的body型別是String型別,這就代表著這個方法只能處理文字型別的訊息。 第二個方法的body型別是User型別,這就代表著這個方法只能處理序列化型別且為User型別的訊息。

這兩個方法正好對應著我們第二節中測試類會發送的兩種訊息,所以我們往RabbitMQ中傳送兩條測試訊息,用來測試這段程式碼,看看效果:

SpringBoot+RabbitMQ方式收發訊息的實現示例

都在控制檯上如常列印了,如果@RabbitHandler註解的方法中沒有一個的型別可以和你訊息的型別對的上,比如訊息都是byte陣列型別,這裡沒有對應的方法去接收,系統就會在控制檯不斷的報錯,如果你出現這個情況就證明你型別寫的不正確。

假設你的erduo佇列中會出現三種類型的訊息:byte,文字和序列化,那你就必須要有對應的處理這三種訊息的方法,不然訊息發過來的時候就會因為無法正確轉換而報錯。

而且使用了@RabbitHandler註解之後就不能再和之前一樣使用Message做接收型別。

@RabbitHandler 
public void onMessage(Message message,false); 
  System.out.println("訊息已確認"); 
} 

這樣寫的話會報型別轉換異常的,所以二者選其一。

同時上文我的@RabbitHandler沒有進行訊息確認,大家可以自己試一下進行訊息確認。

6. 訊息的序列化轉換

通過上文我們已經知道,能被自動轉換的物件只有byte[]、String、java序列化物件(實現了Serializable介面的物件),但是並不是所有的Java物件都會去實現Serializable介面,而且序列化的過程中使用的是JDK自帶的序列化方法,效率低下。

所以我們更普遍的做法是:使用Jackson先將資料轉換成JSON格式傳送給RabbitMQ,再接收訊息的時候再用Jackson將資料反序列化出來。

這樣做可以完美解決上面的痛點:訊息物件既不必再去實現Serializable介面,也有比較高的效率(Jackson序列化效率業界應該是最好的了)。

預設的訊息轉換方案是訊息轉換頂層介面-MessageConverter的一個子類:SimpleMessageConverter,我們如果要換到另一個訊息轉換器只需要替換掉這個轉換器就行了。

SpringBoot+RabbitMQ方式收發訊息的實現示例

上圖是MessageConverter結構樹的結構樹,可以看到除了SimpleMessageConverter之外還有一個Jackson2JsonMessageConverter,我們只需要將它定義為Bean,就可以直接使用這個轉換器了。

@Bean 
  public MessageConverter jackson2JsonMessageConverter() { 
    return new Jackson2JsonMessageConverter(jacksonObjectMapper); 
  } 

這樣就可以了,這裡的jacksonObjectMapper可以不傳入,但是預設的ObjectMapper方案對JDK8的時間日期序列化會不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全域性一致性序列化,總的來說就是定義了自己的ObjectMapper。

同時為了接下來測試方便,我又定義了一個專門測試JSON序列化的佇列:

@Bean 
public Queue erduoJson() { 
  // 其三個引數:durable exclusive autoDelete 
  // 一般只設置一下持久化即可 
  return new Queue("erduo_json",true); 
} 

如此之後就可以進行測試了,先是生產者程式碼:

public void sendObject() { 
    Client client = new Client(); 
    System.out.println("Message content : " + client); 
    rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); 
    System.out.println("訊息傳送完畢。"); 
   } 

我又重新定義了一個Client物件,它和之前測試使用的User物件成員變數都是一樣的,不一樣的是它沒有實現Serializable介面。

同時為了保留之前的測試程式碼,我又新建了一個RabbitJsonConsumer,用於測試JSON序列化的相關消費程式碼,裡面定義了一個靜態變數:JSON_QUEUE = "erduo_json";

所以這段程式碼是將Client物件作為訊息傳送到"erduo_json"佇列中去,隨後我們在測試類中run一下進行一次傳送。

緊著是消費者程式碼:

@Slf4j 
@Component("rabbitJsonConsumer") 
@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE) 
public class RabbitJsonConsumer { 
  public static final String JSON_QUEUE = "erduo_json"; 
  @RabbitHandler 
  public void onMessage(Client client,Object> headers,Channel channel) throws Exception { 
    System.out.println("Message content : " + client); 
    System.out.println("Message headers : " + headers); 
    channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); 
    System.out.println("訊息已確認"); 
  } 
} 

有了上文的經驗之後,這段程式碼理解起來也是很簡單了吧,同時給出了上一節沒寫的如何在@RabbitHandler模式下進行訊息簽收。

我們直接來看看效果:

SpringBoot+RabbitMQ方式收發訊息的實現示例

SpringBoot+RabbitMQ方式收發訊息的實現示例

在列印的Headers裡面,往後翻可以看到contentType=application/json,這個contentType是表明了訊息的型別,這裡正是說明我們新的訊息轉換器生效了,將所有訊息都轉換成了JSON型別。

後記

這兩篇講完了RabbitMQ的基本收發訊息,包括手動配置和自動配置的兩種方式,這些大家仔細研讀之後應該會對RabbitMQ收發訊息沒什麼疑問了~

不過我們一直以來發訊息時都是使用預設的交換機,下篇將會講述一下RabbitMQ的幾種交換機型別,以及其使用方式。

到此這篇關於SpringBoot+RabbitMQ方式收發訊息的實現示例的文章就介紹到這了,更多相關SpringBoot RabbitMQ 收發訊息內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!