1. 程式人生 > >高可用服務 AHAS 在訊息佇列 MQ 削峰填穀場景下的應用

高可用服務 AHAS 在訊息佇列 MQ 削峰填穀場景下的應用

開發十年,就只剩下這套架構體系了! >>>   

在訊息佇列中,當消費者去消費訊息的時候,無論是通過 pull 的方式還是 push 的方式,都可能會出現大批量的訊息突刺。如果此時要處理所有訊息,很可能會導致系統負載過高,影響穩定性。但其實可能後面幾秒之內都沒有訊息投遞,若直接把多餘的訊息丟掉則沒有充分利用系統處理訊息的能力。我們希望可以把訊息突刺均攤到一段時間內,讓系統負載保持在訊息處理水位之下的同時儘可能地處理更多訊息,從而起到“削峰填谷”的效果:

上圖中紅色的部分代表超出訊息處理能力的部分。

我們可以看到訊息突刺往往都是瞬時的、不規律的,其後一段時間系統往往都會有空閒資源。我們希望把紅色的那部分訊息平攤到後面空閒時去處理,這樣既可以保證系統負載處在一個穩定的水位,又可以儘可能地處理更多訊息,這時候我們就需要一個能夠控制消費端訊息勻速處理的利器 — AHAS 流控降級,來為訊息佇列削峰填谷,保駕護航。

AHAS 是如何削峰填谷的

AHAS 的流控降級是面向分散式服務架構的專業流量控制組件,主要以流量為切入點,從流量控制、熔斷降級、系統保護等多個維度來幫助您保障服務的穩定性,同時提供強大的聚合監控和歷史監控查詢功能。

AHAS 專門為這種場景提供了勻速排隊的控制特性,可以把突然到來的大量請求以勻速的形式均攤,以固定的間隔時間讓請求通過,以穩定的速度逐步處理這些請求,起到“削峰填谷”的效果,從而避免流量突刺造成系統負載過高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕全部請求。

比如在 RocketMQ 的場景下配置了勻速模式下請求 QPS 為 5,則會每 200 ms 處理一條訊息,多餘的處理任務將排隊;同時設定了超時時間,預計排隊時長超過超時時間的處理任務將會直接被拒絕。示意圖如下圖所示:

RocketMQ Consumer 接入示例

本部分將引導您快速在 RocketMQ 消費端接入 AHAS 流控降級 Sentinel。

1. 開通 AHAS

首先您需要到AHAS 控制檯開通 AHAS 功能(免費)。可以根據 開通 AHAS 文件 裡面的指引進行開通。

2. 程式碼改造

在結合阿里雲 RocketMQ Client 使用 Sentinel 時,使用者需要引入 AHAS Sentinel 的依賴 ahas-sentinel-client (以 Maven 為例):

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>ahas-sentinel-client</artifactId>
    <version>1.1.0</version>
</dependency>

由於 RocketMQ Client 未提供相應攔截機制,而且每次收到都可能是批量的訊息,因此使用者在處理訊息時需要手動進行資源定義(埋點)。我們可以在處理訊息的邏輯處手動進行埋點,資源名可以根據需要來確定(如 groupId + topic 的組合):

    private static Action handleMessage(Message message, String groupId, String topic) {
        Entry entry = null;
        try {
            // 資源名稱為 groupId 和 topic 的組合,便於標識,同時可以針對不同的 groupId 和 topic 配置不同的規則
            entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);
          
            // 在此處編寫真實的處理邏輯
            System.out.println(System.currentTimeMillis() + " | handling message: " + message);
            return Action.CommitMessage;
        } catch (BlockException ex) {
            // 在編寫處理被流控的邏輯
            // 示例:可以在此處記錄錯誤或進行重試
            System.err.println("Blocked, will retry later: " + message);
            return Action.ReconsumeLater; // 會觸發訊息重新投遞
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }

消費者訂閱訊息的邏輯示例:

Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(topic, "*", (message, context) -> {
    return handleMessage(message);
});
consumer.start();

更多關於 RocketMQ SDK 的資訊可以參考 訊息佇列 RocketMQ 入門文件

3. 獲取 AHAS 啟動引數

注意:若在本地執行接入 AHAS Sentinel 控制檯需要在頁面左上角選擇 公網 環境,若在阿里雲 ECS 環境則在頁面左上角選擇對應的 Region 環境。

我們可以進入 AHAS 控制檯,點選左側側邊欄的 流控降級,進入 AHAS 流控降級控制檯應用總覽頁面。在頁面右上角,單擊新增應用,選擇 SDK 接入頁籤,到 配置啟動引數 頁籤拿到需要的啟動引數(詳情請參考 SDK 接入文件),類似於:

-Dproject.name=AppName -Dahas.license=<License>

其中 project.name 配置項代表應用名(會顯示在控制檯,比如 MqConsumerDemo),ahas.license配置項代表自己的授權 license(ECS 環境不需要此項)。

