1. 程式人生 > >rabbitmq可靠傳送的自動重試機制

rabbitmq可靠傳送的自動重試機制

接這篇

在上文中,主要實現了可靠模式的consumer。而可靠模式的sender實現的相對簡略,主要通過rabbitTemplate來完成。
本以為這樣的實現基本是沒有問題的。但是前段時間做了一個性能壓力測試,但是發現在使用rabbitTemplate時,會有一定的丟資料問題。

當時的場景是用30個執行緒,無間隔的向rabbitmq傳送資料,但是當執行一段時間後發現,會出現一些connection closed錯誤,rabbitTemplate雖然進行了自動重連,但是在重連的過程中,丟失了一部分資料。當時傳送了300萬條資料,丟失在2000條左右。
這種丟失率,對於一些對一致性要求很高的應用(比如扣款,轉賬)來說,是不可接受的。

在google了很久之後,在stackoverflow上找到rabbitTemplate作者對於這種問題的解決方案,他給的方案很簡單,單純的增加connection數:

connectionFactory.setChannelCacheSize(100);

修改之後,確實不再出現connection closed這種錯誤了,在傳送了3000萬條資料後,一條都沒有丟失。
似乎問題已經完美的解決了,但是我又想到一個問題:當我們的網路在發生抖動時,這種方式還是不是安全的?
換句話說,如果我強制切斷客戶端和rabbitmq服務端的連線,資料還會丟失嗎?

為了驗證這種場景,我重新發送300萬條資料,在傳送過程中,在rabbitmq的管理介面上點選強制關閉連線:


然後發現,仍然存在丟失資料的問題。

看來這個問題,沒有想象中的那麼簡單了。

在閱讀了部分rabbitTemplate的程式碼之後發現:
1 rabbitTemplate的ack確認機制是非同步的
2 這種確認機制是一種事後發現機制,並不能同步的發現問題
也就是說,即便打開了

connectionFactory.setPublisherConfirms(true);
rabbitTemplate.setMandatory(true);

並且實現了:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if
(!ack) { log.info("send message failed: " + cause + correlationData.toString()); } })
;

依舊是不安全的。
rabbitTemplate的傳送流程是這樣的:
1 傳送資料並返回(不確認rabbitmq伺服器已成功接收)
2 非同步的接收從rabbitmq返回的ack確認資訊
3 收到ack後呼叫confirmCallback函式
注意:在confirmCallback中是沒有原message的,所以無法在這個函式中呼叫重發,confirmCallback只有一個通知的作用

在這種情況下,如果在2,3步中任何時候切斷連線,我們都無法確認資料是否真的已經成功傳送出去,從而造成資料丟失的問題。

最完美的解決方案只有1種:
使用rabbitmq的事務機制。
但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。
第二種解決方式,使用同步的傳送機制,也就是說,客戶端傳送資料,rabbitmq收到後返回ack,再收到ack後,send函式才返回。程式碼類似這樣:

建立channel
send message
wait for ack(or 超時)
close channel
返回成功or失敗

同樣的,由於每次傳送message都要重新建立連線,效率很低。

基於上面的分析,我們使用一種新的方式來做到資料的不丟失。
在rabbitTemplate非同步確認的基礎上
1 在本地快取已傳送的message
2 通過confirmCallback或者被確認的ack,將被確認的message從本地刪除
3 定時掃描本地的message,如果大於一定時間未被確認,則重發

當然了,這種解決方式也有一定的問題
想象這種場景,rabbitmq接收到了訊息,在傳送ack確認時,網路斷了,造成客戶端沒有收到ack,重發訊息。(相比於丟失訊息,重發訊息要好解決的多,我們可以在consumer端做到冪等)。
自動重試的程式碼如下:

public class RetryCache {
    private MessageSender sender;
    private boolean stop = false;
    private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();
    private AtomicLong id = new AtomicLong();

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    private static class MessageWithTime {
        long time;
        Object message;
    }

    public void setSender(MessageSender sender) {
        this.sender = sender;
        startRetry();
    }

    public String generateId() {
        return "" + id.incrementAndGet();
    }

    public void add(String id, Object message) {
        map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
    }

    public void del(String id) {
        map.remove(id);
    }

