多級漏桶突破百度AI介面QPS限制瓶頸
一、描述
工作中需要呼叫第三方介面(百度AI介面),實現一些AI相關的功能。但是開通付費介面後,但仍有10QPS的限制,超出的部分會被百度拒絕,直接報錯。而我們的業務需求是要基本保證呼叫成功的。因此需要一個漏桶/限流器來控制呼叫速度去適配這10QPS的限制,剩餘的請求進入等待佇列。
二、前置知識
2.1 漏桶與令牌桶
漏桶
漏桶演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水,當水流入速度過大會直接溢位,可以看出漏桶演算法能強行限制資料的傳輸速率。
令牌桶
對於很多應用場景來說,除了要求能夠限制資料的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶演算法可能就不合適了,令牌桶演算法更為適合。令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。
2.2 限流利器-Semaphore
在 JUC 包下,有一個 Semaphore 類,翻譯成訊號量,Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻只允許一個執行緒訪問某一資源,而 Semaphore 則可以控制同一時刻多個執行緒訪問某一資源。
Semaphore(訊號量)並不是 Java 語言特有的,幾乎所有的併發語言都有。所以也就存在一個「訊號量模型」的概念,如下圖所示:
訊號量模型比較簡單,
可以概括為:「一個計數器、一個佇列、三個方法」
計數器:記錄當前還可以執行多少個資源訪問資源。
佇列:待訪問資源的執行緒
init():初始化計數器的值,可就是允許多少執行緒同時訪問資源。
up():計數器加1,有執行緒歸還資源時,如果計數器的值大於或者等於 0 時,從等待佇列中喚醒一個執行緒
down():計數器減 1,有執行緒佔用資源時,如果此時計數器的值小於 0 ,執行緒將被阻塞。
鑑於我們是部署多例項的分散式系統
,JUC實現Semaphore的並不是適用,使用Redisson實現的分散式訊號量。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> </dependency>
下面介紹一下關於Semaphore的主要方法:
//獲取指定名稱的訊號量例項
RSemaphore getSemaphore(String name);
//獲取指定名稱的訊號量例項,許可帶有過期時間
RPermitExpirableSemaphore getPermitExpirableSemaphore(String name);
//嘗試設定訊號量許可個數
boolean trySetPermits(int permits);
//從訊號量中獲取許可,相當於獲取到執行權,獲取不到會阻塞,直到獲取到為止
String acquire() throws InterruptedException;
//獲取一個指定過期時間的許可
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
//嘗試獲取一個許可,不會阻塞,獲取不到返回null
String tryAcquire();
//嘗試在指定時間內獲取一個許可
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
//嘗試在指定時間內獲取一個指定過期時間的許可
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
//嘗試通過許可id釋放
boolean tryRelease(String permitId);
//通過id釋放許可
void release(String permitId);
三、呼叫百度AI介面適配實現
通過對漏桶、令牌桶概念的瞭解,以及對限流利器-semaphore的的認識;就可以通過漏桶的理念以及redisson的semaphore API 去實現限制對百度介面的呼叫,保證10QPS勻速呼叫;
還有一個問題:“漏桶”的容量如何確定?
根據業務需求而定,假設呼叫AI介面的等待時間“容忍度”是 10s,假設AI介面本身2s的耗時,那麼對於同一個AI介面的請求就是80個,“漏桶”的容量(等等佇列)就是80。
同樣可以使用現成的redisson提供的分散式佇列API,去實現、假如佇列滿,直接拒絕。
當然這個AI介面本身的耗時會根據它的型別以及資料的大小而改變,這裡使用執行緒池執行器 ThreadPoolExecutor 的一些特性以及自帶的佇列去實現功能,具體:
- future.get(TIME_OUT, TimeUnit.MILLISECONDS); 控制“容忍”時間
- 執行緒池等待佇列大小來控制“桶”的容量,超出拒絕。
部分程式碼實現:
1.構建呼叫任務,期中maxQPS為百度AI介面的最大QPS限制,TIME_WINDOW為訊號量釋放時間 = 1s
public class BaiduAIClientTask implements Callable<BaiduAIClientTask> {
@Override
public BaiduAIClientTask call() {
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("key");
semaphore.trySetPermits(maxQPS);
semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
//呼叫百度介面
this.jsonObject = aiClient.send(aipRequest);
}
}
2.定義執行緒池執行任務
public class ClientAIOperation {
public JSONObject getResponse(AipRequest aipRequest) {
//獲取固定執行緒的執行緒池(單例),設定執行緒數量、佇列大小
ExecutorService executorService = baiduAIClientContext.getExecutorService(nThreads,ququeSize);
JSONObject jsonObject = null;
Future<BaiduAIClientTask> future = null;
try {
future = executorService.submit(task);
BaiduAIClientTask baiduAIClientTask = future.get(TIME_OUT, TimeUnit.MILLISECONDS);
jsonObject = baiduAIClientTask.getJsonObject();
} catch (RejectedExecutionException e) {
throw new BaiduClientException("系統繁忙~");
} catch (TimeoutException e) {
future.cancel(true);
throw new BaiduClientException("執行超時,請重試~");
}
return jsonObject;
}
}
四、多帳號提升QPS瓶頸
上面的操作也是僅是儘可能保證使用者在容忍時間內能呼叫成功,支援10QPS的併發。
QPS疊加包
通過調研瞭解,百度AI介面提供QPS疊加包服務,在對應上課時間購買QPS疊加包,來應對高併發呼叫。
缺點:
①需要關心具體業務使用時間,頻繁的購買相應的QPS疊加包。
②太貴了 ,如下表:200QPS每天就是 2000元,而且僅僅是單個介面
介面 | 按天購買價格(元/QPS/天) | 按月購買價格(元/QPS/月) |
---|---|---|
通用物體和場景識別 | 50 | 750 |
logo識別-檢索 | 10 | 150 |
影象主體檢測(單主體) | 5 | 75 |
植物識別 | 10 | 150 |
動物識別 | 10 | 150 |
菜品識別 | 10 | 150 |
自定義菜品識別-檢索 | 50 | 750 |
果蔬識別 | 10 | 150 |
地標識別 | 10 | 150 |
影象主體檢測(多主體) | 10 | 150 |
多賬號解決方案
鑑於QPS疊加包高額費用以及需要頻繁的購買相應服務,那麼我們採用多帳號的方案來解決。
五、多級漏桶設計
但是多帳號同樣面臨一個問題,一般AI能力服務會隨著呼叫量的增加而費用減少,如下表:
月呼叫量(萬次) | 菜品識別(元/千次) |
---|---|
0<月呼叫量<=5 | 0.70 |
5<月呼叫量<=10 | 0.60 |
10<月呼叫量<=20 | 0.50 |
20<月呼叫量<=50 | 0.40 |
50<月呼叫量<=100 | 0.35 |
100<月呼叫量 | 0.30 |
為了儘量使得一個賬號達到減免額度,採用如下圖所示設計,當用戶請求過來預設進入第一個『桶』中,當併發量過大,第一個『桶』處理不過來,溢位到下一個桶中,依次溢位。當最後一個『桶』滿後,剩下的請求在『蓄水池』{等待佇列}中等待
部分程式碼實現:
通過遞迴與tryAcquire()
方法的特行,實現讓桶溢位後流入下一個桶。
@Override
public BaiduAIClientTask call() {
//蓄水池外流速度訊號量
RPermitExpirableSemaphore semaphoreAll = redissonClient.getPermitExpirableSemaphore(“蓄水池”);
//桶的總流量 = 桶流速 * 桶數
semaphoreAll.trySetPermits(maxQPS * baiduAIClientList.size());
try {
//申請蓄水池訊號量,如果滿了會阻塞於此 模擬閥門
semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
//第一個桶開始工作 leakyBucketNumber = 0
this.leakyBucketWork(leakyBucketNumber,baiduAIClientList);
} catch (RedissonShutdownException rse) {
log.warn("【AI介面呼叫】終止!{}", rse.getMessage());
} catch (InterruptedException e) {
log.info("【AI介面呼叫】執行緒中斷,執行失敗");
}
return this;
}
/**
* 漏桶工作,遞迴處理溢位
*
* @param leakyBucketNumber - 漏桶編號
* @param baiduAIClientList - 呼叫百度client集合
*/
private void leakyBucketWork(Integer leakyBucketNumber, List<BaiduAIClient> baiduAIClientList) throws InterruptedException {
//單個桶流速訊號量 命名:url.leakyBucketNumber
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore(“單桶 + 編號”);
semaphore.trySetPermits(maxQPS);
String acquire = semaphore.tryAcquire(0,TIME_WINDOW, TimeUnit.MILLISECONDS);
if (Strings.isNotEmpty(acquire)) {
log.info("桶編號-{},獲取到訊號量:{},開始執行任務",leakyBucketNumber,acquire);
BaiduAIClient aiClient = baiduAIClientList.get(leakyBucketNumber);
this.jsonObject = aiClient.send(aipRequest);
} else if (leakyBucketNumber < baiduAIClientList.size() - 1) {
//溢位:流向下一個桶
leakyBucketNumber++;
leakyBucketWork(leakyBucketNumber,baiduAIClientList);
}
}