1. 程式人生 > 實用技巧 >03訊息佇列系列-RabbitMQ在Spring boot中的使用

03訊息佇列系列-RabbitMQ在Spring boot中的使用

03訊息佇列系列-RabbitMQ在Spring boot中的使用

參考連結:
Springboot 整合RabbitMq

一、專案簡介

建立兩個專案,一個是rabbitmq-consumer 訊息消費者,另一個是rabbitmq-provider訊息傳送者。基於訊息的傳送接收去了解直連型交換機、主題交換機、扇形交換機等交換機的使用和訊息確認的使用。

二、專案實戰

這裡使用gradle編譯專案

2.1 用到的依賴

implementation 'org.springframework.boot:spring-boot-starter-amqp'

2.2 RabbitMQ在application.properties裡的配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2.3 Direct Exchange直連型交換機的使用

2.3.1 建立rabbitmq-provider

2.3.1.1 佇列、交換機、繫結資訊的配置

建立DirectRabbitConfig類檔案,來配置佇列、交換機、以及繫結資訊

package com.lucky.spring.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/16
 */
@Configuration
public class DirectRabbitConfig {

    /**
     * 佇列配置
     * @return
     */
    @Bean
    public Queue directQueue() {
        // durable:是否持久化,預設是false,持久化佇列:會被儲存在磁碟上,當訊息代理重啟時仍然存在,暫存佇列:當前連線有效
        // exclusive:預設也是false,只能被當前建立的連線使用,而且當連線關閉後佇列即被刪除。此參考優先順序高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此佇列,該佇列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設定一下佇列的持久化就好,其餘兩個就是預設false
        return new Queue("directQueue", true);
    }

    @Bean
    public DirectExchange directExchange() {
        //durable:是否持久化
        //autoDelete:是否自動刪除
        return new DirectExchange("directExchange", true, false);
    }

    @Bean
    public Binding directBinding(){
        return BindingBuilder.bind(directQueue()).to(directExchange())
                .with("directRouting");
    }

}

2.3.1.2 暴露傳送訊息介面

package com.lucky.spring.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Created by zhangdd on 2020/9/16
 */
@RestController
public class SendDirectMsgController {

    @Autowired
    RabbitTemplate rabbitTemplate;


    @GetMapping("/sendDirectMsg")
    public void sendDirectMsg() {
        String uuid = String.valueOf(UUID.randomUUID());
        String msg = "test message, hello!";
        String createTime = new Date().toString();
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",uuid);
        map.put("messageData",msg);
        map.put("createTime",createTime);

        //將訊息攜帶繫結鍵值:directRouting 傳送到交換機 directExchange
        rabbitTemplate.convertAndSend("directExchange", "directRouting", map);
    }
}

這裡對外提供一個介面,用來觸發傳送訊息。呼叫介面之後,在RabbitMQ管理臺的Exchange面板可以看到配置的交換機directExchange以及在佇列面板可以看到建立的佇列directQueue以及兩者的繫結關係。


可以看到訊息已經到達佇列裡了。

2.3.2 建立rabbitmq-consumer

如果是一個單獨的訊息消費者,那麼只需要建立訊息接收監聽即可。如果既是訊息傳送者同時又是訊息接收者,那就需要建立接收訊息監聽的同時也需要交換機的配置、佇列的配置等資訊。這裡僅以接收訊息說明。

2.3.2.1 建立訊息接收監聽

package com.lucky.spring.receiver.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/19
 */
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiver {
    @RabbitHandler
    public void process(Map msg) {
        System.out.println("DirectReceiver:收到的訊息是:" + msg.toString());
    }
}

2.3.2.2 接收訊息

啟動rabbitmq-consumer專案,可以看到把之前推送的那條訊息消費了下來。可以再次呼叫rabbitmq-provider裡的訊息推送介面,可以看到消費者是及時的接收到了訊息。

2.3.3 多個監聽繫結同一個佇列

既然直連交換機是一對一的,如果有多個監聽繫結到同一個監聽佇列,會怎麼樣。這裡在創建出兩個 訊息接收監聽 DirectReceiverADirectReceiverB

package com.lucky.spring.receiver.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/19
 */
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiverA {
    @RabbitHandler
    public void process(Map msg) {
        System.out.println("DirectReceiverA:收到的訊息是:" + msg.toString());
    }
}

