1. 程式人生 > 程式設計 >Springboot 整合RabbitMq(用心看完這一篇就夠了)

Springboot 整合RabbitMq(用心看完這一篇就夠了)

該篇文章內容較多,包括有rabbitMq相關的一些簡單理論介紹,provider訊息推送例項,consumer訊息消費例項,Direct、Topic、Fanout的使用,訊息回撥、手動確認等。 (但是關於rabbitMq的安裝,就不介紹了)

在安裝完rabbitMq後,輸入http://ip:15672/ ,是可以看到一個簡單後臺管理介面的。

Springboot 整合RabbitMq(用心看完這一篇就夠了)

在這個介面裡面我們可以做些什麼?

可以手動建立虛擬host,建立使用者,分配許可權,建立交換機,建立佇列等等,還有檢視佇列訊息,消費效率,推送效率等等。

以上這些管理介面的操作在這篇暫時不做擴充套件描述,我想著重介紹後面例項裡會使用到的。

首先先介紹一個簡單的一個訊息推送到接收的流程,提供一個簡單的圖:

JCccc-RabbitMq

RabbitMq -JCccc

黃色的圈圈就是我們的訊息推送服務,將訊息推送到 中間方框裡面也就是 rabbitMq的伺服器,然後經過伺服器裡面的交換機、佇列等各種關係(後面會詳細講)將資料處理入列後,最終右邊的藍色圈圈消費者獲取對應監聽的訊息。

常用的交換機有以下三種,因為消費者是從佇列獲取資訊的,佇列是繫結交換機的(一般),所以對應的訊息推送/接收模式也會有以下幾種:

Direct Exchange

直連型交換機,根據訊息攜帶的路由鍵將訊息投遞給對應佇列。

大致流程,有一個佇列繫結到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然後當一個訊息攜帶著路由值為X,這個訊息通過生產者傳送給交換機時,交換機就會根據這個路由值X去尋找繫結值也是X的佇列。

Fanout Exchange

扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到訊息後,會直接轉發到繫結到它上面的所有佇列。

Topic Exchange

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和繫結鍵之間是有規則的。
簡單地介紹下規則:

* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的繫結鍵是跟佇列進行繫結的,舉個小例子
佇列Q1 繫結鍵為 *.TT.* 佇列Q2繫結鍵為 TT.#
如果一條訊息攜帶的路由鍵為 A.TT.B,那麼佇列Q1將會收到;
如果一條訊息攜帶的路由鍵為TT.AA.BB,那麼佇列Q2將會收到;

主題交換機是非常強大的,為啥這麼膨脹?
當一個佇列的繫結鍵為 "#"(井號) 的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息。
當 * (星號) 和 # (井號) 這兩個特殊字元都未在繫結鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。

另外還有 Header Exchange 頭交換機 ,Default Exchange 預設交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。

好了,一些簡單的介紹到這裡為止, 接下來我們來一起編碼。

本次例項教程需要建立2個springboot專案,一個 rabbitmq-provider (生產者),一個rabbitmq-consumer(消費者)。

首先建立 rabbitmq-provider,

pom.xml裡用到的jar依賴:

    <!--rabbitmq-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

然後application.yml:

ps:裡面的虛擬host配置項不是必須的,我自己在rabbitmq服務上建立了自己的虛擬host,所以我配置了;你們不建立,就不用加這個配置項。

server:
 port: 8021
spring:
 #給專案來個名字
 application:
  name: rabbitmq-provider
 #配置rabbitMq 伺服器
 rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: root
  password: root
  #虛擬host 可以不設定,使用server預設host
  virtual-host: JCcccHost

