1. 程式人生 > 其它 >多級漏桶突破百度AI介面QPS限制瓶頸

多級漏桶突破百度AI介面QPS限制瓶頸

一、描述

​ 工作中需要呼叫第三方介面(百度AI介面),實現一些AI相關的功能。但是開通付費介面後,但仍有10QPS的限制,超出的部分會被百度拒絕,直接報錯。而我們的業務需求是要基本保證呼叫成功的。因此需要一個漏桶/限流器來控制呼叫速度去適配這10QPS的限制,剩餘的請求進入等待佇列。

二、前置知識

2.1 漏桶與令牌桶

漏桶

漏桶演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水,當水流入速度過大會直接溢位,可以看出漏桶演算法能強行限制資料的傳輸速率。

令牌桶

對於很多應用場景來說,除了要求能夠限制資料的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶演算法可能就不合適了,令牌桶演算法更為適合。令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。

2.2 限流利器-Semaphore

在 JUC 包下,有一個 Semaphore 類,翻譯成訊號量,Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻只允許一個執行緒訪問某一資源,而 Semaphore 則可以控制同一時刻多個執行緒訪問某一資源。

Semaphore(訊號量)並不是 Java 語言特有的,幾乎所有的併發語言都有。所以也就存在一個「訊號量模型」的概念,如下圖所示:

訊號量模型比較簡單,

可以概括為:「一個計數器、一個佇列、三個方法」

計數器:記錄當前還可以執行多少個資源訪問資源。

佇列:待訪問資源的執行緒

init():初始化計數器的值,可就是允許多少執行緒同時訪問資源。

up():計數器加1,有執行緒歸還資源時,如果計數器的值大於或者等於 0 時,從等待佇列中喚醒一個執行緒

down():計數器減 1,有執行緒佔用資源時,如果此時計數器的值小於 0 ,執行緒將被阻塞。

鑑於我們是部署多例項的分散式系統,JUC實現Semaphore的並不是適用,使用Redisson實現的分散式訊號量。

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

下面介紹一下關於Semaphore的主要方法:

//獲取指定名稱的訊號量例項
RSemaphore getSemaphore(String name);
 
//獲取指定名稱的訊號量例項,許可帶有過期時間
RPermitExpirableSemaphore getPermitExpirableSemaphore(String name);
 
//嘗試設定訊號量許可個數
boolean trySetPermits(int permits);
 
//從訊號量中獲取許可,相當於獲取到執行權,獲取不到會阻塞,直到獲取到為止
String acquire() throws InterruptedException;
 
//獲取一個指定過期時間的許可
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
 
//嘗試獲取一個許可,不會阻塞,獲取不到返回null
String tryAcquire();
 
//嘗試在指定時間內獲取一個許可
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
 
//嘗試在指定時間內獲取一個指定過期時間的許可
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
 
//嘗試通過許可id釋放
boolean tryRelease(String permitId);
 
//通過id釋放許可
void release(String permitId);

三、呼叫百度AI介面適配實現

通過對漏桶、令牌桶概念的瞭解,以及對限流利器-semaphore的的認識;就可以通過漏桶的理念以及redisson的semaphore API 去實現限制對百度介面的呼叫,保證10QPS勻速呼叫;

還有一個問題:“漏桶”的容量如何確定?

根據業務需求而定,假設呼叫AI介面的等待時間“容忍度”是 10s,假設AI介面本身2s的耗時,那麼對於同一個AI介面的請求就是80個,“漏桶”的容量(等等佇列)就是80。

同樣可以使用現成的redisson提供的分散式佇列API,去實現、假如佇列滿,直接拒絕。

當然這個AI介面本身的耗時會根據它的型別以及資料的大小而改變,這裡使用執行緒池執行器 ThreadPoolExecutor 的一些特性以及自帶的佇列去實現功能,具體:

  • future.get(TIME_OUT, TimeUnit.MILLISECONDS); 控制“容忍”時間
  • 執行緒池等待佇列大小來控制“桶”的容量,超出拒絕。

部分程式碼實現:

1.構建呼叫任務,期中maxQPS為百度AI介面的最大QPS限制,TIME_WINDOW為訊號量釋放時間 = 1s

public class BaiduAIClientTask implements Callable<BaiduAIClientTask> {
    @Override
    public BaiduAIClientTask call() {
        RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("key");
        semaphore.trySetPermits(maxQPS);
        semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
        //呼叫百度介面
        this.jsonObject = aiClient.send(aipRequest);
    }
}

2.定義執行緒池執行任務

