1. 程式人生 > 其它 >RabbitMQ 高階特性——訊息的可靠性投遞

RabbitMQ 高階特性——訊息的可靠性投遞

技術標籤:rabbitmq佇列rabbitmqspring

在使用rabbitmq的時候,作為訊息傳送方希望杜絕任何訊息丟失過著投遞失敗的場景,rabbitMq為我們提供了兩種方式用來控制訊息的投遞可靠性模式:

  • confirm 確認模式
  • return 回退模式

rabbitmq整個訊息投遞的路徑為:生產者(producer)->rabbitmq broker->exchange->queue->consumer

  1. 訊息從producer到exchange則會返回一個confirmCallBack
  2. 訊息從exchange到queue投遞失敗則會返回一個returnCallBack

我們利用spring整合rabbitmq的方式來簡單的實踐下(spring如何整合rabbitmq,請點選這裡 RabbitMQ 入門篇之——Spring整合rabbitmq):

我們先來定義佇列和互動機:

  <!--訊息可靠性投遞(生產端)-->
    <rabbit:queue id="spring_confirm_queue" name="spring_confirm_queue" auto-delete="true"/>
    <rabbit:direct-exchange name="spring_confirm_exchange" id="spring_confirm_exchange" >
        <rabbit:bindings>
            <rabbit:binding queue="spring_confirm_queue" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

生產者程式碼:

1.confirm

package com.cjian.rabbitmq.spring_rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @description:  confirm 確認模式
 * @author: CJ
 * @time: 2021/1/22 16:36
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ConfirmReturnProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
    * 訊息傳送給交換機後執行ConfirmCallBack
    * 確認步驟:
    *       1.開啟確認模式:connectionFactory 開啟 publisher-confirms="true"
    *       2.在rabbitTemplate 中定義ConfirmCallBack回撥函式
    *
    * */
    @Test
    public void testConfirm(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相關配置資訊
             * @param ack  交換機是否成功的收到了訊息  true成功 false失敗
             * @param cause 失敗的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("執行confirm,ack:"+ack);
                if(!ack){
                    System.out.println("成功的接到了訊息"+cause);
                }else{
                    System.out.println("接收訊息失敗"+cause);
                }
            }
        });
        //傳送一條訊息
        rabbitTemplate.convertAndSend("spring_confirm_exchange", "confirm", "hello rabbimq confirm");
}

正常傳送訊息:


模擬下發送異常:

2.return

package com.cjian.rabbitmq.spring_rabbit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @description:  return模式
 * @author: cWX969834
 * @time: 2021/1/22 16:36
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ConfirmReturnProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

       /*
    回退模式:當訊息傳送給交換機後,交換機路由到佇列失敗時,才會執行ReturnCallBack
    步驟:1.開啟回退模式 publisher-returns="true"
        2.設定ReturnCallBack
        3.設定交換機處理訊息的模式:
            ①如果訊息沒有路由到佇列,則丟棄訊息,也是預設的
            ②如果訊息沒有路由到佇列,返回訊息給傳送方

     */
    @Test
    public void testReturn(){
        //設定交換機處理失敗訊息的模式
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message  訊息物件
             * @param replyCode  失敗的錯誤碼
             * @param replyText  錯誤資訊
             * @param exchange   交換機
             * @param routingKey  路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 執行了");
                System.out.println("message:"+message);
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
            }
        });
        rabbitTemplate.convertAndSend("spring_confirm_exchange", "confirm11", "hello rabbimq return");
    }
}

我們直接模擬傳送給佇列失敗的情形:

訊息的可靠性投遞總結:

  1. 設定connection-factory的publisher-confirms="true",開啟確認模式;
  2. 使用rabbitTemplate.setConfirmCallback設定回撥函式,當訊息傳送到交換機後回撥confirm方法。在方法中判斷ack,true:傳送成功,false:傳送失敗;
  3. 設定connection-factory的publisher-return="true",開啟回退模式;
  4. 使用rabbitTemplate.setReturnCallback設定回退函式,當訊息從交換機路由到佇列失敗後,如果設定了
    rabbitTemplate.setMandatory(true)引數,則會將訊息退回給生產者,並執行回撥函式returnedMessage.
  5. 在rabbitmq中也提供了事務機制,但是效能較差,此處就不說了。

使用channel.txSelect()用於將當前channel設定成事務模式;

使用channel.txCommit()用於提交事務;

使用channel.txRollBack()用於回滾事務。