接著我們先使用下direct exchange(直連型交換機),建立DirectRabbitConfig.java(對於佇列和交換機持久化以及連線使用設定,在註釋裡有說明,後面的不同交換機的配置就不做同樣說明了):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {
 
  //佇列 起名:TestDirectQueue
  @Bean
  public Queue TestDirectQueue() {
    // durable:是否持久化,預設是false,持久化佇列:會被儲存在磁碟上,當訊息代理重啟時仍然存在,暫存佇列:當前連線有效
    // exclusive:預設也是false,只能被當前建立的連線使用,而且當連線關閉後佇列即被刪除。此參考優先順序高於durable
    // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此佇列,該佇列會自動刪除。
    //  return new Queue("TestDirectQueue",true,false);
 
    //一般設定一下佇列的持久化就好,其餘兩個就是預設false
    return new Queue("TestDirectQueue",true);
  }
 
  //Direct交換機 起名:TestDirectExchange
  @Bean
  DirectExchange TestDirectExchange() {
   // return new DirectExchange("TestDirectExchange",true);
    return new DirectExchange("TestDirectExchange",false);
  }
 
  //繫結 將佇列和交換機繫結,並設定用於匹配鍵:TestDirectRouting
  @Bean
  Binding bindingDirect() {
    return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  }
 
  @Bean
  DirectExchange lonelyDirectExchange() {
    return new DirectExchange("lonelyDirectExchange");
  }
}

然後寫個簡單的介面進行訊息推送(根據需求也可以改為定時任務等等,具體看需求),SendMessageController.java:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@RestController
public class SendMessageController {
 
  @Autowired
  RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/傳送等等方法
 
  @GetMapping("/sendDirectMessage")
  public String sendDirectMessage() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "test message,hello!";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,Object> map=new HashMap<>();
    map.put("messageId",messageId);
    map.put("messageData",messageData);
    map.put("createTime",createTime);
    //將訊息攜帶繫結鍵值:TestDirectRouting 傳送到交換機TestDirectExchange
    rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting",map);
    return "ok";
  } 
}

把rabbitmq-provider專案執行,呼叫下介面:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

因為我們目前還沒弄消費者 rabbitmq-consumer,訊息沒有被消費的,我們去rabbitMq管理頁面看看,是否推送成功:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

再看看佇列(介面上的各個英文項代表什麼意思,可以自己查查哈,對理解還是有幫助的):

Springboot 整合RabbitMq(用心看完這一篇就夠了)

很好,訊息已經推送到rabbitMq伺服器上面了。

接下來,建立rabbitmq-consumer專案:

pom.xml裡的jar依賴:

    <!--rabbitmq-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

然後是 application.yml:

server:
 port: 8022
spring:
 #給專案來個名字
 application:
  name: rabbitmq-consumer
 #配置rabbitMq 伺服器
 rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: root
  password: root
  #虛擬host 可以不設定,使用server預設host
  virtual-host: JCcccHost

然後一樣,建立DirectRabbitConfig.java(消費者單純的使用,其實可以不用新增這個配置,直接建後面的監聽就好,使用註解來讓監聽器監聽對應的佇列即可。配置上了的話,其實消費者也是生成者的身份,也能推送該訊息。):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {
 
  //佇列 起名:TestDirectQueue
  @Bean
  public Queue TestDirectQueue() {
    return new Queue("TestDirectQueue",true);
  }
 
  //Direct交換機 起名:TestDirectExchange
  @Bean
  DirectExchange TestDirectExchange() {
    return new DirectExchange("TestDirectExchange");
  }
 
  //繫結 將佇列和交換機繫結,並設定用於匹配鍵:TestDirectRouting
  @Bean
  Binding bindingDirect() {
    return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  }
}

然後是建立訊息接收監聽類,DirectReceiver.java:

@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的佇列名稱 TestDirectQueue
public class DirectReceiver {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("DirectReceiver消費者收到訊息 : " + testMessage.toString());
  }
 
}

然後將rabbitmq-consumer專案執行起來,可以看到把之前推送的那條訊息消費下來了:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

然後可以再繼續呼叫rabbitmq-provider專案的推送訊息介面,可以看到消費者即時消費訊息:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

那麼直連交換機既然是一對一,那如果咱們配置多臺監聽繫結到同一個直連互動的同一個佇列,會怎麼樣?

Springboot 整合RabbitMq(用心看完這一篇就夠了)

可以看到是實現了輪詢的方式對訊息進行消費,而且不存在重複消費。

接著,我們使用Topic Exchange 主題交換機。

在rabbitmq-provider專案裡面建立TopicRabbitConfig.java:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Configuration
public class TopicRabbitConfig {
  //繫結鍵
  public final static String man = "topic.man";
  public final static String woman = "topic.woman";
 
  @Bean
  public Queue firstQueue() {
    return new Queue(TopicRabbitConfig.man);
  }
 
  @Bean
  public Queue secondQueue() {
    return new Queue(TopicRabbitConfig.woman);
  }
 
