1. 程式人生 > >spring boot2.1整合RabbitMQ

spring boot2.1整合RabbitMQ

轉載請表明出處 https://blog.csdn.net/Amor_Leo/article/details/85085697 謝謝

spring boot2.1整合RabbitMQ

簡單介紹

RabbitMQ 即訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發,削峰等作用.RabbitMQ使用的是AMQP(高階訊息佇列協議).預設啟動埠 5672,UI介面15672.

Direct是RabbitMQ預設的交換機模式,也是最簡單的模式,直連模式.即建立訊息佇列的時候,指定一個RouteKey.當傳送者傳送訊息的時候,指定對應的Key.當Key和訊息佇列的RouteKey一致的時候,訊息將會被髮送到該訊息佇列中.也可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何繫結(binding)操作 。訊息傳遞時,RouteKey必須完全匹配,才會被佇列接收,否則該訊息會被拋棄。

topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中.其中* 一個字,# 0個或多個字.

Fanout是路由廣播的模式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略.

headers也是根據一個規則進行匹配,在訊息佇列和交換機繫結的時候會指定一組鍵值對規則,而傳送訊息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,訊息會被髮送到匹配的訊息佇列中.

安裝RabbitMQ

Erlang語言寫的,安裝Rabbit,必須Erlang語言的執行環境。

Docker能打包安裝環境,使用docker,很容易解決這個問題。

放行虛擬機器埠

firewall-cmd --zone=public --add-port=埠號/tcp --permanent  
firewall-cmd --reload

獲取映象:

docker pull rabbitmq:management

執行例項:

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management

新增依賴

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
   </dependencies>

簡單配置

簡單yml配置

spring:
  rabbitmq:
    host: 192.168.0.114
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置類

/**
 * @author: LHL
 * @ProjectName: rabbitmq
 * @Package: com.amor.config
 * @ClassName: RabbitmqConf
 * @Date: 2018/11/18 21:06
 * @Description:
 * @Version: 1.0
 */
@Configuration
public class RabbitmqConf {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqConf.class);

    /**
     * 訊息交換機的名字
     * */
    private static final String DIRECT_EXCHANGE = "DirectExchange";

    private static final String TOPIC_EXCHANGE = "TopicExchange";

    private static final String FANOUT_EXCHANGE ="FanoutExchange" ;

    private static final String HEADERS_EXCHANGE ="HeadersExchange" ;

    /**
     * 佇列的名字
     * */
    private static final String DIRECT_QUEUE = "DirectQueue";

    private static final String TOPIC_QUEUE = "TopicQueue";

    private static final String FANOUT_QUEUE = "FanoutQueue";

    private static final String HEADERS_QUEUE = "HeadersQueue";

    /**
     * key
     * */
    private static final String DIRECT_KEY = "DirectKey";

    private static final String TOPIC_KEY = "Topic.#";

    /**
     * 1.佇列名字
     * 2.durable="true" 是否持久化 rabbitmq重啟的時候不需要建立新的佇列
     * 3.auto-delete    表示訊息佇列沒有在使用時將被自動刪除 預設是false
     * 4.exclusive      表示該訊息佇列是否只在當前connection生效,預設是false
     */
    @Bean
    public Queue dirctQueue() {
        return new Queue(DIRECT_QUEUE,true,false,false);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue(TOPIC_QUEUE,true,false,false);
    }


    @Bean
    public Queue fanoutQueue() {
        return new Queue(FANOUT_QUEUE,true,false,false);
    }

    @Bean
    public Queue headersQueue() {
        return new Queue(HEADERS_QUEUE,true,false,false);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE,true,false);
    }

    /**
     * 1.交換機名字
     * 2.durable="true" 是否持久化 rabbitmq重啟的時候不需要建立新的交換機
     * 3.autoDelete    當所有消費客戶端連線斷開後,是否自動刪除佇列
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE,true,false);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE,true,false);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE,true,false);
    }

    /**
     * 將direct佇列和交換機進行繫結
     */
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(dirctQueue()).to(directExchange()).with(DIRECT_KEY);
    }

    @Bean
    public Binding bindingTopic() {
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_KEY);
    }


    @Bean
    public Binding bindingFanout() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding headersBinding(){
        Map<String,Object> map = new HashMap<>();
        map.put("headers1","value1");
        map.put("headers2","value2");
        return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
    }

    /**
     * 定義訊息轉換例項  轉化成 JSON 傳輸  傳輸實體就可以不用實現序列化
     * */
    @Bean
    public MessageConverter integrationEventMessageConverter() {
        return  new Jackson2JsonMessageConverter();
    }
}