    private void startRetry() {
        new Thread(() ->{
            while (!stop) {
                try {
                    Thread.sleep(Constants.RETRY_TIME_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                long now = System.currentTimeMillis();

                for (String key : map.keySet()) {
                    MessageWithTime messageWithTime = map.get(key);

                    if (null != messageWithTime) {
                        if (messageWithTime.getTime() + 3 * Constants.VALID_TIME < now) {
                            log.info("send message failed after 3 min " + messageWithTime);
                            del(key);
                        } else if (messageWithTime.getTime() + Constants.VALID_TIME < now) {
                            DetailRes detailRes = sender.send(messageWithTime.getMessage());

                            if (detailRes.isSuccess()) {
                                del(key);
                            }
                        }
                    }
                }
            }
        }).start();
    }
}

在client端傳送之前,先在本地快取message,程式碼如下:

@Override
public DetailRes send(Object message) {
    try {
        String id = retryCache.generateId();
        retryCache.add(id, message);
        rabbitTemplate.correlationConvertAndSend(message, new CorrelationData(id));
    } catch (Exception e) {
        return new DetailRes(false, "");
    }

    return new DetailRes(true, "");
}

在收到ack時刪除本地快取,程式碼如下:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        log.info("send message failed: " + cause + correlationData.toString());
    } else {
        retryCache.del(correlationData.getId());
    }
});

再次驗證剛才的場景,傳送300w條資料,在傳送的過程中過一段時間close一次connection,傳送結束後,實際傳送資料301.2w條,有一些重複,但是沒有丟失資料。
同時需要驗證本地快取的記憶體洩露問題,程式連續傳送1.5億條資料,記憶體佔用穩定在900M,並沒有明顯的波動。

最後貼一下rabbitmq的效能測試資料:
1 300w條1k的資料,單機部署rabbitmq(8核,32G)
在ack確認模式下平均傳送效率為1.1w條/秒
非ack確認模式下平均傳送效率為1.6w條/秒

2 300w條1k的資料,cluster模式部署3臺(8核*3, 32G*3)
在ack確認模式下平均傳送效率為1.3w條/秒
非ack確認模型下平均傳送效率為1.7w條/秒

3 300w條1k的資料,單機部署rabbitmq(8核,32G)
在ack確認模式下平均消費效率為9000條/秒

4 300w條1k的資料,cluster模式部署3臺(8核*3, 32G*3)
在ack確認模式下平均消費效率為1w條/秒

程式碼地址:

相關推薦

rabbitmq可靠傳送自動機制

接這篇 在上文中,主要實現了可靠模式的consumer。而可靠模式的sender實現的相對簡略,主要通過rabbitTemplate來完成。 本以為這樣的實現基本是沒有問題的。但是前段時間做了一個性能壓力測試,但是發現在使用rabbitTemplate時,會有一定的丟資料問題。 當時的場景是用30個執行緒

多執行緒之失敗自動機制

發現一個比較好玩的東西: 如果你在使用多執行緒的使用中異常結束了,你應該如何操作呢? 例子: 正常情況下: 專案一啟動都可以跑完,如果有一段程式碼出現錯誤呢。 系統丟出了一個異常出來。 有沒有發生過這樣的情況,你寫的工作執行緒莫名其妙的掛了,如果不是被你剛好看到,拿只能抓瞎了,不知道啥原因了,因為異常

精講RestTemplate第8篇-請求失敗自動機制

