1. 程式人生 > 實用技巧 >RabbitMq訊息可靠性之回退模式 通俗易懂 超詳細 【內含案例】

RabbitMq訊息可靠性之回退模式 通俗易懂 超詳細 【內含案例】

RabbitMq保證訊息可靠性之回退模式

前提

完成 SpringBoot 整合 RabbitMq 中的Topic萬用字元模式

一、更改Producer工程的application.yml檔案

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: username
    password: password
    publisher-returns: true #開啟回退模式
server:
  port: 8080

二、更改ProducerTest.java檔案 ConfirmCallback

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Scope;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;


@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RabbitMqTest {
    
    private RabbitTemplate rabbitTemplate;

    @Resource
    @Scope("prototype")
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        /*
         SpringBoot 預設為true 如果為false下面不執行
         rabbitTemplate.setMandatory(true);
         注意:Spring專案必須書寫!
         */

        /**
         * 路由沒有到達Queue就會執行
         */
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message 訊息物件
             * @param replyCode 訊息編碼
             * @param replyText 時報錯誤資訊
             * @param exchange 交換機
             * @param routingKey 路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.debug("訊息沒有到達Queue,該訊息為:{}",message.getBody());
                log.debug("錯誤編碼:{},錯誤資訊:{}",replyCode,replyText);
                log.debug("交換機:{},路由鍵:{}",exchange,routingKey);
                //這裡routing填寫正確的
                rabbitTemplate.send("topic_exchange","item.aa",message);
            }
        });

        this.rabbitTemplate = rabbitTemplate;
    }

    @Test
    public void test() throws InterruptedException {
        String body = "回退模式傳送訊息";
        //為了達到回退模式 ,routingKey 填寫一個錯誤的 會呼叫 ReturnCallback 傳送一個正確的
        rabbitTemplate.convertAndSend("topic_exchange","dsafasf56.chu",body);
        Thread.sleep(2000);
    }
}

三、測試

首先執行 ProducerTest.java 單元測試,然後在啟動 ConsumerListener.java 訊息監聽器

  1. 如果已經存在 topic_queue 請先刪除後再執行單元測試

四、小結

第一次傳送的訊息不會到達queue,會呼叫到 ReturnCallback 方法,會再次進行傳送.保證訊息的可靠性,不會丟失.