測試controller

/**
 * @author: LHL
 * @ProjectName: rabbitmq
 * @Package: com.amor.controller
 * @ClassName: RabbitmqController
 * @Date: 2018/11/18 21:33
 * @Description:
 * @Version: 1.0
 */
@RestController
public class RabbitmqController {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqController.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @Description: 傳送訊息
     * 1.交換機
     * 2.key
     * 3.訊息
     * 4.訊息ID
     * rabbitTemplate.send(message);   發訊息;引數物件為org.springframework.amqp.core.Message
     * rabbitTemplate.convertAndSend(message); 轉換併發送訊息;將引數物件轉換為org.springframework.amqp.core.Message後傳送,消費者不能有返回值
     * rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息.消費者可以有返回值
     * @method: handleMessage
     * @Param: message
     * @return: void
     * @auther: LHL
     * @Date: 2018/11/18 21:40
     */
    @GetMapping("/directSend")
    public void directSend() {
        String message="direct 傳送訊息";
        rabbitTemplate.convertAndSend("DirectExchange","DirectKey",
                message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/topicSend")
    public void topicSend() {
        String message="topic 傳送訊息";
        rabbitTemplate.convertAndSend("TopicExchange","Topic.Key",
                message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/fanoutSend")
    public void fanoutSend() {
        String message="fanout 傳送訊息";
        rabbitTemplate.convertAndSend("FanoutExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/headersSend")
    public void headersSend(){
        String msg="headers 傳送訊息";
        MessageProperties properties = new MessageProperties();
        properties.setHeader("headers1","value1");
        properties.setHeader("headers2","value2");
        Message message = new Message(msg.getBytes(),properties);
        rabbitTemplate.convertAndSend("HeadersExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * @Description: 消費訊息
     * @method: handleMessage
     * @Param: message
     * @return: void
     * @auther: LHL
     * @Date: 2018/11/18 21:41
     */
    @RabbitListener(queues = "DirectQueue")
    @RabbitHandler
    public void directMessage(String message){
        LOGGER.info("DirectConsumer {} directMessage :"+message);
    }

    @RabbitListener(queues = "TopicQueue")
    @RabbitHandler
    public void topicMessage(String message){
        LOGGER.info("TopicConsumer {} topicMessage :"+message);
    }

    @RabbitListener(queues = "FanoutQueue")
    @RabbitHandler
    public void fanoutMessage(String message){
        LOGGER.info("FanoutConsumer {} fanoutMessage :"+message);
    }

    @RabbitListener(queues = "HeadersQueue")
    @RabbitHandler
    public void headersMessage(Message message){ 
    	LOGGER.info("HeadersConsumer {} headersMessage :"+message);
    }
}

開啟訊息傳送交換機確認和佇列與交換器是否繫結確認

yml配置

spring:
  rabbitmq:
    host: 192.168.0.114
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 開啟發送確認
    publisher-confirms: true
    # 開啟發送失敗退回
    publisher-returns: true
    listener:
      direct:
        retry:
          enabled: true #消費者端的重試
      simple:
        retry:
          enabled: true #消費者端的重試
    template:
      reply-timeout: 10000 #超時時間
      retry:
        enabled: true  #設定為true的時候RabbitTemplate(生產端)能夠實現重試

配置類

/**
 * @author: LHL
 * @ProjectName: rabbitmq
 * @Package: com.amor.config
 * @ClassName: RabbitSender
 * @Date: 2018/11/18 23:24
 * @Description:
 * @Version: 1.0
 */
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 實現訊息傳送到RabbitMQ交換器後接收ack回撥,如果訊息傳送確認失敗就進行重試.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("訊息傳送成功,訊息ID:{}", correlationData.getId());
        } else {
            logger.info("訊息傳送失敗,訊息ID:{}", correlationData.getId());
        }
    }

    /**
     * 實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("訊息傳送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},訊息體:{}",replyCode, replyText, exchange, routingKey, new String(message.getBody()));
    }

   /**
     * convertAndSend 非同步,訊息是否傳送成功用ConfirmCallback和ReturnCallback回撥函式類確認。
     * 傳送MQ訊息
     */
    public void sendMessage(String exchangeName, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * sendMessageAndReturn 當傳送訊息過後,該方法會一直阻塞在哪裡等待返回結果,直到請求超時,配置spring.rabbitmq.template.reply-timeout來配置超時時間。
     * 傳送MQ訊息並返回結果
     */
    public Object sendMessageAndReturn(String exchangeName, String routingKey, Object message) {
        return rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
    }
}
/**
 * @author: LHL
 * @ProjectName: rabbitmq
 * @Package: com.amor.controller
 * @ClassName: RabbitmqController
 * @Date: 2018/11/18 21:33
 * @Description:
 * @Version: 1.0
 */
@RestController
public class RabbitmqController {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqController.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitSender rabbitSender;

    /**
     * @Description: 傳送訊息
     * 1.交換機
     * 2.key
     * 3.訊息
     * 4訊息ID
     * rabbitTemplate.send(message);   發訊息;引數物件為org.springframework.amqp.core.Message
     * rabbitTemplate.convertAndSend(message); 轉換併發送訊息;將引數物件轉換為org.springframework.amqp.core.Message後傳送,消費者不能有返回值
     * rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息.消費者可以有返回值
     * @method: handleMessage
     * @Param: message
     * @return: void
     * @auther: LHL
     * @Date: 2018/11/18 21:40
     */
    @GetMapping("/directSend")
    public void directSend() {
        String message = "direct 傳送訊息";
        //rabbitTemplate.convertAndSend("DirectExchange", "DirectKey", message, new CorrelationData(UUID.randomUUID().toString()));
        rabbitSender.sendMessage("DirectExchange", "DirectKey", message);

    }

    @GetMapping("/topicSend")
    public void topicSend() {
        String message = "topic 傳送訊息";
        //rabbitTemplate.convertAndSend("TopicExchange", "Topic.Key", message, new CorrelationData(UUID.randomUUID().toString()));
        rabbitSender.sendMessage("TopicExchange", "Topic.Key", message);
    }

    @GetMapping("/fanoutSend")
    public void fanoutSend() {
        String message = "fanout 傳送訊息";
        //rabbitTemplate.convertAndSend("FanoutExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
        rabbitSender.sendMessage("FanoutExchange", "", message);
    }

    @GetMapping("/headersSend")
    public void headersSend() {
        String msg = "headers 傳送訊息";
        MessageProperties properties = new MessageProperties();
        properties.setHeader("headers1", "value1");
        properties.setHeader("headers2", "value2");
        Message message = new Message(msg.getBytes(), properties);
        //rabbitTemplate.convertAndSend("HeadersExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
        rabbitSender.sendMessage("HeadersExchange", "", message);
    }

    /**
     * @Description: 消費訊息
     * @method: handleMessage
     * @Param: message
     * @return: void
     * @auther: LHL
     * @Date: 2018/11/18 21:41
     */
    @RabbitListener(queues = "DirectQueue")
    @RabbitHandler
    public void directMessage(String sendMessage) {
           LOGGER.info("DirectConsumer {} directMessage :" + message);
    }

    @RabbitListener(queues = "TopicQueue")
    @RabbitHandler
    public void topicMessage(String message) {
        LOGGER.info("TopicConsumer {} topicMessage :" + message);
    }

    @RabbitListener(queues = "FanoutQueue")
    @RabbitHandler
    public void fanoutMessage(String message) {
        LOGGER.info("FanoutConsumer {} fanoutMessage :" + message);
    }

    @RabbitListener(queues = "HeadersQueue")
    @RabbitHandler
    public void headersMessage(Message message) {
        LOGGER.info("HeadersConsumer {} headersMessage :" + message);
    }
}

開啟ACK手動確認模式

手動確認配置

第一種直接yml配置

spring:
  rabbitmq:
    host: 192.168.0.114
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 開啟發送確認
    publisher-confirms: true
    # 開啟發送失敗退回
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual      # 開啟ACK
        retry:
          enabled: true #消費者端的重試
      simple:
        retry:
          enabled: true #消費者端的重試
        acknowledge-mode: manual         # 開啟ACK
        concurrency: 10  #消費者的最小數量
        max-concurrency: 20  #消費者的最大數量
        prefetch: 5  #在單個請求中處理的訊息個數,他應該大於等於事務數量
        auto-startup: true  #啟動時自動啟動容器	true
        default-requeue-rejected: true  #投遞失敗時是否重新排隊
    template:
      reply-timeout
            
           

相關推薦

spring boot2.1整合RabbitMQ

轉載請表明出處 https://blog.csdn.net/Amor_Leo/article/details/85085697 謝謝 spring boot2.1整合RabbitMQ 簡單介紹 安裝RabbitMQ 新增依賴 簡單配置

spring boot2.1.1.RELEASE整合Activiti 6.0.0 實戰程式設計

下載地址:https://download.csdn.net/download/yebichao/10869999內容包括:spring boot2.1.1.RELEASE整合Activiti 6實戰程式設計:包含資料庫、源程式、activiti參考書,Readme.在開發環境中匯入專案程式及資料庫即可執行。

Spring Boot2.X整合訊息中介軟體RabbitMQ原理簡淺探析

目錄 1、簡單概述RabbitMQ重要作用 2、簡單概述RabbitMQ重要概念 3、Spring Boot整合RabbitMQ 前言 RabbitMQ是一個訊息佇列,主要是用來實現應用程式的非

新手入門教程-------Spring Boot中整合RabbitMQ

AMQP:是Advanced Message Queuing Protocol的簡稱,高階訊息佇列協議,是一個面向訊息中介軟體的開放式標準應用層協議。   定義了以下特性: 訊息方向 訊息佇列 訊息路由(包括:點到點和釋出-訂閱模式) 可靠性 安全

spring boot2.1讀取 apollo 配置中心3

上篇記錄了springboot讀取apollo的配置資訊,以及如何獲取服務端的推送更新配置。 接下來記錄一下,如何獲取公共namespace的配置。 上文中使用如下程式碼共聚公共名稱空間的配置: @ApolloConfig("TEST1.MiddleWare") private

Spring Boot】--整合RabbitMQ

  目錄 0、前言 1、訊息流程 2、新增依賴 3、新增配置 4、新建配置類 5、新建生產者介面 6、新建生產者實現類 7、新建生產者控制器類 8、測試 0、前言 需要已經安裝RabbitMQ,並且啟動、配置好使用者。參考《基於Cen

Spring Boot2.x 整合lettuce redis 和 redisson

前言 springboot2之前redis的連線池為jedis,2.0以後redis的連線池改為了lettuce,lettuce能夠支援redis4,需要java8及以上。 lettuce是基於netty實現的與redis進行同步和非同步的通訊。 lettuce和jedi

Spring Boot2.x 整合quartz叢集

springboot2.x支援對quartz的自動配置,引入jar <!-- spring boot2.x + quartz --> <dependency> &

Spring Boot2 + Mybatis 整合(Mybatis自動生成外掛、分頁外掛)

內容: Spring Boot2 + Mybatis 整合 Mybatis Generator自動生成程式碼 Mybatis PageHelper分頁外掛

Spring Cloud Bus整合RabbitMQ ( 17 )

轉自 https://blog.csdn.net/u012702547/article/details/77823434 這個系列我感覺真的太好了,可以一步一步的瞭解spring cloud 的搭建以及更深層次的東西,對想學這門技術的朋友真的入門特別的快,感謝這位大哥的分享,我也會持續

Spring Boot2.0整合Mybatis(自動生成註解方式,多環境配置)

本介紹Spring Boot2.0整合Mybatis,通過MyBatis Generator外掛自動生成mapper的sql註解及Provider類的過程,支援多環境的yml配置檔案首先用IDE開發工具(IDEA,STS,Eclipse)建立一個Spring Boot工程sp

手把手教你Spring Boot2.x整合kafka

#首先得自己搭建一個kafka,搭建教程請自行百度,本人是使用docker搭建了一個單機版的zookeeper+kafka作為演示,文末會有完整程式碼包提供給大家下載參考 ![](https://img2020.cnblogs.com/blog/1543487/202103/1543487-202103021

手把手教你Spring Boot2.x整合Elasticsearch(ES)

#文末會附上完整的程式碼包供大家下載參考,碼字不易,如果對你有幫助請給個點贊和關注,謝謝! #如果只是想看java對於Elasticsearch的操作可以直接看第四大點 ##一、docker部署Elasticsearch(下面簡稱es)單機版教程 ###1、部署es * 拉取es映象(這裡我使用的版本

spring boot 1.5.4 整合rabbitMQ(十七)

rabbitmq springboot springboot1.5.4 springboot整合jsp springboot整合rabbitmq 上一篇:spring boot 1.5.4 整合redis、攔截器、過濾器、監聽器、靜態資源配置(十六) 關於rabbitMQ原理,請參閱博客:

Spring Boot2整合Shiro(1):身份認證

Spring Boot2整合Shiro(1):身份認證   前言 本文主要介紹了在Spring Boot2專案中整合Shiro實現登入認證。本文假設讀者已經對Shiro和基於RBAC的許可權控制系統有了基本的認識。  本專案沒有資料庫,也就沒有dao層,所有的使用者和

Spring Boot2.0.5整合ElasticSearch6.4.1 叢集

轉載請標明出處 https://blog.csdn.net/Amor_Leo/article/details/83012038 謝謝 ES叢集搭建 https://blog.csdn.net/Amor_Leo/article/details/83011372 兩個節點 ES叢集搭建

RabbitMQ (九) Spring整合RabbitMQ(1)

前面幾篇講解了如何使用rabbitMq,這一篇主要講解spring整合rabbitmq。    首先引入配置檔案org.springframework.amqp,如下 <dependency> <groupId>com.rabbitmq&

spring mvc +Mybatis3.1 整合的時候異常

factory lec sta error .get for 1.0 character mod 在使用Mybatis3.10+spring3.10+mybatis-spring-1.0.0集成,使用spring 時發生例如以下錯誤: 嚴重: Servlet

spring boot實戰(第十二篇)整合RabbitMQ

this direct 還需要 添加屬性 創建 還需 topic start routing 前言 本篇主要講述Spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar [html] view plain cop

spring boot2 整合(一)Mybatis (特別完整!)

one OS solver mapper ant gin enabled selectall pom 大概介紹下流程: 借助idea實現mybatis逆向工程 用xml配置實現整合 用cmd命令行實現mybatis逆向工程 用mapping.xml配置實現數據交互