public class ClientAIOperation {
    public JSONObject getResponse(AipRequest aipRequest) {
        //獲取固定執行緒的執行緒池(單例),設定執行緒數量、佇列大小
        ExecutorService executorService = baiduAIClientContext.getExecutorService(nThreads,ququeSize);
        JSONObject jsonObject = null;
        Future<BaiduAIClientTask> future = null;
        try {
            future = executorService.submit(task);
            BaiduAIClientTask baiduAIClientTask = future.get(TIME_OUT, TimeUnit.MILLISECONDS);
            jsonObject = baiduAIClientTask.getJsonObject();
        } catch (RejectedExecutionException e) {
            throw new BaiduClientException("系統繁忙~");
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new BaiduClientException("執行超時,請重試~");
        }
        return jsonObject;
    }
}

四、多帳號提升QPS瓶頸

上面的操作也是僅是儘可能保證使用者在容忍時間內能呼叫成功,支援10QPS的併發。

QPS疊加包

通過調研瞭解,百度AI介面提供QPS疊加包服務,在對應上課時間購買QPS疊加包,來應對高併發呼叫。

缺點:

①需要關心具體業務使用時間,頻繁的購買相應的QPS疊加包。

②太貴了 ,如下表:200QPS每天就是 2000元,而且僅僅是單個介面

介面 按天購買價格(元/QPS/天) 按月購買價格(元/QPS/月)
通用物體和場景識別 50 750
logo識別-檢索 10 150
影象主體檢測(單主體) 5 75
植物識別 10 150
動物識別 10 150
菜品識別 10 150
自定義菜品識別-檢索 50 750
果蔬識別 10 150
地標識別 10 150
影象主體檢測(多主體) 10 150

多賬號解決方案

鑑於QPS疊加包高額費用以及需要頻繁的購買相應服務,那麼我們採用多帳號的方案來解決。

五、多級漏桶設計

但是多帳號同樣面臨一個問題,一般AI能力服務會隨著呼叫量的增加而費用減少,如下表:

月呼叫量(萬次) 菜品識別(元/千次)
0<月呼叫量<=5 0.70
5<月呼叫量<=10 0.60
10<月呼叫量<=20 0.50
20<月呼叫量<=50 0.40
50<月呼叫量<=100 0.35
100<月呼叫量 0.30

為了儘量使得一個賬號達到減免額度,採用如下圖所示設計,當用戶請求過來預設進入第一個『桶』中,當併發量過大,第一個『桶』處理不過來,溢位到下一個桶中,依次溢位。當最後一個『桶』滿後,剩下的請求在『蓄水池』{等待佇列}中等待

部分程式碼實現:

通過遞迴與tryAcquire()方法的特行,實現讓桶溢位後流入下一個桶。

@Override
public BaiduAIClientTask call() {
    //蓄水池外流速度訊號量 
    RPermitExpirableSemaphore semaphoreAll = redissonClient.getPermitExpirableSemaphore(“蓄水池”);
    //桶的總流量 = 桶流速 * 桶數
    semaphoreAll.trySetPermits(maxQPS * baiduAIClientList.size());
    try {
        //申請蓄水池訊號量,如果滿了會阻塞於此 模擬閥門
        semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
        //第一個桶開始工作 leakyBucketNumber = 0
        this.leakyBucketWork(leakyBucketNumber,baiduAIClientList);
    } catch (RedissonShutdownException rse) {
        log.warn("【AI介面呼叫】終止!{}", rse.getMessage());
    } catch (InterruptedException e) {
        log.info("【AI介面呼叫】執行緒中斷,執行失敗");
    }
    return this;
}
 
/**
 * 漏桶工作,遞迴處理溢位
 *
 * @param leakyBucketNumber - 漏桶編號
 * @param baiduAIClientList - 呼叫百度client集合
 */
private void leakyBucketWork(Integer leakyBucketNumber, List<BaiduAIClient> baiduAIClientList) throws InterruptedException {
    //單個桶流速訊號量  命名:url.leakyBucketNumber
    RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore(“單桶 + 編號”);
    semaphore.trySetPermits(maxQPS);
    String acquire = semaphore.tryAcquire(0,TIME_WINDOW, TimeUnit.MILLISECONDS);
    if (Strings.isNotEmpty(acquire)) {
        log.info("桶編號-{},獲取到訊號量:{},開始執行任務",leakyBucketNumber,acquire);
        BaiduAIClient aiClient = baiduAIClientList.get(leakyBucketNumber);
        this.jsonObject = aiClient.send(aipRequest);
    } else if (leakyBucketNumber < baiduAIClientList.size() - 1) {
        //溢位:流向下一個桶
        leakyBucketNumber++;
        leakyBucketWork(leakyBucketNumber,baiduAIClientList);
    }
}

六、參考