  @Bean
  TopicExchange exchange() {
    return new TopicExchange("topicExchange");
  }
 
  //將firstQueue和topicExchange繫結,而且繫結的鍵值為topic.man
  //這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列
  @Bean
  Binding bindingExchangeMessage() {
    return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
  }
 
  //將secondQueue和topicExchange繫結,而且繫結的鍵值為用上通配路由鍵規則topic.#
  // 這樣只要是訊息攜帶的路由鍵是以topic.開頭,都會分發到該佇列
  @Bean
  Binding bindingExchangeMessage2() {
    return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  }
}

然後新增多2個介面,用於推送訊息到主題交換機:

 @GetMapping("/sendTopicMessage1")
  public String sendTopicMessage1() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "message: M A N ";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,Object> manMap = new HashMap<>();
    manMap.put("messageId",messageId);
    manMap.put("messageData",messageData);
    manMap.put("createTime",createTime);
    rabbitTemplate.convertAndSend("topicExchange","topic.man",manMap);
    return "ok";
  }
 
  @GetMapping("/sendTopicMessage2")
  public String sendTopicMessage2() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "message: woman is all ";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,Object> womanMap = new HashMap<>();
    womanMap.put("messageId",messageId);
    womanMap.put("messageData",messageData);
    womanMap.put("createTime","topic.woman",womanMap);
    return "ok";
  }
}

生產者這邊已經完事,先不急著執行,在rabbitmq-consumer專案上,建立TopicManReceiver.java:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("TopicManReceiver消費者收到訊息 : " + testMessage.toString());
  }
}

再建立一個TopicTotalReceiver.java:

package com.elegant.rabbitmqconsumer.receiver;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("TopicTotalReceiver消費者收到訊息 : " + testMessage.toString());
  }
}

同樣,加主題交換機的相關配置,TopicRabbitConfig.java(消費者一定要加這個配置嗎? 不需要的其實,理由在前面已經說過了。):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Configuration
public class TopicRabbitConfig {
  //繫結鍵
  public final static String man = "topic.man";
  public final static String woman = "topic.woman";
 
  @Bean
  public Queue firstQueue() {
    return new Queue(TopicRabbitConfig.man);
  }
 
  @Bean
  public Queue secondQueue() {
    return new Queue(TopicRabbitConfig.woman);
  }
 
  @Bean
  TopicExchange exchange() {
    return new TopicExchange("topicExchange");
  }
 
 
  //將firstQueue和topicExchange繫結,都會分發到該佇列
  @Bean
  Binding bindingExchangeMessage2() {
    return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  }
 
}

然後把rabbitmq-provider,rabbitmq-consumer兩個專案都跑起來,先呼叫/sendTopicMessage1 介面:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

然後看消費者rabbitmq-consumer的控制檯輸出情況:
TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#
而當前推送的訊息,攜帶的路由鍵為:topic.man

所以可以看到兩個監聽消費者receiver都成功消費到了訊息,因為這兩個recevier監聽的佇列的繫結鍵都能與這條訊息攜帶的路由鍵匹配上。

Springboot 整合RabbitMq(用心看完這一篇就夠了)

接下來呼叫介面/sendTopicMessage2:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

然後看消費者rabbitmq-consumer的控制檯輸出情況:
TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#
而當前推送的訊息,攜帶的路由鍵為:topic.woman

所以可以看到兩個監聽消費者只有TopicTotalReceiver成功消費到了訊息。

Springboot 整合RabbitMq(用心看完這一篇就夠了)

接下來是使用Fanout Exchang 扇型交換機。

同樣地,先在rabbitmq-provider專案上建立FanoutRabbitConfig.java:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
 
@Configuration
public class FanoutRabbitConfig {
 
  /**
   * 建立三個佇列 :fanout.A  fanout.B fanout.C
   * 將三個佇列都繫結在交換機 fanoutExchange 上
   * 因為是扇型交換機,路由鍵無需配置,配置也不起作用
   */
 
 
  @Bean
  public Queue queueA() {
    return new Queue("fanout.A");
  }
 
  @Bean
  public Queue queueB() {
    return new Queue("fanout.B");
  }
 
  @Bean
  public Queue queueC() {
    return new Queue("fanout.C");
  }
 
