限流演算法的理解和應用場景和實現[臨界點處理]
在開發高併發系統時,有三把利器來保護系統:快取、降級和限流。一下有幾種限流的方法可以參考。
訊號量和令牌桶的區別:
訊號量限制的是併發,資源. 令牌桶如果耗時比較高的話,併發可能會比較大. 限制的是 qps.
計數器法
計數器法是限流演算法裡最簡單也是最容易實現的一種演算法。比如我們規定,對於A介面來說,我們1分鐘的訪問次數不能超過100個。那麼我們可以這麼做:在一開 始的時候,我們可以設定一個計數器counter,每當一個請求過來的時候,counter就加1,如果counter的值大於100並且該請求與第一個 請求的間隔時間還在1分鐘之內,那麼說明請求數過多;如果該請求與第一個請求的間隔時間大於1分鐘,且counter的值還在限流範圍內,那麼就重置 counter,具體演算法的示意圖如下:
這個演算法雖然簡單,但是有一個十分致命的問題,那就是臨界問題,我們看下圖:
從上圖中我們可以看到,假設有一個惡意使用者,他在0:59時,瞬間傳送了100個請求,並且1:00又瞬間傳送了100個請求,那麼其實這個使用者在 1秒裡面,瞬間傳送了200個請求。我們剛才規定的是1分鐘最多100個請求,也就是每秒鐘最多1.7個請求,使用者通過在時間視窗的重置節點處突發請求, 可以瞬間超過我們的速率限制。使用者有可能通過演算法的這個漏洞,瞬間壓垮我們的應用。
聰明的朋友可能已經看出來了,剛才的問題其實是因為我們統計的精度太低。那麼如何很好地處理這個問題呢?或者說,如何將臨界問題的影響降低呢?我們可以看下面的滑動視窗演算法。
public class CounterDemo { public long timeStamp = getNowTime(); public int reqCount = 0; public final int limit = 100; // 時間視窗內最大請求數 public final long interval = 1000; // 時間視窗ms public boolean grant() { long now = getNowTime(); if (now < timeStamp + interval) { // 在時間視窗內 reqCount++; // 判斷當前時間視窗內是否超過最大請求控制數 return reqCount <= limit; } else { timeStamp = now; // 超時後重置 reqCount = 1; return true; } } }
滑動視窗
滑動視窗,又稱rolling window。為了解決這個問題,我們引入了滑動視窗演算法。如果學過TCP網路協議的話,那麼一定對滑動視窗這個名詞不會陌生。下面這張圖,很好地解釋了滑動視窗演算法:
在上圖中,整個紅色的矩形框表示一個時間視窗,在我們的例子中,一個時間視窗就是一分鐘。然後我們將時間視窗進行劃分,比如圖中,我們就將滑動視窗 劃成了6格,所以每格代表的是10秒鐘。每過10秒鐘,我們的時間視窗就會往右滑動一格。每一個格子都有自己獨立的計數器counter,比如當一個請求 在0:35秒的時候到達,那麼0:30~0:39對應的counter就會加1。
那麼滑動視窗怎麼解決剛才的臨界問題的呢?我們可以看上圖,0:59到達的100個請求會落在灰色的格子中,而1:00到達的請求會落在橘黃色的格 子中。當時間到達1:00時,我們的視窗會往右移動一格,那麼此時時間視窗內的總請求數量一共是200個,超過了限定的100個,所以此時能夠檢測出來觸 發了限流。
我再來回顧一下剛才的計數器演算法,我們可以發現,計數器演算法其實就是滑動視窗演算法。只是它沒有對時間視窗做進一步地劃分,所以只有1格。
由此可見,當滑動視窗的格子劃分的越多,那麼滑動視窗的滾動就越平滑,限流的統計就會越精確。
public class LeakyDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 水漏出的速度
public int water; // 當前水量(當前累積請求數)
public boolean grant() {
long now = getNowTime();
water = max(0, water - (now - timeStamp) * rate); // 先執行漏水,計算剩餘水量
timeStamp = now;
if ((water + 1) < capacity) {
// 嘗試加水,並且水還未滿
water += 1;
return true;
}
else {
// 水滿,拒絕加水
return false;
}
}
}
先看看漏桶演算法(Leaky bucket)
如圖所示,很明顯從原來兩個流量(12mbps 和2mbps)限流成了 3mbps.
實現:
一個比較簡單實現是: n 個執行緒這種先把資料流量放置到一個佇列裡(或者 一個介面拆成1個佇列,分而治之), 然後另外一個執行緒從裡面獲取資料,請求.
應用場景:
非同步化的呼叫比較好, 同步化的呼叫的話,就需要搞成類似 reactor 模式的形式,每個資料包需要有 seq_no 的概念(tcp,dubbo 非同步傳輸).
再看看令牌桶(Token bucket):
Guava官方文件-RateLimiter類
public class TokenBucketDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 令牌放入速度
public int tokens; // 當前令牌數量
public boolean grant() {
long now = getNowTime();
// 先新增令牌
tokens = min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1個令牌,則拒絕
return false;
}
else {
// 還有令牌,領取令牌
tokens -= 1;
return true;
}
}
}
使用Guava的RateLimiter進行限流控制
Guava是google提供的java擴充套件類庫,其中的限流工具類RateLimiter採用的就是令牌桶演算法。RateLimiter 從概念上來講,速率限制器會在可配置的速率下分配許可證,如果必要的話,每個acquire() 會阻塞當前執行緒直到許可證可用後獲取該許可證,一旦獲取到許可證,不需要再釋放許可證。通俗的講RateLimiter會按照一定的頻率往桶裡扔令牌,執行緒拿到令牌才能執行,比如你希望自己的應用程式QPS不要超過1000,那麼RateLimiter設定1000的速率後,就會每秒往桶裡扔1000個令牌。例如我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個,此時可以採用如下方式:
有一點很重要,那就是請求的許可數從來不會影響到請求本身的限制(呼叫acquire(1) 和呼叫acquire(1000) 將得到相同的限制效果,如果存在這樣的呼叫的話),但會影響下一次請求的限制,也就是說,如果一個高開銷的任務抵達一個空閒的RateLimiter,它會被馬上許可,但是下一個請求會經歷額外的限制,從而來償付高開銷任務。注意:RateLimiter 並不提供公平性的保證。
- publicclass <span style="font-size:14px;">RateLimiter</span>{
- publicdouble acquire() {
- return acquire(1);
- }
- publicdouble acquire(int permits) {
- checkPermits(permits); //檢查引數是否合法(是否大於0)
- long microsToWait;
- synchronized (mutex) { //應對併發情況需要同步
- microsToWait = reserveNextTicket(permits, readSafeMicros()); //獲得需要等待的時間
- }
- ticker.sleepMicrosUninterruptibly(microsToWait); //等待,當未達到限制時,microsToWait為0
- return1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
- }
- privatelong reserveNextTicket(double requiredPermits, long nowMicros) {
- resync(nowMicros); //補充令牌
- long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
- double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //獲取這次請求消耗的令牌數目
- double freshPermits = requiredPermits - storedPermitsToSpend;
- long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
- + (long) (freshPermits * stableIntervalMicros);
- this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
- this.storedPermits -= storedPermitsToSpend; // 減去消耗的令牌
- return microsToNextFreeTicket;
- }
- privatevoid resync(long nowMicros) {
- // if nextFreeTicket is in the past, resync to now
- if (nowMicros > nextFreeTicketMicros) {
- storedPermits = Math.min(maxPermits,
- storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
- nextFreeTicketMicros = nowMicros;
- }
- }
- }
四、使用Semphore進行併發流控
Java 併發庫的Semaphore 可以很輕鬆完成訊號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”,這可應用於死鎖恢復的一些場合。下面的Demo中申明瞭一個只有5個許可的Semaphore,而有20個執行緒要訪問這個資源,通過acquire()和release()獲取和釋放訪問許可:
最後:進行限流控制還可以有很多種方法,針對不同的場景各有優劣,例如通過AtomicLong計數器控制、使用MQ訊息佇列進行流量消峰等等。