本文是精講RestTemplate第8篇,前篇的blog訪問地址如下: * [精講RestTemplate第1篇-在Spring或非Spring環境下如何使用](http://www.zimug.com/java/spring/%e7%b2%be%e8%ae%b2resttemplate%e7%ac%a

精講響應式WebClient第6篇-請求失敗自動機制,強烈建議你看一看

![精講響應式WebClient第6篇-請求失敗自動重試機制](https://img2020.cnblogs.com/other/1815316/202008/1815316-20200826092554662-1026877669.png) 本文是精講響應式WebClient第6篇,前篇的blog訪問地

rabbitmq機制

1、應答模式 NONE 可以稱之為自動回撥,即使無響應或者發生異常均會通知佇列消費成功,會丟失資料。 AUTO 自動檢測異常或者超時事件,如果發生則返回noack,訊息自動回到隊尾,但是這種方式可能出現訊息體本身有問題,返回隊尾其他佇列也不能消費,造成佇列阻塞。 MANUAL

spring boot rabbitmq 機制

spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重試次數 spring.rabbitmq.listener.simple.retry.en

jedis超時機制註意事項

del number 十進制 包含 str 沒有 時間 機制 await 最近使用redis集群進行incr操作,總是發現計數不準確,後來經過檢查發現redis在執行incr超時會執行重試機制,造成計數不準確,測試代碼: /** * incrf: *

guava的機制guava-retrying使用

tco exceptio AI ide .class exc erb BE 一個 1,添加maven依賴 <dependency> <groupId>com.github.rholder</groupId> &l

PHP-RESQUE機制

pub 實現 方法 ole color except function cti ges 因為PHP-Resque 的重試需要自己寫,網上又沒啥輪子,而且resque也很久不更新了,所以自己研究下resque的源碼,然後也借鑒了Laravel的隊列重試機制,實現了PHP-Re

SpringCloud | FeignClient和Ribbon機制區別與聯系

feign per spec 笛卡爾 making log tag tom str 在spring cloud體系項目中,引入的重試機制保證了高可用的同時,也會帶來一些其它的問題,如冪等操作或一些沒必要的重試。 今天就來分別分析一下 FeignClient 和

Volley超時機制

基礎用法 Volley為開發者提供了可配置的超時重試機制,我們在使用時只需要為我們的Request設定自定義的RetryPolicy即可. 參考設定程式碼如下: int DEFAULT_TIMEOUT_MS = 10000; int DEFAULT_MAX_RETRIES = 3; Str

Appium失敗截圖及機制封裝(二)

analyze ret boolean 做了 ktr assert public false fail 一、失敗截圖封裝 1、主要封裝了失敗之後的文件名、重寫了失敗之後消息、失敗了以後做個截圖,最後置為失敗,並且存放到相對路徑下、截圖操作,未把失敗用例至為Fail,主要代

nginx的機制 proxy_next_upstream

現在對外服務的網站,很少只使用一個服務節點,而是部署多臺伺服器,上層通過一定機制保證容錯和負載均衡。 nginx就是常用的一種HTTP和反向代理伺服器,支援容錯和負載均衡。 nginx的重試機制就是容錯的一種。 在nginx的配置檔案中,proxy_next_upstream項定義了什麼情況下

Spring Cloud Gateway機制

前言 重試,我相信大家並不陌生。在我們呼叫Http介面的時候,總會因為某種原因呼叫失敗,這個時候我們可以通過重試的方式,來重新請求介面。 生活中這樣的事例很多,比如打電話,對方正在通話中啊,訊號不好啊等等原因,你總會打不通,當你第一次沒打通之後,你會打第二次

Spring Cloud Stream消費失敗後的處理策略(一):自動

之前寫了幾篇關於Spring Cloud Stream使用中的常見問題,比如: 如何處理訊息重複消費 如何消費自己生產的訊息 下面幾天就集中來詳細聊聊,當訊息消費失敗之後該如何處理的幾種方式。不過不論哪種方式,都需要與具體業務結合,解決不同業務場景可能出現的問題。 今天第一節,介紹一下Spring Clo

dubbo的機制

對dubbo熟悉的人對下面的配置一定不會陌生: <dubbo:reference id="xxxx" interface="xx" check="true" async="false" retries="1" timeout="2000"/> 上面設定需要關注的幾個地方: 1.check=t

11. kafka機制解讀

前面對kafka的學習中已經瞭解到KafkaProducer通過設定引數retries,如果傳送訊息到broker時丟擲異常,且是允許重試的異常,那麼就會最大重試retries引數指定的次數。 本片文章主要分析幾個問題: - 哪些異常可以重試 - 如何實現

【本人禿頂程式設計師】Spring Cloud Gateway機制

←←←←←←←←←←←← 我都禿頂了,還不點關注! 前言 重試,我相信大家並不陌生。在我們呼叫Http介面的時候,總會因為某種原因呼叫失敗,這個時候我們可以通過重試的方式,來重新請求介面。 生活中這樣的事例很多,比如打電話,對方正在通話中啊,訊號不好啊等等原因,你總會打不通,當你

Eureka高可用之Client機制:RetryableEurekaHttpClient

下面有幾個疑問是我看原始碼時問自己的,先提出來,希望看這篇文章的人帶著疑問去讀,然後初步介紹下EurekaHttpClient體系,後面會詳細講RetryableEurekaHttpClient 1、Eureka Client如何向Eureka Server叢集註冊?如果我的Client端的

Redis學習筆記(七)jedis超時機制注意事項

redis系列文章目錄 jedis客戶端在建立連線時會設定一個超時,並且會有重試機制。 問題起源 在使用jedis客戶端的時候,我測試了一下incr命令,該命令在執行過程中是原子的,所