package com.lucky.spring.receiver.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/19
 */
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiverB {
    @RabbitHandler
    public void process(Map msg) {
        System.out.println("DirectReceiverB:收到的訊息是:" + msg.toString());
    }
}

呼叫rabbitmq-provider的訊息推送介面,控制檯日誌資訊如下:

DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:29:57 CST 2020, messageId=7ffd1517-b3bc-4e72-8355-6b58de4c3277, messageData=test message, hello!}
DirectReceiverA:收到的訊息是:{createTime=Sun Sep 27 08:29:59 CST 2020, messageId=2f296ca3-4295-418e-9592-d5d3c90d373a, messageData=test message, hello!}
DirectReceiverB:收到的訊息是:{createTime=Sun Sep 27 08:30:00 CST 2020, messageId=759b34c9-98de-4aad-8bc5-fdac2ec5b5ed, messageData=test message, hello!}
DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:30:07 CST 2020, messageId=bb276692-cf14-405f-a91a-76df7b6e2fca, messageData=test message, hello!}
DirectReceiverA:收到的訊息是:{createTime=Sun Sep 27 08:30:13 CST 2020, messageId=15381f29-5769-442d-bece-e5acec33f478, messageData=test message, hello!}
DirectReceiverB:收到的訊息是:{createTime=Sun Sep 27 08:30:13 CST 2020, messageId=0dd855f1-f9ef-45f6-9883-c65d6480f0aa, messageData=test message, hello!}
DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:30:15 CST 2020, messageId=6f902f7c-ff61-4a3b-a6f6-28e428ca8b43, messageData=test message, hello!}

可以看到是實現了輪詢的方式對訊息進行消費,而且不存在重複消費。

2.4 Topic Exchange 主題交換機的使用

2.4.1 rabbitmq-provider端配置

2.4.1.1 佇列、交換機、繫結資訊的配置

建立 TopicRabbitConfig 類進行交換機、佇列、繫結資訊的配置。

package com.lucky.spring.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/19
 */
@Configuration
public class TopicRabbitConfig {
    //繫結鍵
    public final static String MAN = "topic.man";
    public final static String WOMAN = "topic.woman";

    @Bean
    public Queue firstQueue() {
        return new Queue(MAN);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(WOMAN);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * 將 firstQueue與topicExchange繫結,而且繫結的鍵值為topic.man
     * 這樣只要是訊息 攜帶的路由鍵是 topic.man 才會分發到該佇列
     * @return
     */
    @Bean
    public Binding bindingExchange1() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
    }

    /**
     * 將 secondQueue與topicExchange繫結,而且繫結的鍵值為用上通配路由鍵規則topic.#
     * 這樣只要是訊息 攜帶的路由鍵是 topic. 開頭都會分發到該佇列
     * @return
     */
    @Bean
    public Binding bindingExchange2() {
        return BindingBuilder.bind(secondQueue()).
                to(exchange())
                .with("topic.#");
    }
}

2.4.1.2 新增訊息推送介面

package com.lucky.spring.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Created by zhangdd on 2020/9/20
 */
@RestController
public class SendTopicMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = new Date().toString();
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }

    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = new Date().toString();
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
        return "ok";
    }
}

2.4.2 rabbitmq-consumer端配置

2.4.2.1 建立訊息接收監聽類

建立訊息接收監聽類,TopicManReceiverTopicTotalReceiver

package com.lucky.spring.receiver.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

    @RabbitHandler
    public void process(Map msg) {
        System.out.println("TopicManReceiver消費者收到訊息  : " +
                msg.toString());
    }
}

package com.lucky.spring.receiver.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver消費者收到訊息  : " +
                testMessage.toString());
    }
}

2.4.2.2 推送訊息

  • 呼叫/sendTopicMessage1介面:

在rabbitmq-consumer端 控制檯輸出資訊如下:

TopicManReceiver消費者收到訊息  : {createTime=Sun Sep 27 08:41:18 CST 2020, messageId=b949f9fd-0c9e-412d-a55f-9476d2427e3b, messageData=message: M A N }
TopicTotalReceiver消費者收到訊息  : {createTime=Sun Sep 27 08:41:18 CST 2020, messageId=b949f9fd-0c9e-412d-a55f-9476d2427e3b, messageData=message: M A N }

TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#


當前推送的訊息,攜帶的路由鍵為:topic.man,所以可以看到兩個監聽消費者receiver都成功消費到了訊息,因為這兩個recevier監聽的佇列的繫結鍵都能與這條訊息攜帶的路由鍵匹配上。

  • 呼叫/sendTopicMessage2介面

在rabbitmq-consumer端 控制檯輸出資訊如下:

TopicTotalReceiver消費者收到訊息  : {createTime=Sun Sep 27 08:44:32 CST 2020, messageId=47c434ee-3b6d-4f52-bb75-2482a1dc5893, messageData=message: woman is all }

TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#


當前推送的訊息,攜帶的路由鍵為:topic.woman,所以可以看到兩個監聽消費者只有TopicTotalReceiver成功消費到了訊息,因為這個時候只有TopicTotalReceiver監聽的佇列繫結的鍵才能與當前訊息攜帶的路由鍵匹配上。

2.5 Fanout Exchange 扇形交換機的使用

2.5.1 rabbitmq-provider端的配置

2.5.1.1 交換機、佇列、繫結等資訊的配置

建立FanoutRabbitConfig類對扇形交換機的佇列、交換機、繫結等資訊配置。

package com.lucky.spring.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/20
 */
@Configuration
public class FanoutRabbitConfig {
    /**
     *  建立三個佇列 :fanout.A   fanout.B  fanout.C
     *  將三個佇列都繫結在交換機 fanoutExchange 上
     *  因為是扇型交換機, 路由鍵無需配置,配置也不起作用
     */


    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue queueC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA())
                .to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB())
                .to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC())
                .to(fanoutExchange());
    }
}

2.5.1.2 新增訊息推送介面

package com.lucky.spring.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Created by zhangdd on 2020/9/20
 */
@RestController
public class SendFanoutMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: testFanoutMessage ";
        String createTime = new Date().toString();
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend("fanoutExchange", null, map);
        return "ok";
    }
}

2.5.2 rabbitmq-consumer 端配置

2.5.2.1 建立訊息消費監聽類

這裡建立三個 監聽類,主要是基於扇形交換機無需路由鍵配置。

package com.lucky.spring.receiver.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutAReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverA消費者收到訊息  : "
                + testMessage.toString());

    }
}

package com.lucky.spring.receiver.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutBReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverB消費者收到訊息  : " +
                testMessage.toString());
    }
}

package com.lucky.spring.receiver.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutCReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverC消費者收到訊息  : " +
                testMessage.toString());
    }

}

2.5.2.2 推送訊息

  • 呼叫/sendFanoutMessage介面傳送訊息

rabbitmq-consumer端控制檯日誌資訊如下:

FanoutReceiverA消費者收到訊息  : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }
FanoutReceiverB消費者收到訊息  : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }
FanoutReceiverC消費者收到訊息  : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }

可以看到只要傳送到fanoutExchange 這個扇型交換機的訊息, 三個佇列都繫結這個交換機,所以三個訊息接收類都監聽到了這條訊息。

2.6訊息確認

訊息的回撥即訊息確認(生產者推送訊息成功,消費者接收訊息成功)。

2.6.1 生產者推送訊息確認

2.6.1.1 新增application.properties配置

#確認訊息已傳送到交換機(Exchange)
spring.rabbitmq.publisher-confirms=true
#確認訊息已傳送到佇列(Queue)
spring.rabbitmq.publisher-returns=true

2.6.1.2 配置訊息確認回撥函式

package com.lucky.spring.config;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/20
 */
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory
                                                       connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        //設定開啟Mandatory,才能觸發回撥函式,無論訊息推送結果怎麼樣都強制呼叫回撥函式
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相關資料:"+correlationData);
                System.out.println("ConfirmCallback:     "+"確認情況:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);

            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"訊息:"+message);
                System.out.println("ReturnCallback:     "+"迴應碼:"+replyCode);
                System.out.println("ReturnCallback:     "+"迴應資訊:"+replyText);
                System.out.println("ReturnCallback:     "+"交換機:"+exchange);
                System.out.println("ReturnCallback:     "+"路由鍵:"+routingKey);

            }
        });
        return template;
    }
}