4. 啟動 Consumer,配置規則

接下來我們新增獲取到的啟動引數,啟動修改好的 Consumer 應用。由於 AHAS 流控降級需要進行資源呼叫才能觸發初始化,因此首先需要向對應 group/topic 傳送一條訊息觸發初始化。消費端接收到訊息後,我們就可以在 AHAS Sentinel 控制檯上看到我們的應用了。點選應用卡片,進入詳情頁面後點擊左側側邊欄的“機器列表”。我們可以在機器列表頁面看到剛剛接入的機器,代表接入成功:

點選“請求鏈路”頁面,我們可以看到之前定義的資源。點選右邊的“流控”按鈕新增新的流控規則:

我們在“流控方式”中選擇“排隊等待”,設定 QPS 為 10,代表每 100ms 勻速通過一個請求;並且設定最大超時時長為 2000ms,超出此超時時間的請求將不會排隊,立即拒絕。配置完成後點選新建按鈕。

5. 傳送訊息,檢視效果

下面我們可以在 Producer 端批量傳送訊息,然後在 Consumer 端的控制檯輸出處觀察效果。可以看到訊息消費的速率是勻速的,大約每 100 ms 消費一條訊息:

1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871

同時不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。注意在處理請求被拒絕的時候,需要根據需求決定是否需要重新消費訊息。

我們也可以點選左側側邊欄的“監控詳情”進入監控詳情頁面,檢視處理訊息的監控曲線:

對比普通限流模式的監控曲線(最右面的部分):

如果不開啟勻速模式,只是普通的限流模式,則只會同時處理 10 條訊息,其餘的全部被拒絕,即使後面的時間系統資源充足多餘的請求也無法被處理,因而浪費了許多空閒資源。兩種模式對比說明勻速模式下訊息處理能力得到了更好的利用。

Kafka 接入程式碼示例

Kafka 消費端接入 AHAS 流控降級的思路與上面的 RocketMQ 類似,這裡給出一個簡單的程式碼示例:

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
    pool.submit(() -> {
        Entry entry = null;
        try {
            // 資源名稱為 groupId 和 topic 的組合,便於標識,同時可以針對不同的 groupId 和 topic 配置不同的規則
            entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);

            // 在此處理訊息.
            System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
        } catch (BlockException ex) {
            // Blocked.
            // NOTE: 在處理請求被拒絕的時候,需要根據需求決定是否需要重新消費訊息
            System.err.println("Blocked: " + record.toString());
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    });
}

消費訊息的邏輯:

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 必須在下次 poll 之前消費完這些資料, 且總耗時不得超過 SESSION_TIMEOUT_MS_CONFIG
        // 建議開一個單獨的執行緒池來消費訊息,然後非同步返回結果
        for (ConsumerRecord<String, String> record : records) {
            handleMessage(record, groupId, topic);
        }
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (Throwable ignore) {
        }
        e.printStackTrace();
    }
}

其它

以上介紹的只是 AHAS 流控降級的其中一個場景 —— 請求勻速,它還可以處理更復雜的各種情況,比如:

  • 流量控制:可以針對不同的呼叫關係,以不同的執行指標(如 QPS、執行緒數、系統負載等)為基準,對資源呼叫進行流量控制,將隨機的請求調整成合適的形狀(請求勻速、Warm Up 等)。
  • 熔斷降級:當呼叫鏈路中某個資源出現不穩定的情況,如平均 RT 增高、異常比例升高的時候,會使對此資源的呼叫請求快速失敗,避免影響其它的資源導致級聯失敗。
  • 系統負載保護:對系統的維度提供保護。當系統負載較高的時候,提供了對應的保護機制,讓系統的入口流量和系統的負載達到一個平衡,保證系統在能力範圍之內處理最多的請求。

您可以參考 AHAS 流控降級文件 來挖掘更多的場景。

相關推薦

可用服務 AHAS訊息佇列 MQ 場景應用

開發十年,就只剩下這套架構體系了! >>>   

可用服務 AHAS 在消息隊列 MQ 場景應用

ktr current record 線程池 blog ignore messages pic amp 在消息隊列中,當消費者去消費消息的時候,無論是通過 pull 的方式還是 push 的方式,都可能會出現大批量的消息突刺。如果此時要處理所有消息,很可能會導致系統負載過高

Amazon SQS 訊息佇列服務_訊息佇列mq解決方案

Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲

配置管理 ACM 在可用服務 AHAS 流控降級元件中的應用場景

開發十年,就只剩下這套架構體系了! >>>   

配置管理 ACM 在可用服務 AHAS 流控降級組件中的應用場景

信息 清晰 依賴 text 大數 穩定 變更 力達 con 應用配置管理(Application Configuration Management,簡稱 ACM)是一款應用配置中心產品。基於ACM您可以在微服務、DevOps、大數據等場景下極大地減輕配置管理的工作量,同時保

