1. 程式人生 > >常用限流演算法與Guava RateLimiter原始碼解析

常用限流演算法與Guava RateLimiter原始碼解析

在分散式系統中,應對高併發訪問時,快取、限流、降級是保護系統正常執行的常用方法。當請求量突發暴漲時,如果不加以限制訪問,則可能導致整個系統崩潰,服務不可用。同時有一些業務場景,比如簡訊驗證碼,或者其它第三方API呼叫,也需要提供必要的訪問限制支援。還有一些資源消耗過大的請求,比如資料匯出等(參考 [記一次線上Java服務CPU 100%處理過程](http://blog.jboost.cn/issue-cpu-high.html) ),也有限制訪問頻率的需求。 常見的限流演算法有令牌桶演算法,漏桶演算法,與計數器演算法。本文主要對三個演算法的基本原理及Google Guava包中令牌桶演算法的實現RateLimiter進行介紹,下一篇文章介紹最近寫的一個以RateLimiter為參考的分散式限流實現及計數器限流實現。 ## 令牌桶演算法 令牌桶演算法的原理就是以一個恆定的速度往桶裡放入令牌,每一個請求的處理都需要從桶裡先獲取一個令牌,當桶裡沒有令牌時,則請求不會被處理,要麼排隊等待,要麼降級處理,要麼直接拒絕服務。當桶裡令牌滿時,新新增的令牌會被丟棄或拒絕。 令牌桶演算法的處理示意圖如下(圖片來自網路) ![token-bucket](https://img2020.cnblogs.com/other/632381/202007/632381-20200722144743006-722610511.png) 令牌桶演算法主要是可以控制請求的平均處理速率,它允許預消費,即可以提前消費令牌,以應對突發請求,但是後面的請求需要為預消費買單(等待更長的時間),以滿足請求處理的平均速率是一定的。 ## 漏桶演算法 漏桶演算法的原理是水(請求)先進入漏桶中,漏桶以一定的速度出水(處理請求),當水流入速度大於流出速度導致水在桶內逐漸堆積直到桶滿時,水會溢位(請求被拒絕)。 漏桶演算法的處理示意圖如下(圖片來自網路) ![leaky-bucket](https://img2020.cnblogs.com/other/632381/202007/632381-20200722144743299-1156201993.png) 漏桶演算法主要是控制請求的處理速率,平滑網路上的突發流量,請求可以以任意速度進入漏桶中,但請求的處理則以恆定的速度進行。 ## 計數器演算法 計數器演算法是限流演算法中最簡單的一種演算法,限制在一個時間視窗內,至多處理多少個請求。比如每分鐘最多處理10個請求,則從第一個請求進來的時間為起點,60s的時間視窗內只允許最多處理10個請求。下一個時間視窗又以前一時間視窗過後第一個請求進來的時間為起點。常見的比如一分鐘內只能獲取一次簡訊驗證碼的功能可以通過計數器演算法來實現。 ## Guava RateLimiter解析 Guava是Google開源的一個工具包,其中的RateLimiter是實現了令牌桶演算法的一個限流工具類。在pom.xml中新增guava依賴,即可使用RateLimiter ```xml ``` 如下測試程式碼示例了RateLimiter的用法, ```java public static void main(String[] args) { RateLimiter rateLimiter = RateLimiter.create(1); //建立一個每秒產生一個令牌的令牌桶 for(int i=1;i<=5;i++) { double waitTime = rateLimiter.acquire(i); //一次獲取i個令牌 System.out.println("acquire:" + i + " waitTime:" + waitTime); } } ``` 執行後,輸出如下, ```shell acquire:1 waitTime:0.0 acquire:2 waitTime:0.997729 acquire:3 waitTime:1.998076 acquire:4 waitTime:3.000303 acquire:5 waitTime:4.000223 ``` 第一次獲取一個令牌時,等待0s立即可獲取到(這裡之所以不需要等待是因為令牌桶的預消費特性),第二次獲取兩個令牌,等待時間1s,這個1s就是前面獲取一個令牌時因為預消費沒有等待延到這次來等待的時間,這次獲取兩個又是預消費,所以下一次獲取(取3個時)就要等待這次預消費需要的2s了,依此類推。可見預消費不需要等待的時間都由下一次來買單,以保障一定的平均處理速率(上例為1s一次)。 RateLimiter有兩種實現: 1. SmoothBursty: 令牌的生成速度恆定。使用 `RateLimiter.create(double permitsPerSecond)` 建立的是 SmoothBursty 例項。 2. SmoothWarmingUp:令牌的生成速度持續提升,直到達到一個穩定的值。WarmingUp,顧名思義就是有一個熱身的過程。使用 `RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)` 時建立就是 SmoothWarmingUp 例項,其中 warmupPeriod 就是熱身達到穩定速度的時間。 類結構如下 ![ratelimiter-struct](https://img2020.cnblogs.com/other/632381/202007/632381-20200722144743522-1172311057.png) 關鍵屬性及方法解析(以 SmoothBursty 為例) 1.關鍵屬性 ```java /** 桶中當前擁有的令牌數. */ double storedPermits; /** 桶中最多可以儲存多少秒存入的令牌數 */ double maxBurstSeconds; /** 桶中能儲存的最大令牌數,等於storedPermits*maxBurstSeconds. */ double maxPermits; /** 放入令牌的時間間隔*/ double stableIntervalMicros; /** 下次可獲取令牌的時間點,可以是過去也可以是將來的時間點*/ private long nextFreeTicketMicros = 0L; ``` 2.關鍵方法 呼叫 `RateLimiter.create(double permitsPerSecond)` 方法時,建立的是 SmoothBursty 例項,預設設定 maxBurstSeconds 為1s。SleepingStopwatch 是guava中的一個時鐘類實現。 ```java @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } ``` 並通過呼叫 `SmoothBursty.doSetRate(double, long)` 方法進行初始化,該方法中: 1. 呼叫 `resync(nowMicros)` 對 storedPermits 與 nextFreeTicketMicros 進行了調整——如果當前時間晚於 nextFreeTicketMicros,則計算這段時間內產生的令牌數,累加到 storedPermits 上,並更新下次可獲取令牌時間 nextFreeTicketMicros 為當前時間。 2. 計算 stableIntervalMicros 的值,1/permitsPerSecond。 3. 呼叫 `doSetRate(double, double)` 方法計算 maxPermits 值(maxBurstSeconds*permitsPerSecond),並根據舊的 maxPermits 值對 storedPermits 進行調整。 原始碼如下所示 ```java @Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */ void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } ``` 呼叫 `acquire(int)` 方法獲取指定數量的令牌時, 1. 呼叫 `reserve(int)` 方法,該方法最終呼叫 `reserveEarliestAvailable(int, long)` 來更新下次可取令牌時間點與當前儲存的令牌數,並返回本次可取令牌的時間點,根據該時間點計算需要等待的時間 2. 阻塞等待1中返回的等待時間 3. 返回等待的時間(秒) 原始碼如下所示 ```java /** 獲取指定數量(permits)的令牌,阻塞直到獲取到令牌,返回等待的時間*/ @CanIgnoreReturnValue public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } /** 返回需要等待的時間*/ final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } /** 針對此次需要獲取的令牌數更新下次可取令牌時間點與儲存的令牌數,返回本次可取令牌的時間點*/ @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); // 更新當前資料 long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可消費的令牌數 double freshPermits = requiredPermits - storedPermitsToSpend; // 需要新增的令牌數 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 需要等待的時間 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的時間點 this.storedPermits -= storedPermitsToSpend; // 更新當前儲存的令牌數 return returnValue; } ``` `acquire(int)` 方法是獲取不到令牌時一直阻塞,直到獲取到令牌,`tryAcquire(int,long,TimeUnit)` 方法則是在指定超時時間內嘗試獲取令牌,如果獲取到或超時時間到則返回是否獲取成功 1. 先判斷是否能在指定超時時間內獲取到令牌,通過 `nextFreeTicketMicros <= timeoutMicros + nowMicros` 是否為true來判斷,即可取令牌時間早於當前時間加超時時間則可取(預消費的特性),否則不可獲取。 2. 如果不可獲取,立即返回false。 3. 如果可獲取,則呼叫 `reserveAndGetWaitLength(permits, nowMicros)` 來更新下次可取令牌時間點與當前儲存的令牌數,返回等待時間(邏輯與前面相同),並阻塞等待相應的時間,返回true。 原始碼如下所示 ```java public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { //判斷是否能在超時時間內獲取指定數量的令牌 return false; } else { microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; //只要可取時間小於當前時間+超時時間,則可獲取(可預消費的特性!) } @Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; } ``` 以上就是 SmoothBursty 實現的基本處理流程。注意兩點: 1. RateLimiter 通過限制後面請求的等待時間,來支援一定程度的突發請求——預消費的特性。 2. RateLimiter 令牌桶的實現並不是起一個執行緒不斷往桶裡放令牌,而是以一種延遲計算的方式(參考`resync`函式),在每次獲取令牌之前計算該段時間內可以產生多少令牌,將產生的令牌加入令牌桶中並更新資料來實現,比起一個執行緒來不斷往桶裡放令牌高效得多。(想想如果需要針對每個使用者限制某個介面的訪問,則針對每個使用者都得建立一個RateLimiter,並起一個執行緒來控制令牌存放的話,如果線上使用者數有幾十上百萬,起執行緒來控制是一件多麼恐怖的事情) ## 總結 本文介紹了限流的三種基本演算法,其中令牌桶演算法與漏桶演算法主要用來限制請求處理的速度,可將其歸為限速,計數器演算法則是用來限制一個時間視窗內請求處理的數量,可將其歸為限量(對速度不限制)。Guava 的 RateLimiter 是令牌桶演算法的一種實現,但 RateLimiter 只適用於單機應用,在分散式環境下就不適用了。雖然已有一些開源專案可用於分散式環境下的限流管理,如阿里的Sentinel,但對於小型專案來說,引入Sentinel可能顯得有點過重,但限流的需求在小型專案中也是存在的,下一篇文章就介紹下基於 RateLimiter 的分散式下的限流實現。 --- [轉載請註明出處] 作者:雨歌 歡迎關注作者公眾號:半路雨歌,檢視更多技術乾貨文章 ![qrcode](https://img2020.cnblogs.com/other/632381/202007/632381-20200722144743660-2146236711.jpg)