令牌桶演算法限流
限流
限流是對某一時間視窗內的請求數進行限制,保持系統的可用性和穩定性,防止因流量暴增而導致的系統執行緩慢或宕機。常用的限流演算法有令牌桶和和漏桶,而Google開源專案Guava中的RateLimiter使用的就是令牌桶控制演算法。
在開發高併發系統時有三把利器用來保護系統:快取、降級和限流
- 快取:快取的目的是提升系統訪問速度和增大系統處理容量
- 降級:降級是當伺服器壓力劇增的情況下,根據當前業務情況及流量對一些服務和頁面有策略的降級,以此釋放伺服器資源以保證核心任務的正常執行
- 限流:限流的目的是通過對併發訪問/請求進行限速,或者對一個時間視窗內的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理
我們經常在調別人的介面的時候會發現有限制,比如微信公眾平臺介面、百度API Store、聚合API等等這樣的,對方會限制每天最多調多少次或者每分鐘最多調多少次
我們自己在開發系統的時候也需要考慮到這些,比如我們公司在上傳商品的時候就做了限流,因為使用者每一次上傳商品,我們需要將商品資料同到到美團、餓了麼、京東、百度、自營等第三方平臺,這個工作量是巨大,頻繁操作會拖慢系統,故做限流。
以上都是題外話,接下來我們重點看一下令牌桶演算法
令牌桶演算法
下面是從網上找的兩張圖來描述令牌桶演算法:
RateLimiter
https://github.com/google/guava
RateLimiter的程式碼不長,註釋加程式碼432行,看一下RateLimiter怎麼用
1 package com.cjs.example; 2 3 import com.google.common.util.concurrent.RateLimiter; 4 import org.springframework.web.bind.annotation.RequestMapping; 5 import org.springframework.web.bind.annotation.RestController; 6 7 import java.text.SimpleDateFormat; 8 import java.util.Date; 9 10 @RestController11 public class HelloController { 12 13 private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 14 15 private static final RateLimiter rateLimiter = RateLimiter.create(2); 16 17 /** 18 * tryAcquire嘗試獲取permit,預設超時時間是0,意思是拿不到就立即返回false 19 */ 20 @RequestMapping("/sayHello") 21 public String sayHello() { 22 if (rateLimiter.tryAcquire()) { // 一次拿1個 23 System.out.println(sdf.format(new Date())); 24 try { 25 Thread.sleep(500); 26 } catch (InterruptedException e) { 27 e.printStackTrace(); 28 } 29 }else { 30 System.out.println("limit"); 31 } 32 return "hello"; 33 } 34 35 /** 36 * acquire拿不到就等待,拿到為止 37 */ 38 @RequestMapping("/sayHi") 39 public String sayHi() { 40 rateLimiter.acquire(5); // 一次拿5個 41 System.out.println(sdf.format(new Date())); 42 return "hi"; 43 } 44 45 }
關於RateLimiter:
- A rate limiter。每個acquire()方法如果必要的話會阻塞直到一個permit可用,然後消費它。獲得permit以後不需要釋放。
- RateLimiter在併發環境下使用是安全的:它將限制所有執行緒呼叫的總速率。注意,它不保證公平呼叫。
- RateLimiter在併發環境下使用是安全的:它將限制所有執行緒呼叫的總速率。注意,它不保證公平呼叫。Rate limiter(直譯為:速度限制器)經常被用來限制一些物理或者邏輯資源的訪問速率。這和java.util.concurrent.Semaphore正好形成對照。
- 一個RateLimiter主要定義了發放permits的速率。如果沒有額外的配置,permits將以固定的速度分配,單位是每秒多少permits。預設情況下,Permits將會被穩定的平緩的發放。
- 可以配置一個RateLimiter有一個預熱期,在此期間permits的發放速度每秒穩步增長直到到達穩定的速率
基本用法:
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
實現
SmoothBursty以穩定的速度生成permit
SmoothWarmingUp是漸進式的生成,最終達到最大值趨於穩定
原始碼片段解讀:
public abstract class RateLimiter {</span><span style="color: #008000;">/**</span><span style="color: #008000;"> * 用給定的吞吐量(“permits per second”)建立一個RateLimiter。 * 通常是QPS </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> RateLimiter create(<span style="color: #0000ff;">double</span><span style="color: #000000;"> permitsPerSecond) { </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } </span><span style="color: #0000ff;">static</span> RateLimiter create(<span style="color: #0000ff;">double</span><span style="color: #000000;"> permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter </span>= <span style="color: #0000ff;">new</span> SmoothBursty(stopwatch, 1.0 <span style="color: #008000;">/*</span><span style="color: #008000;"> maxBurstSeconds </span><span style="color: #008000;">*/</span><span style="color: #000000;">); rateLimiter.setRate(permitsPerSecond); </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> rateLimiter; } </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * 用給定的吞吐量(QPS)和一個預熱期建立一個RateLimiter </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> RateLimiter create(<span style="color: #0000ff;">double</span> permitsPerSecond, <span style="color: #0000ff;">long</span><span style="color: #000000;"> warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod </span>>= 0, "warmupPeriod must not be negative: %s"<span style="color: #000000;">, warmupPeriod); </span><span style="color: #0000ff;">return</span> create(permitsPerSecond, warmupPeriod, unit, 3.0<span style="color: #000000;">, SleepingStopwatch.createFromSystemTimer()); } </span><span style="color: #0000ff;">static</span><span style="color: #000000;"> RateLimiter create( </span><span style="color: #0000ff;">double</span><span style="color: #000000;"> permitsPerSecond, </span><span style="color: #0000ff;">long</span><span style="color: #000000;"> warmupPeriod, TimeUnit unit, </span><span style="color: #0000ff;">double</span><span style="color: #000000;"> coldFactor, SleepingStopwatch stopwatch) { RateLimiter rateLimiter </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); rateLimiter.setRate(permitsPerSecond); </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> rateLimiter; } </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">final</span><span style="color: #000000;"> SleepingStopwatch stopwatch; </span><span style="color: #008000;">//</span><span style="color: #008000;"> 鎖</span> <span style="color: #0000ff;">private</span> <span style="color: #0000ff;">volatile</span><span style="color: #000000;"> Object mutexDoNotUseDirectly; </span><span style="color: #0000ff;">private</span><span style="color: #000000;"> Object mutex() { Object mutex </span>=<span style="color: #000000;"> mutexDoNotUseDirectly; </span><span style="color: #0000ff;">if</span> (mutex == <span style="color: #0000ff;">null</span><span style="color: #000000;">) { </span><span style="color: #0000ff;">synchronized</span> (<span style="color: #0000ff;">this</span><span style="color: #000000;">) { mutex </span>=<span style="color: #000000;"> mutexDoNotUseDirectly; </span><span style="color: #0000ff;">if</span> (mutex == <span style="color: #0000ff;">null</span><span style="color: #000000;">) { mutexDoNotUseDirectly </span>= mutex = <span style="color: #0000ff;">new</span><span style="color: #000000;"> Object(); } } } </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> mutex; } </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * 從RateLimiter中獲取一個permit,阻塞直到請求可以獲得為止 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> 休眠的時間,單位是秒,如果沒有被限制則是0.0 </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">double</span><span style="color: #000000;"> acquire() { </span><span style="color: #0000ff;">return</span> acquire(1<span style="color: #000000;">); } </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * 從RateLimiter中獲取指定數量的permits,阻塞直到請求可以獲得為止 </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">double</span> acquire(<span style="color: #0000ff;">int</span><span style="color: #000000;"> permits) { </span><span style="color: #0000ff;">long</span> microsToWait =<span style="color: #000000;"> reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); </span><span style="color: #0000ff;">return</span> 1.0 * microsToWait / SECONDS.toMicros(1L<span style="color: #000000;">); } </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * 預定給定數量的permits以備將來使用 * 直到這些預定數量的permits可以被消費則返回逝去的微秒數 </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">final</span> <span style="color: #0000ff;">long</span> reserve(<span style="color: #0000ff;">int</span><span style="color: #000000;"> permits) { checkPermits(permits); </span><span style="color: #0000ff;">synchronized</span><span style="color: #000000;"> (mutex()) { </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> checkPermits(<span style="color: #0000ff;">int</span><span style="color: #000000;"> permits) { checkArgument(permits </span>> 0, "Requested permits (%s) must be positive"<span style="color: #000000;">, permits); } </span><span style="color: #0000ff;">final</span> <span style="color: #0000ff;">long</span> reserveAndGetWaitLength(<span style="color: #0000ff;">int</span> permits, <span style="color: #0000ff;">long</span><span style="color: #000000;"> nowMicros) { </span><span style="color: #0000ff;">long</span> momentAvailable =<span style="color: #000000;"> reserveEarliestAvailable(permits, nowMicros); </span><span style="color: #0000ff;">return</span> max(momentAvailable - nowMicros, 0<span style="color: #000000;">); }
}
abstract class SmoothRateLimiter extends RateLimiter {
</span><span style="color: #008000;">/**</span><span style="color: #008000;"> The currently stored permits. </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">double</span><span style="color: #000000;"> storedPermits; </span><span style="color: #008000;">/**</span><span style="color: #008000;"> The maximum number of stored permits. </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">double</span><span style="color: #000000;"> maxPermits; </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">double</span><span style="color: #000000;"> stableIntervalMicros; </span><span style="color: #008000;">/**</span><span style="color: #008000;"> * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">private</span> <span style="color: #0000ff;">long</span> nextFreeTicketMicros = 0L; <span style="color: #008000;">//</span><span style="color: #008000;"> could be either in the past or future</span> <span style="color: #0000ff;">final</span> <span style="color: #0000ff;">long</span> reserveEarliestAvailable(<span style="color: #0000ff;">int</span> requiredPermits, <span style="color: #0000ff;">long</span><span style="color: #000000;"> nowMicros) { resync(nowMicros); </span><span style="color: #0000ff;">long</span> returnValue =<span style="color: #000000;"> nextFreeTicketMicros; </span><span style="color: #0000ff;">double</span> storedPermitsToSpend = min(requiredPermits, <span style="color: #0000ff;">this</span>.storedPermits); <span style="color: #008000;">//</span><span style="color: #008000;"> 本次可以獲取到的permit數量</span> <span style="color: #0000ff;">double</span> freshPermits = requiredPermits - storedPermitsToSpend; <span style="color: #008000;">//</span><span style="color: #008000;"> 差值,如果儲存的permit大於本次需要的permit數量則此處是0,否則是一個正數</span> <span style="color: #0000ff;">long</span> waitMicros =<span style="color: #000000;"> storedPermitsToWaitTime(</span><span style="color: #0000ff;">this</span><span style="color: #000000;">.storedPermits, storedPermitsToSpend) </span>+ (<span style="color: #0000ff;">long</span>) (freshPermits * stableIntervalMicros); <span style="color: #008000;">//</span><span style="color: #008000;"> 計算需要等待的時間(微秒)</span> <span style="color: #0000ff;">this</span>.nextFreeTicketMicros =<span style="color: #000000;"> LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); </span><span style="color: #0000ff;">this</span>.storedPermits -= storedPermitsToSpend; <span style="color: #008000;">//</span><span style="color: #008000;"> 減去本次消費的permit數</span> <span style="color: #0000ff;">return</span><span style="color: #000000;"> returnValue; } </span><span style="color: #0000ff;">void</span> resync(<span style="color: #0000ff;">long</span><span style="color: #000000;"> nowMicros) { </span><span style="color: #008000;">//</span><span style="color: #008000;"> if nextFreeTicket is in the past, resync to now</span> <span style="color: #0000ff;">if</span> (nowMicros > nextFreeTicketMicros) { <span style="color: #008000;">//</span><span style="color: #008000;"> 表示當前可以獲得permit</span> <span style="color: #0000ff;">double</span> newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); <span style="color: #008000;">//</span><span style="color: #008000;"> 計算這段時間可以生成多少個permit</span> storedPermits = min(maxPermits, storedPermits + newPermits); <span style="color: #008000;">//</span><span style="color: #008000;"> 如果超過maxPermit,則取maxPermit,否則取儲存的permit+新生成的permit</span> nextFreeTicketMicros = nowMicros; <span style="color: #008000;">//</span><span style="color: #008000;"> 設定下一次可以獲得permit的時間點為當前時間</span>
}
}
}
RateLimiter實現的令牌桶演算法,不僅可以應對正常流量的限速,而且可以處理突發暴增的請求,實現平滑限流。
通過程式碼,我們可以看到它可以預消費,怎麼講呢
nextFreeTicketMicros表示下一次請求獲得permits的最早時間。每次授權一個請求以後,這個值會向後推移(PS:想象一下時間軸)即向未來推移。因此,大的請求會比小的請求推得更。這裡的大小指的是獲取permit的數量。這個應該很好理解,因為上一次請求獲取的permit數越多,那麼下一次再獲取授權時更待的時候會更長,反之,如果上一次獲取的少,那麼時間向後推移的就少,下一次獲得許可的時間更短。可見,都是有代價的。正所謂:要浪漫就要付出代價。
還要注意到一點,就是獲取令牌和處理請求是兩個動作,而且,並不是每一次都獲取一個,也不要想當然的認為一個請求獲取一個permit(或者叫令牌),可以再看看前面那幅圖
Stopwatch
一個以納秒為單位度量流逝時間的物件。它是一個相對時間,而不是絕對時間。
Stopwatch stopwatch = Stopwatch.createStarted(); System.out.println("hahah"); stopwatch.stop(); Duration duration = stopwatch.elapsed(); System.out.println(stopwatch);
Semaphore(訊號量)
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
一個訊號量維護了一系列permits。
每次呼叫acquire()方法獲取permit,如果必要的話會阻塞直到有一個permit可用為止。
呼叫release()方法則會釋放自己持有的permit,即用完了再還回去。
訊號量限制的是併發訪問臨界資源的執行緒數。
令牌桶演算法 VS 漏桶演算法
漏桶
漏桶的出水速度是恆定的,那麼意味著如果瞬時大流量的話,將有大部分請求被丟棄掉(也就是所謂的溢位)。
令牌桶
生成令牌的速度是恆定的,而請求去拿令牌是沒有速度限制的。這意味,面對瞬時大流量,該演算法可以在短時間內請求拿到大量令牌,而且拿令牌的過程並不是消耗很大的事情。
最後,不論是對於令牌桶拿不到令牌被拒絕,還是漏桶的水滿了溢位,都是為了保證大部分流量的正常使用,而犧牲掉了少部分流量,這是合理的,如果因為極少部分流量需要保證的話,那麼就可能導致系統達到極限而掛掉,得不償失。
小定律:排隊理論
https://en.wikipedia.org/wiki/Little%27s_law
the long-term average number L of customers in a stationary system is equal to the long-term average effective arrival rate λ multiplied by the average time W that a customer spends in the system. Expressed algebraically the law is:
在一個固定系統中,顧客的長期平均數量L等於顧客的長期平均到達速率λ乘以顧客在系統中平均花費的時間W。用公式表示為:
雖然這看起來很容易,但這是一個非常顯著的舉世矚目的結果,因為這種關係“不受到達過程的分佈,服務分佈,服務順序,或其他任何因素的影響”。這個結果適用於任何系統,特別是適用於系統內的系統。唯一的要求是系統必須是穩定的非搶佔式的。
例子
例1:找響應時間
假設有一個應用程式沒有簡單的方法來度量響應時間。如果系統的平均數量和吞吐量是已知的,那麼平均響應時間就是:
mean response time = mean number in system / mean throughput
平均響應時間 = 系統的平均數量 / 平均吞吐量.
例2:顧客在店裡
想象一下,一家小商店只有一個櫃檯和一個可供瀏覽的區域,每次只能有一個人在櫃檯,並且沒有人不買東西就離開。
所以這個系統大致是:進入 --> 瀏覽 --> 櫃檯結賬 --> 離開
在一個穩定的系統中,人們進入商店的速度就是他們到達商店的速度(我們叫做到達速度),它們離開的速度叫做離開速度。
相比之下,到達速度超過離開速度代表是一個不穩定的系統,這就會造成等待的顧客數量將逐漸增加到無窮大。
前面的小定律告訴我們,商店的平均顧客數量L等於有效的到達速度λ乘以顧客在商店的平均停留時間W。用公式表示為:
假設,顧客以每小時10個的速度到達,並且平均停留時間是0.5小時。那麼這就意味著,任意時間商店的平均顧客數量是5
現在假設商店正在考慮做更多的廣告,把到達率提高到每小時20。商店必須準備好容納平均10人,或者必須將每個顧客在商店中的時間減少到0.25小時。商店可以通過更快地結帳或者增加更多的櫃檯來達到後者的目的。
我們可以把前面的小定律應用到商店系統中。例如,考慮櫃檯和在櫃檯前排的隊。假設平均有2個人在櫃檯前排隊,我們知道顧客到達速度是每小時10,所以顧客平均必須停留時間為0.2小時。
最後
這是單機(單程序)的限流,是JVM級別的的限流,所有的令牌生成都是在記憶體中,在分散式環境下不能直接這麼用。
如果我們能把permit放到Redis中就可以在分散式環境中用了。
參考
https://blog.csdn.net/jek123456/article/details/77152571
https://blog.csdn.net/syc001/article/details/72841951
https://segmentfault.com/a/1190000012875897
https://blog.csdn.net/charleslei/article/details/53152883
https://www.jianshu.com/p/8f548e469bbe
https://www.cnblogs.com/f-zhao/p/7210158.html
https://m.jb51.net/article/127996.htm
歡迎各位轉載,但必須在文章頁面中給出作者和原文連結!