  @Bean
  FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
  }
 
  @Bean
  Binding bindingExchangeA() {
    return BindingBuilder.bind(queueA()).to(fanoutExchange());
  }
 
  @Bean
  Binding bindingExchangeB() {
    return BindingBuilder.bind(queueB()).to(fanoutExchange());
  }
 
  @Bean
  Binding bindingExchangeC() {
    return BindingBuilder.bind(queueC()).to(fanoutExchange());
  }
}

然後是寫一個介面用於推送訊息,

 @GetMapping("/sendFanoutMessage")
  public String sendFanoutMessage() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "message: testFanoutMessage ";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,Object> map = new HashMap<>();
    map.put("messageId",createTime);
    rabbitTemplate.convertAndSend("fanoutExchange",null,map);
    return "ok";
  }

接著在rabbitmq-consumer專案里加上訊息消費類,

FanoutReceiverA.java:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("FanoutReceiverA消費者收到訊息 : " +testMessage.toString());
  }
}

FanoutReceiverB.java:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("FanoutReceiverB消費者收到訊息 : " +testMessage.toString());
  }
}

FanoutReceiverC.java:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
 
  @RabbitHandler
  public void process(Map testMessage) {
    System.out.println("FanoutReceiverC消費者收到訊息 : " +testMessage.toString());
  } 
}

然後加上扇型交換機的配置類,FanoutRabbitConfig.java(消費者真的要加這個配置嗎? 不需要的其實,理由在前面已經說過了):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class FanoutRabbitConfig {
 
  /**
   * 建立三個佇列 :fanout.A  fanout.B fanout.C
   * 將三個佇列都繫結在交換機 fanoutExchange 上
   * 因為是扇型交換機,配置也不起作用
   */
 
 
  @Bean
  public Queue queueA() {
    return new Queue("fanout.A");
  }
 
  @Bean
  public Queue queueB() {
    return new Queue("fanout.B");
  }
 
  @Bean
  public Queue queueC() {
    return new Queue("fanout.C");
  }
 
  @Bean
  FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
  }
 
  @Bean
  Binding bindingExchangeA() {
    return BindingBuilder.bind(queueA()).to(fanoutExchange());
  }
 
  @Bean
  Binding bindingExchangeB() {
    return BindingBuilder.bind(queueB()).to(fanoutExchange());
  }
 
  @Bean
  Binding bindingExchangeC() {
    return BindingBuilder.bind(queueC()).to(fanoutExchange());
  }
}

最後將rabbitmq-provider和rabbitmq-consumer專案都跑起來,呼叫下介面/sendFanoutMessage :

Springboot 整合RabbitMq(用心看完這一篇就夠了)

然後看看rabbitmq-consumer專案的控制檯情況:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

可以看到只要傳送到 fanoutExchange 這個扇型交換機的訊息, 三個佇列都繫結這個交換機,所以三個訊息接收類都監聽到了這條訊息。

到了這裡其實三個常用的交換機的使用我們已經完畢了,那麼接下來我們繼續講講訊息的回撥,其實就是訊息確認(生產者推送訊息成功,消費者接收訊息成功)。

在rabbitmq-provider專案的application.yml檔案上,加上訊息確認的配置項後:

ps: 本篇文章使用springboot版本為 2.1.7.RELEASE ;
如果你們在配置確認回撥,測試發現無法觸發回撥函式,那麼存在原因也許是因為版本導致的配置項不起效,
可以把publisher-confirms: true 替換為 publisher-confirm-type: correlated

server:
 port: 8021
spring:
 #給專案來個名字
 application:
  name: rabbitmq-provider
 #配置rabbitMq 伺服器
 rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: root
  password: root
  #虛擬host 可以不設定,使用server預設host
  virtual-host: JCcccHost
  #訊息確認配置項
 
  #確認訊息已傳送到交換機(Exchange)
  publisher-confirms: true
  #確認訊息已傳送到佇列(Queue)
  publisher-returns: true