上面就是生產者推送訊息的相關回調函式配置,一個是ConfirmCallback另一個是ReturnCallback


先從總體的情況分析,推送訊息存在四種情況:
①訊息推送到server,但是在server裡找不到交換機
②訊息推送到server,找到交換機了,但是沒找到佇列
③訊息推送到sever,交換機和佇列啥都沒找到
④訊息推送成功


這裡先寫幾個介面來分別測試和認證下以上4種情況,訊息確認觸發回撥函式的情況:
①訊息推送到server,但是在server裡找不到交換機
寫個測試介面,把訊息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有建立沒有配置的):

@GetMapping("/testMessageAck")
    public String TestMessageAck() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: non-existent-exchange test message ";
        String createTime = new Date().toString();
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
        return "ok";
    }

呼叫介面,檢視rabbitmq-provider專案的控制檯輸出情況:

ConfirmCallback:     相關資料:null
ConfirmCallback:     確認情況:false
ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
2020-09-27 14:02:47.167 ERROR 67934 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

結論: ①這種情況觸發的是ConfirmCallback 回撥函式。

**②訊息推送到server,找到交換機了,但是沒找到佇列**
這種情況就是需要新增一個交換機,但是不給這個交換機繫結佇列,新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何繫結配置操作

package com.lucky.spring.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/20
 */
@Configuration
public class MsgCallbackRabbitConfig {

    /**
     * 新增一個交換機,但是不給這個交換機繫結佇列
     * @return
     */
    @Bean
    public DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
}

然後寫個測試介面,把訊息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何佇列配置的):

    @GetMapping("/testMessageAck2")
    public String TestMessageAck2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: lonelyDirectExchange test message ";
        String createTime = new Date().toString();
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
        return "ok";
    }

呼叫介面,檢視rabbitmq-provuder專案的控制檯輸出情況:

ReturnCallback:     訊息:(Body:'{createTime=Sun Sep 27 14:08:43 CST 2020, messageId=9009ed23-0329-41fc-9d2a-77bb7ca67680, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     迴應碼:312
ReturnCallback:     迴應資訊:NO_ROUTE
ReturnCallback:     交換機:lonelyDirectExchange
ReturnCallback:     路由鍵:TestDirectRouting
ConfirmCallback:     相關資料:null
ConfirmCallback:     確認情況:true
ConfirmCallback:     原因:null

可以看到這種情況,兩個函式都被呼叫了;
這種情況下,訊息是推送成功到伺服器了的,所以ConfirmCallback對訊息確認情況是true;
而在RetrunCallback回撥函式的列印引數裡面可以看到,訊息是推送到了交換機成功了,但是在路由分發給佇列的時候,找不到佇列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是ConfirmCallback和RetrunCallback兩個回撥函式。

③訊息推送到sever,交換機和佇列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回撥是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是ConfirmCallback 回撥函式。

④訊息推送成功
那麼測試下,按照正常呼叫之前訊息推送的介面就行,就呼叫下 /sendFanoutMessage介面,可以看到控制檯輸出:

ConfirmCallback:     相關資料:null
ConfirmCallback:     確認情況:true
ConfirmCallback:     原因:null

結論: ④這種情況觸發的是ConfirmCallback 回撥函式。

2.6.2 消費者接收訊息確認機制

和生產者的訊息確認機制不同,因為訊息接收本來就是在監聽訊息,符合條件的訊息就會消費下來。所以訊息接收的確認機制主要存在三種模式。

  1. 自動確認:這也是預設的訊息確認情況,AcknowledgeMode.NONE
    1. RabbitMQ成功將訊息傳送(即將訊息寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞。
    2. 所以這種情況如果消費端消費邏輯跑出異常,也就是消費端沒有處理成功這條訊息,那麼就相當於丟失了訊息。一般這種情況下我們都是使用try catch捕獲異常後,列印日誌用於追蹤資料,這樣找出對應資料在做後續處理
  2. 根據情況確認,這個不做介紹
  3. 手動確認,這個比較關鍵,也是我們配置接收訊息確認機制時,多數選擇的模式

消費者收到訊息後,手動呼叫basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些訊息後,才認為本次投遞成功。

  • basic.ack:用於肯定確認
  • basic.nack:用於否定確認
  • basic.reject:用於否定確認,但與basic.nack相比有一個限制:basic.reject一次只能拒絕單條訊息


手動確認模式下,消費者端對於以上三個方法都表示訊息已經被正確投遞,但是basic.ack表示訊息已經被正確處理,而basic.nack和basic.reject表示沒有被正確處理。

2.6.2.1 basic.reject模式

著重講下reject,因為有時候一些場景是需要重新入佇列的。channel.basicReject(deliveryTag,true),拒絕消費當前訊息,如果第二個引數傳入true,就是表示將資料重新丟回佇列裡,那麼下次還會消費這條訊息。設定false,就是告訴伺服器,我已經知道這條訊息資料了,因為一些原因拒絕他,而且服務棄也把它丟掉就行了,下次不想在消費這條訊息了。


使用拒絕後重新入佇列這個模式要謹慎,因為一般都是出現異常的時候,catch異常時選擇是否重入佇列,但是如果使用不當會導致一些被重入佇列的訊息一直消費:異常->重入佇列->接收訊息處理->異常。這樣迴圈會導致訊息積壓。

2.6.2.2 basic.nack模式

對於basic.nack而言,這個也是設定不消費某條訊息.channel.basicNack(deliveryTag,false,true)
第一個引數依然是當前訊息資料的唯一ID
第二個引數是指是否針對多條訊息,如果是true,也就是說一次性針對當前通道的訊息tagID小於當前這小訊息的都拒絕確認。
第三個引數是指是否重新入佇列,也就是指不確認的訊息是否重新丟回到佇列裡面去

這裡同樣要注意訊息迴圈積壓的問題。

2.6.2.3 消費端配置訊息確認

首先建立MyAckReceiver類即手動確認訊息監聽類,需要實現ChannelAwareMessageListener介面:

package com.lucky.spring.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by zhangdd on 2020/9/20
 */
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {


    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //因為傳遞訊息的時候用的map傳遞,所以將Map從Message內取出需要做些處理
            String msg = message.toString();
            String[] msgArray = msg.split("'");//可以點進Message裡面看原始碼,單引號直接的資料就是我們的map訊息資料
            Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(), 3);
            String messageId = msgMap.get("messageId");
            String messageData = msgMap.get("messageData");
            String createTime = msgMap.get("createTime");
            System.out.println("  MyAckReceiver  messageId:" + messageId + "  messageData:" + messageData + "  createTime:" + createTime);
            System.out.println("消費的主題訊息來自:" + message.getMessageProperties().getConsumerQueue());
            channel.basicAck(deliveryTag, true);
//			channel.basicReject(deliveryTag, true);//為true會重新放回佇列
        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }

    }

    //{key=value,key=value,key=value} 格式轉換成map
    private Map<String, String> mapStringToMap(String str, int entryNum) {
        str = str.substring(1, str.length() - 1);
        String[] strs = str.split(",", entryNum);
        Map<String, String> map = new HashMap<String, String>();
        for (String string : strs) {
            String key = string.split("=")[0].trim();
            String value = string.split("=")[1];
            map.put(key, value);
        }
        return map;
    }

}


接著建立MessageListenerConfig新增相關的配置程式碼:

package com.lucky.spring.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/9/20
 */
@Configuration
public class MessageListenerConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    //訊息接收處理類
    @Autowired
    private MyAckReceiver myAckReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ預設是自動確認,這裡改為手動確認訊息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //設定一個佇列
        container.setQueueNames("directQueue");

        //設定 手動確認訊息監聽類
        container.setMessageListener(myAckReceiver);

        return container;
    }

}


因為上面配置的是directQueue這個佇列作為示例,所以呼叫/sendDirectMsg介面進行檢視結果:

  MyAckReceiver  messageId:b7b95a08-2354-4d90-9f21-376b5bcc1fef  messageData:test message, hello!  createTime:Mon Sep 28 08:41:06 CST 2020
消費的主題訊息來自:directQueue

可以看到監聽器正常的消費的訊息。


到此RabbitMQ在Spring boot中的簡單使用就結束了。