1. 程式人生 > >六、釋出訊息到RabbitMQ與效能權衡(讀書筆記與個人實踐)

六、釋出訊息到RabbitMQ與效能權衡(讀書筆記與個人實踐)

今年企業對Java開發的市場需求,你看懂了嗎? >>>   

摘要

本文主要學習的目標有兩個:

  • RabbitMQ中的訊息可靠性投遞的方式;
  • 釋出的效能權衡;

雖然不是所有的系統都要求像銀行一樣對訊息可靠投遞有非常嚴格的要求,但確保訊息被接收和投遞是非常重要的。RabbitMQ基於AMQP規範,後者提供訊息釋出中的事務以及訊息持久化選項,以提供比自身普通訊息釋出更高階的可靠訊息通訊機制。

釋出效能的權衡

在RabbitMQ中,建立可靠性投遞的每個機制都會對效能產生一定的影響。單獨使用時可能不太會注意到吞吐量的差異,但是當它們組合使用時,吞吐量就會由明顯不同,只有通過執行自己的效能基準測試,才能確定性能與可靠性投遞之間可以接受的平衡。

下面從左到右依次說明這些機制會產生哪些效能影響。

另外,會使用Spring提供的RabbitTemplate客戶端工具(使用過RabbitTemplate,後續可能不會介紹RabbitTemplate),對每種機制進行配置,併發送訊息到RabbitMQ。

程式碼在Github:https://github.com/XuePeng87/rabbitmq-example

沒有保障

在完美世界裡,無須任何額外的配置或操作,RabbitMQ就可以可靠的投遞訊息。

不幸的是,當墨菲定律肆虐我們的程式時,完美世界並不存在。

在非核心應用中,釋出的訊息不必處理每個可能的故障點,例如發一些允許丟棄的訊息,那麼我們可以不使用任何保障機制,直接使用Basic.Publish傳送訊息。

使用RabbitTemplate時,可以在配置檔案中設定:

spring:
  #訊息佇列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: false
    publisher-confirms: false
    connection-timeout: 5000ms

將publisher-returns和publisher-confirms設定為false。

失敗通知

設定mandatory後,RabbitMQ將不接受不可路由的訊息。