然後是配置相關的訊息確認回撥函式,RabbitConfig.java:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class RabbitConfig {
 
  @Bean
  public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //設定開啟Mandatory,才能觸發回撥函式,無論訊息推送結果怎麼樣都強制呼叫回撥函式
    rabbitTemplate.setMandatory(true);
 
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      @Override
      public void confirm(CorrelationData correlationData,boolean ack,String cause) {
        System.out.println("ConfirmCallback:   "+"相關資料:"+correlationData);
        System.out.println("ConfirmCallback:   "+"確認情況:"+ack);
        System.out.println("ConfirmCallback:   "+"原因:"+cause);
      }
    });
 
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
      @Override
      public void returnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey) {
        System.out.println("ReturnCallback:   "+"訊息:"+message);
        System.out.println("ReturnCallback:   "+"迴應碼:"+replyCode);
        System.out.println("ReturnCallback:   "+"迴應資訊:"+replyText);
        System.out.println("ReturnCallback:   "+"交換機:"+exchange);
        System.out.println("ReturnCallback:   "+"路由鍵:"+routingKey);
      }
    });
 
    return rabbitTemplate;
  }
 
}

到這裡,生產者推送訊息的訊息確認呼叫回撥函式已經完畢。
可以看到上面寫了兩個回撥函式,一個叫 ConfirmCallback ,一個叫 RetrunCallback;
那麼以上這兩種回撥函式都是在什麼情況會觸發呢?

先從總體的情況分析,推送訊息存在四種情況:

①訊息推送到server,但是在server裡找不到交換機
②訊息推送到server,找到交換機了,但是沒找到佇列
③訊息推送到sever,交換機和佇列啥都沒找到
④訊息推送成功

那麼我先寫幾個介面來分別測試和認證下以上4種情況,訊息確認觸發回撥函式的情況:

①訊息推送到server,但是在server裡找不到交換機
寫個測試介面,把訊息推送到名為‘non-existent-exchange'的交換機上(這個交換機是沒有建立沒有配置的):

 @GetMapping("/TestMessageAck")
  public String TestMessageAck() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "message: non-existent-exchange test message ";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,createTime);
    rabbitTemplate.convertAndSend("non-existent-exchange",map);
    return "ok";
  }

呼叫介面,檢視rabbitmq-provuder專案的控制檯輸出情況(原因裡面有說,沒有找到交換機'non-existent-exchange'):

2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost',class-id=60,method-id=40)
ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404,method-id=40)

結論: ①這種情況觸發的是 ConfirmCallback 回撥函式。

②訊息推送到server,找到交換機了,但是沒找到佇列
這種情況就是需要新增一個交換機,但是不給這個交換機繫結佇列,我來簡單地在DirectRabitConfig裡面新增一個直連交換機,名叫‘lonelyDirectExchange',但沒給它做任何繫結配置操作:

@Bean
  DirectExchange lonelyDirectExchange() {
    return new DirectExchange("lonelyDirectExchange");
  }

然後寫個測試介面,把訊息推送到名為‘lonelyDirectExchange'的交換機上(這個交換機是沒有任何佇列配置的):

 @GetMapping("/TestMessageAck2")
  public String TestMessageAck2() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "message: lonelyDirectExchange test message ";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String,createTime);
    rabbitTemplate.convertAndSend("lonelyDirectExchange",map);
    return "ok";
  }

呼叫介面,檢視rabbitmq-provuder專案的控制檯輸出情況:

ReturnCallback: 訊息:(Body:'{createTime=2019-09-04 09:48:01,messageId=563077d9-0a77-4c27-8794-ecfb183eac80,messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={},contentType=application/x-java-serialized-object,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,deliveryTag=0])
ReturnCallback: 迴應碼:312
ReturnCallback: 迴應資訊:NO_ROUTE
ReturnCallback: 交換機:lonelyDirectExchange
ReturnCallback: 路由鍵:TestDirectRouting

ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null

可以看到這種情況,兩個函式都被呼叫了;
這種情況下,訊息是推送成功到伺服器了的,所以ConfirmCallback對訊息確認情況是true;
而在RetrunCallback回撥函式的列印引數裡面可以看到,訊息是推送到了交換機成功了,但是在路由分發給佇列的時候,找不到佇列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回撥函式。

③訊息推送到sever,交換機和佇列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回撥是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是 ConfirmCallback 回撥函式。

④訊息推送成功
那麼測試下,按照正常呼叫之前訊息推送的介面就行,就呼叫下 /sendFanoutMessage介面,可以看到控制檯輸出:

ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null

結論: ④這種情況觸發的是 ConfirmCallback 回撥函式。

以上是生產者推送訊息的訊息確認 回撥函式的使用介紹(可以在回撥函式根據需求做對應的擴充套件或者業務資料處理)。

接下來我們繼續, 消費者接收到訊息的訊息確認機制。

和生產者的訊息確認機制不同,因為訊息接收本來就是在監聽訊息,符合條件的訊息就會消費下來。
所以,訊息接收的確認機制主要存在三種模式:

①自動確認, 這也是預設的訊息確認情況。 AcknowledgeMode.NONE
RabbitMQ成功將訊息發出(即將訊息成功寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞。
所以這種情況如果消費端消費邏輯丟擲異常,也就是消費端沒有處理成功這條訊息,那麼就相當於丟失了訊息。
一般這種情況我們都是使用try catch捕捉異常後,列印日誌用於追蹤資料,這樣找出對應資料再做後續處理。

② 根據情況確認, 這個不做介紹
③ 手動確認 , 這個比較關鍵,也是我們配置接收訊息確認機制時,多數選擇的模式。
消費者收到訊息後,手動呼叫basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些訊息後,才認為本次投遞成功。
basic.ack用於肯定確認
basic.nack用於否定確認(注意:這是AMQP 0-9-1的RabbitMQ擴充套件)
basic.reject用於否定確認,但與basic.nack相比有一個限制:一次只能拒絕單條訊息

消費者端以上的3個方法都表示訊息已經被正確投遞,但是basic.ack表示訊息已經被正確處理。
而basic.nack,basic.reject表示沒有被正確處理:

著重講下reject,因為有時候一些場景是需要重新入列的。

channel.basicReject(deliveryTag,true); 拒絕消費當前訊息,如果第二引數傳入true,就是將資料重新丟回佇列裡,那麼下次還會消費這訊息。設定false,就是告訴伺服器,我已經知道這條訊息資料了,因為一些原因拒絕它,而且伺服器也把這個訊息丟掉就行。 下次不想再消費這條訊息了。

使用拒絕後重新入列這個確認模式要謹慎,因為一般都是出現異常的時候,catch異常再拒絕入列,選擇是否重入列。

但是如果使用不當會導致一些每次都被你重入列的訊息一直消費-入列-消費-入列這樣迴圈,會導致訊息積壓。

順便也簡單講講 nack,這個也是相當於設定不消費某條訊息。

channel.basicNack(deliveryTag,false,true);
第一個引數依然是當前訊息到的資料的唯一id;
第二個引數是指是否針對多條訊息;如果是true,也就是說一次性針對當前通道的訊息的tagID小於當前這條訊息的,都拒絕確認。
第三個引數是指是否重新入列,也就是指不確認的訊息是否重新丟回到佇列裡面去。

同樣使用不確認後重新入列這個確認模式要謹慎,因為這裡也可能因為考慮不周出現訊息一直被重新丟回去的情況,導致積壓。

看了上面這麼多介紹,接下來我們一起配置下,看看一般的訊息接收 手動確認是怎麼樣的。​​​​​​

在消費者專案裡,
新建MessageListenerConfig.java上新增程式碼相關的配置程式碼:

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/4
 * @Description :
 **/
@Configuration
public class MessageListenerConfig {
 
  @Autowired
  private CachingConnectionFactory connectionFactory;
  @Autowired
  private MyAckReceiver myAckReceiver;//訊息接收處理類
 
  @Bean
  public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ預設是自動確認,這裡改為手動確認訊息
    //設定一個佇列
    container.setQueueNames("TestDirectQueue");
    //如果同時設定多個如下: 前提是佇列都是必須已經建立存在的
    // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
 
    //另一種設定佇列的方法,如果使用這種情況,那麼要設定多個,就使用addQueues
    //container.setQueues(new Queue("TestDirectQueue",true));
    //container.addQueues(new Queue("TestDirectQueue2",true));
    //container.addQueues(new Queue("TestDirectQueue3",true));
    container.setMessageListener(myAckReceiver);
 
    return container;  }
 
}

對應的手動確認訊息監聽類,MyAckReceiver.java(手動確認模式需要實現 ChannelAwareMessageListener):
//之前的相關監聽器可以先註釋掉,以免造成多個同類型監聽器都監聽同一個佇列。
//這裡的獲取訊息轉換,只作參考,如果報陣列越界可以自己根據格式去調整。

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
 
@Component
 
public class MyAckReceiver implements ChannelAwareMessageListener {
 
  @Override
  public void onMessage(Message message,Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
      //因為傳遞訊息的時候用的map傳遞,所以將Map從Message內取出需要做些處理
      String msg = message.toString();
      String[] msgArray = msg.split("'");//可以點進Message裡面看原始碼,單引號直接的資料就是我們的map訊息資料
      Map<String,String> msgMap = mapStringToMap(msgArray[1].trim(),3);
      String messageId=msgMap.get("messageId");
      String messageData=msgMap.get("messageData");
      String createTime=msgMap.get("createTime");
      System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
      System.out.println("消費的主題訊息來自:"+message.getMessageProperties().getConsumerQueue());
      channel.basicAck(deliveryTag,true); //第二個引數,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有訊息
//			channel.basicReject(deliveryTag,true);//第二個引數,true會重新放回佇列,所以需要自己根據業務邏輯判斷什麼時候使用拒絕
    } catch (Exception e) {
      channel.basicReject(deliveryTag,false);
      e.printStackTrace();
    }
  }
 
   //{key=value,key=value,key=value} 格式轉換成map
  private Map<String,String> mapStringToMap(String str,int entryNum ) {
    str = str.substring(1,str.length() - 1);
    String[] strs = str.split(",",entryNum);
    Map<String,String> map = new HashMap<String,String>();
    for (String string : strs) {
      String key = string.split("=")[0].trim();
      String value = string.split("=")[1];
      map.put(key,value);
    }
    return map;
  }
}