阿里雲AHAS應用可用服務初體驗

AHAS是阿里雲提供應用高可用服務(Application High Availability Service)產品。 高可用這個關鍵詞可以說是網際網路及軟體開發行業熱度一直很高的詞語了,阿里雲推出的這款產品,如果你是開發人員,可能看名字就會被吸引。 目前產品是免費開通的,我們來體驗一下。 首先登陸阿里雲

Amazon MQ 訊息代理服務_ActiveMQ訊息佇列託管

使用 Amazon MQ,您可以輕鬆將訊息收發遷移至雲,同時保留應用程式間的現有連線。該服務支援行業標準 API 和各種訊息收發協議,其中包括 JMS、NMS、AMQP、STOMP、MQTT 和 WebSocket。因此,您可以輕鬆從使用這些標準的任意訊息代理遷移至 Amazon

Spring cloud Eureka 服務治理(可用服務中心)

image 本地host available png active url 狀態 name spring 在微服務的架構中,我們考慮發生故障的情況,所以在生產環境中我們需要對服務中各個組件進行高可用部署。 Eureka Server 的高可用實際上就是將自己作為服務想其

alwayson08-啟動always on可用服務

always on ima images sql http always 服務 com .com 打開sql server 配置管理器 alwayson08-啟動always on高可用服務

keepalived安裝與配置,組建可用服務

eight 如何 .org keep 局域網 yum 從服務器 改變 -1 一、準備環境   linux系統:CentOS7   keepalived版本:keepalived-1.3.5.tar.gz   keepalived下載地址:http://www.keepali

【架構】Heartbeat可用服務(2)

agents 可用 分享 ont .com war 集群 需求 spa Heartbeat高可用服務 【13】Heartbeat發展情況及分支軟件介紹   有關Heartbeat分3個分支的說明     自2.1.4版本後,Linux-HA將Heartbeat分包

Spring Cloud Eureka 4 (可用服務註冊中心)

自己 def port hello -c 圖片 sys img 效果 在微服務這樣的分布式環境中,我們需要充分考慮發生故障的情況,所以在生產環境中必須考慮對各個組件進行高可用部署,對於服務註冊中心也是一樣。 Eureka Server 的高可用實際上就是講自己作為服

億級流量電商詳情頁系統實戰-緩存架構+可用服務架構+微服務架構第二版視頻教程

class 全文檢索 ron pan 教程 ec2 dubbo sgx 技術經理 14套java精品高級架構課,緩存架構,深入Jvm虛擬機,全文檢索Elasticsearch,Dubbo分布式Restful 服務,並發原理編程,SpringBoot,SpringCloud

構建可用服務

不同 地域 mbo host proxy 獲取數據 恢復 post 秒殺 一. 什麽是高可用性 服務端,顧名思義就是為用戶提供服務的。 停工時間,就是不能向用戶提供服務的時間。 高可用,就是系統具有高度可用性,盡量減少停工時間。 停工的原因一般有: 服務器故障。例如服務器

heartbeat單獨提供可用服務

而不是 boolean member net 設置ip 失效 文件 plugin receive 本文目錄:1.簡介2.安裝heartbeat 2.1 編譯安裝Heartbeat3.heartbeat相關配置文件 3.1 配置文件ha.cf 3.2 配置文件authkeys

十一.keepalived可用服務實踐部署

-s ash app The bind vim bin shel 全局 期中集群架構-第十一章-keepalived高可用集群章節======================================================================01

可用服務

高可用架構 什麽是服務層 眾所周知,服務層主要用來處理網站業務邏輯的,是大型業務網站的核心。比如下面三個業務系統就是典型的服務層,提供基礎服務功能的聚合 用戶中心:主要負責用戶註冊、登錄、獲取用戶用戶信息功能 交易中心:主要包括正向訂單生成、逆向訂單、查詢、金額計算等功能 支付中心:主要包括訂單支付、收銀臺

用簡單的方法構建一個可用服務

高可用 架構 redis 服務器 一. 什麽是高可用性 服務端,顧名思義就是為用戶提供服務的。停工時間,就是不能向用戶提供服務的時間。高可用,就是系統具有高度可用性,盡量減少停工時間。 停工的原因一般有: 服務器故障。例如服務器宕機,服務器網絡出現問題,機房或者機架出現問題等。訪問量急劇上升,

Keepalived可用服務

mat ack worker oot roman right xtra 制作 重啟 第1章 Keepalived高可用服務1.1 Keepalived介紹 Keepalived軟件起初是專為LVS負載均衡軟件設計的,用來管理並監控LVS集群系統中各個服務節點的狀態

keepalived可用服務安裝

size ado ica 9.png 技術分享 ive evel ces keepalive keepalived通過內核管理lvs ,所以沒裝沒裝kernel-devel configure 完看有沒有下面三個yes 的Use VRRP Framework