mandatory標誌是一個與Basic.Publish命令一起傳遞的引數,該引數會告訴RabbitMQ,如果訊息不可路由,它應該通過Basic.Return命令將訊息返回給釋出者。設定mandatory標誌可以被認為是開啟故障檢測模式,它只會讓RabbitMQ向你通知失敗,而不會通知成功。如果訊息路由正確,你的釋出者將不會收到通知。

    /**
     * 定製AmqpTemplate物件。
     * 可根據需要定製多個。
     *
     * @return AmqpTemplate物件。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        // 設定不接受不可路由的訊息,需要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 訊息 {} 傳送失敗,應答碼:{},原因:{},交換器: {},路由鍵:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        return rabbitTemplate;
    }

如上面的配置,我們設定了mandatory等於true,同時將配置檔案中的publisher-returns也設定為true,這樣就打開了失敗通知。下面做個測試:

    /**
     * 傳送direct訊息。
     * 交換器存在,但佇列不存在,為了測試Mandatory與ReturnCallback。
     *
     * @param message 訊息內容。
     */
    public void directNotExistQueue(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

我們建立了交換器DIRECT_EXCHANGE,但是使用一個不存在的RoutingKey,這就等於傳送訊息到交換器成功,但是無法路由到某一個佇列,執行測試用例,觀察結果:

    /**
     * 傳送direct訊息,但訊息路由不存在。
     * 交換器存在,但佇列不存在,為了測試Mandatory與ReturnCallback。
     */
    @Test
    public void testDirectNotExistQueue() {
        messageProducer.directNotExistQueue("{}");
    }

結果如下: 

ReturnCallback -> 訊息 null 傳送失敗,應答碼:312,原因:NO_ROUTE,交換器: DIRECT_EXCHANGE,路由鍵:DIRECT_ROUTING_KEY_NOT_EXIST

Basic.Return呼叫是一個RabbitMQ的非同步呼叫,並且在訊息釋出後的任何時候都可能發生。

如果程式碼中沒有設定setReturnCallback,那麼該呼叫將被忽略。

其實setReturnCallback就是處理Basic.Return的回撥方法,RabbitTemplate接收到Basic.Return命令後,呼叫該方法。

釋出者確認

釋出者確認模式是AMQP規範的擴充套件功能,只能用在支援這個特定擴充套件的客戶端,RabbitTemplate支援這個模式。

在協議層,釋出任何訊息之前,訊息釋出者必須向RabbitMQ傳送Confirm.Select請求,並等待Confirm.SelectOk響應以獲知投遞確認已經被啟動。在這一點上,對於釋出者傳送給RabbitMQ的每條訊息,伺服器會發送一個確認響應(Basic.Ack)或否定確認響應(Basic.Nack)。

在RabbitTemplate中,要使用釋出者確認,需要在配置檔案中配置:

publisher-confirms: true

然後在設定回撥函式:

    /**
     * 定製AmqpTemplate物件。
     * 可根據需要定製多個。
     *
     * @return AmqpTemplate物件。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        // 設定訊息轉換器為Jackson
        rabbitTemplate.setEncoding("UTF-8");
        // 設定不接受不可路由的訊息,需要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 訊息 {} 傳送失敗,應答碼:{},原因:{},交換器: {},路由鍵:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        // 設定訊息釋出確認功能,需要在yml中配置:publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("ConfirmCallback -> 訊息釋出到交換器成功,id:{}", correlationData);
            } else {
                log.warn("ConfirmCallback -> 訊息釋出到交換器失敗,錯誤原因為:{}", cause);
            }
        });
        // 開啟事務模式,需要在yml中配置:publisher-confirms: false
        // rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

呼叫setConfirmCallback方法,設定回撥函式,每次傳送訊息到RabbitMQ,伺服器都會返回響應,可以通過判斷ack來確定是否傳送成功。

當成功傳送到交換器後,ConfirmCallback會接收到ack為true的響應,如果沒有成功傳送到交換器,則會接收到ack為false的響應。

具體測試程式碼如下:

    /**
     * 傳送direct訊息。
     * 交換器不存在,佇列也不存在,為了測試ConfirmCallback。
     *
     * @param message 訊息內容。
     */
    public void directNotExistExchangeAndQueue(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE_NOT_EXIST", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

首先向不存在的交換器傳送訊息,結果為:

    /**
     * 傳送direct訊息,交換器和路由都不存在。
     * 交換器不存在,佇列也不存在,為了測試ConfirmCallback。
     */
    @Test
    public void testDirectNotExistExchangeAndQueue() {
        messageProducer.directNotExistExchangeAndQueue("{}");
    }
ConfirmCallback -> 訊息釋出到交換器失敗,錯誤原因為:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE_NOT_EXIST' in vhost '/', class-id=60, method-id=40)

然後在使用失敗通知模式的測試用例測試一下,即能傳送到交換器,但是無法路由到佇列:

ReturnCallback -> 訊息 null 傳送失敗,應答碼:312,原因:NO_ROUTE,交換器: DIRECT_EXCHANGE,路由鍵:DIRECT_ROUTING_KEY_NOT_EXIST

ConfirmCallback -> 訊息釋出到交換器成功,id:CorrelationData [id=9282dbe9-4fe9-4b85-af06-79305f4c99e1]
無論是否使用釋出者確認模式,如果你釋出訊息到不存在的交換器,那麼釋出用的通道將會被RabbitMQ關閉。

釋出者確認模式不能與事務模式一起工作,此外,作為對Basic.Publish請求的非同步響應,它並不能保證何時會收到確認。

備用交換器

備用交換器是RabbitMQ對AMQP的另一種擴充套件,用於處理無法路由的訊息。備用交換器在第一次宣告交換器時被指定,用來提供一種預先存在的交換器,即如果交換器無法路由訊息,那麼訊息就會被路由到這個新的備用交換器。

如果將訊息傳送到具有備用交換器的交換器(設定了mandatory=true)上, 那麼一旦預期的交換器無法正常路由訊息,Basic.Return就不會發給釋出者。因為訊息成功的釋出到了備用交換器。

RabbitTemplate宣告備用交換器的程式碼如下:

    /**
     * 宣告Direct交換器。
     * 同時指定備用交換器。
     *
     * @return Exchange物件。
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE")
                .durable(false)
                .withArgument("alternate-exchange", "UN_ROUTE_EXCHANGE")
                .build();
    }

在宣告交換器時,呼叫withArgument函式,key為alternate-exchange,value為備用交換器的名稱,這裡是UN_ROUTE_EXCHANGE(備用伺服器也需要建立)。

下面進行測試,傳送一個無法路由的訊息到DIRECT_EXCHANGE,這個訊息將不能被路由,但不會回撥ReturnCallback,而是會進入到UN_ROUTE_EXCHANGE交換器中:

事務提交

AMQP事務提供了一種機制,通過這種機制,訊息可以批量釋出到RabbitMQ,然後提交到佇列或者回滾。

在RabbitTemplate中,使用事務就不能使用ReturnConfime模式,所以要把publisher-confimes設定為false,具體程式碼如下:

    /**
     * 定製AmqpTemplate物件。
     * 可根據需要定製多個。
     *
     * @return AmqpTemplate物件。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        // 設定訊息轉換器為Jackson
        rabbitTemplate.setEncoding("UTF-8");
        // 設定不接受不可路由的訊息,需要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 訊息 {} 傳送失敗,應答碼:{},原因:{},交換器: {},路由鍵:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        // 開啟事務模式,需要在yml中配置:publisher-confirms: false
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

程式碼中,要設定setChannelTransacted為true,然後宣告RabbitMQ的事務管理器:

    /**
     * 宣告RabbitMQ事務管理器。
     *
     * @param connectionFactory 連線工廠。
     * @return PlatformTransactionManager物件。
     */
    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

到這裡,事務的配置準備工作就做好了,接下來,基於事務模式傳送訊息:

    /**
     * 在事務模式下,傳送direct訊息。
     * <p>
     * 第一次傳送,訊息可以正常路由到佇列。
     * 第二次傳送,訊息不能路由到佇列。
     */
    @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

程式碼中,加入了@Transactional修飾方法,先後傳送兩條訊息到交換器,第一次傳送的訊息會正常路由到佇列,第二次傳送的訊息則不會發送到佇列,下面是測試程式碼和結果:

    /**
     * 在事務模式下,傳送direct訊息。
     * 第一次傳送,訊息可以正常路由到佇列。
     * 第二次傳送,訊息不能路由到佇列。
     */
    @Test
    public void testDirectOnTransaction() {
        messageProducer.directOnTransaction("{}");
    }
org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction

由於發生了異常,執行了回滾,所以第一條訊息也沒有被髮送到佇列:

如果兩條資料都會成功傳送到RabbitMQ,則會成功提交兩條訊息。

如果不用@Transactional修飾方法,那麼會有一條訊息進入RabbitMQ,另一條訊息丟失,具體測試如下,首先是兩條訊息都能傳送到RabbitMQ:

    /**
     * 在事務模式下,傳送direct訊息。
     * <p>
     * 第一次傳送,訊息可以正常路由到佇列。
     * 第二次傳送,訊息不能路由到佇列。
     */
    @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
    }

下面把@Transactional修飾去掉,然後一條可以傳送到RabbitMQ,另一條不可以:

    /**
     * 在事務模式下,傳送direct訊息。
     * <p>
     * 第一次傳送,訊息可以正常路由到佇列。
     * 第二次傳送,訊息不能路由到佇列。
     */
    // @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

執行後結果如下:

org.springframework.amqp.AmqpIOException: java.io.IOException

可以看到程式依舊丟擲了異常,但第一條訊息傳送到了RabbitMQ中:

在協議層,當RabbitMQ由於錯誤而無法路由時,它將傳送一個Basic.Return響應,希望終止事務的釋出者應該傳送TX.Rollback請求,並等待TX.RollbackOk響應,然後繼續工作。

RabbitMQ只在每個發出的命令作用於單個佇列時才執行原子事務。如果不只一個佇列受到事務中任何命令的影響,則提交就不具備原子性。

推薦使用釋出確認模式用作輕量級替代方案,因為它的速度快,可以同時提供肯定或否定的確認。

高可用佇列以及高可用佇列事務

高可用佇列(HA佇列)時RabbitMQ的一項增強功能,它允許佇列在多個伺服器上擁有冗餘副本。

當訊息傳送到高可用佇列是,訊息會發送到叢集中的每臺伺服器,一旦訊息在叢集中的任何節點都完成消費,那麼訊息所有副本將立即從其他節點刪除。

HA佇列中有一個節點是主節點,其他所有節點都是輔助節點。當主節點發生故障,會在輔助節點中選擇一個接管主節點的角色。如果HA節點中的一個輔助節點故障了,其他節點將照常工作。

當一個故障節點恢復了,或者新新增進來一個輔助節點,它將不包含任何已經存在於現有節點中的訊息,當現有節點的訊息被消費後,故障節點或新節點則開始接收訊息,並執行同步操作。

如果使用事務或訊息確認機制,則訊息需要在HA佇列中所有活動節點確定後,RabbitMQ才會傳送成功響應。

高可用佇列的配置在後面會單獨寫一篇。

訊息持久化

如果將一個訊息的delivery-mode設定為1,RabbitMQ會被告知不需要將訊息儲存到磁碟,而訊息會一直儲存在記憶體中。

為了使訊息在RabbitMQ重啟後仍然存在,除了將delivery-mode設定為2,還需要在建立佇列時設定durable,使佇列變為持久化佇列。

在釋出訊息時,RabbitTemplate預設採用持久化策略,如果希望持久化儲存訊息,需要在傳送訊息時做如下設定:

    /**
     * 傳送direct非持久化訊息。
     * RabbitTemplate預設採用訊息持久化儲存。
     *
     * @param message 訊息內容。
     */
    public void directNonPersistent(String message) {
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE",
                "DIRECT_ROUTING_KEY",
                message, msg -> {
                    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    msg.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                    return msg;
                }
        );
    }

setDeliveryMode為非持久化模式後,傳送的訊息將只儲存在RabbitMQ的記憶體中。

在I/O密集型伺服器中,通過作業系統在儲存裝置之間傳輸資料時,作業系統將阻塞I/O操作的程序。當RabbitMQ伺服器正在嘗試執行I/O操作,並等待儲存裝置響應時,作業系統核心發生阻塞,那麼RabbitMQ能做的就只有等待。

儘管訊息持久化時保障訊息最終被投遞的最重要的方式之一,但實現它的代