這時,先呼叫介面/sendDirectMessage, 給直連交換機TestDirectExchange 的佇列TestDirectQueue 推送一條訊息,可以看到監聽器正常消費了下來:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

到這裡,我們其實已經掌握了怎麼去使用訊息消費的手動確認了。

但是這個場景往往不夠! 因為很多夥伴之前給我評論反應,他們需要這個消費者專案裡面,監聽的好幾個佇列都想變成手動確認模式,而且處理的訊息業務邏輯不一樣。

沒有問題,接下來看程式碼

場景: 除了直連交換機的佇列TestDirectQueue需要變成手動確認以外,我們還需要將一個其他的佇列

或者多個佇列也變成手動確認,而且不同佇列實現不同的業務處理。

那麼我們需要做的第一步,往SimpleMessageListenerContainer裡新增多個佇列:

Springboot 整合RabbitMq(用心看完這一篇就夠了)

然後我們的手動確認訊息監聽類,MyAckReceiver.java 就可以同時將上面設定到的佇列的訊息都消費下來。

但是我們需要做不用的業務邏輯處理,那麼只需要 根據訊息來自的佇列名進行區分處理即可,如:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
 
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
 
  @Override
  public void onMessage(Message message,3);
      String messageId=msgMap.get("messageId");
      String messageData=msgMap.get("messageData");
      String createTime=msgMap.get("createTime");
      
      if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
        System.out.println("消費的訊息來自的佇列名為:"+message.getMessageProperties().getConsumerQueue());
        System.out.println("訊息成功消費到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
        System.out.println("執行TestDirectQueue中的訊息的業務處理流程......");
        
      }
 
      if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
        System.out.println("消費的訊息來自的佇列名為:"+message.getMessageProperties().getConsumerQueue());
        System.out.println("訊息成功消費到 messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);
        System.out.println("執行fanout.A中的訊息的業務處理流程......");
 
      }
      
      channel.basicAck(deliveryTag,true);
//			channel.basicReject(deliveryTag,true);//為true會重新放回佇列
    } catch (Exception e) {
      channel.basicReject(deliveryTag,false);
      e.printStackTrace();
    }
  }
 
  //{key=value,int enNum) {
    str = str.substring(1,enNum);
    Map<String,value);
    }
    return map;
  }
}

ok,這時候我們來分別往不同佇列推送訊息,看看效果:

呼叫介面/sendDirectMessage 和 /sendFanoutMessage ,

Springboot 整合RabbitMq(用心看完這一篇就夠了)

如果你還想新增其他的監聽佇列,也就是按照這種方式新增配置即可(或者完全可以分開多個消費者專案去監聽處理)。

好,這篇Springboot整合rabbitMq教程就暫且到此。

到此這篇關於Springboot 整合RabbitMq(用心看完這一篇就夠了)的文章就介紹到這了,更多相關Springboot 整合RabbitMq內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!