1. 程式人生 > >一個輕量級的基於RateLimiter的分散式限流實現

一個輕量級的基於RateLimiter的分散式限流實現

上篇文章([限流演算法與Guava RateLimiter解析](http://blog.jboost.cn/ratelimiter.html))對常用的限流演算法及Google Guava基於令牌桶演算法的實現RateLimiter進行了介紹。RateLimiter通過執行緒鎖控制同步,只適用於單機應用,在分散式環境下,雖然有像阿里Sentinel的限流開源框架,但對於一些小型應用來說未免過重,但限流的需求在小型專案中也是存在的,比如獲取手機驗證碼的控制,對資源消耗較大操作的訪問頻率控制等。本文介紹最近寫的一個基於RateLimiter,適用於分散式環境下的限流實現,並使用spring-boot-starter的形式釋出,比較輕量級且“開箱即用”。 本文限流實現包括兩種形式: 1. 基於RateLimiter令牌桶演算法的限速控制(嚴格限制訪問速度) 2. 基於Lua指令碼的限量控制(限制一個時間視窗內的訪問量,對訪問速度沒有嚴格限制) ## 限速控制 ### 1. 令牌桶模型 首先定義令牌桶模型,與RateLimiter中類似,包括幾個關鍵屬性與關鍵方法。其中關鍵屬性定義如下, ```java @Data public class RedisPermits { /** * 最大儲存令牌數 */ private double maxPermits; /** * 當前儲存令牌數 */ private double storedPermits; /** * 新增令牌的時間間隔/毫秒 */ private double intervalMillis; /** * 下次請求可以獲取令牌的時間,可以是過去(令牌積累)也可以是將來的時間(令牌預消費) */ private long nextFreeTicketMillis; //... ``` 關鍵方法定義與RateLimiter也大同小異,方法註釋基本已描述各方法用途,不再贅述。 ```java /** * 構建Redis令牌資料模型 * * @param permitsPerSecond 每秒放入的令牌數 * @param maxBurstSeconds maxPermits由此欄位計算,最大儲存maxBurstSeconds秒生成的令牌 * @param nextFreeTicketMillis 下次請求可以獲取令牌的起始時間,預設當前系統時間 */ public RedisPermits(double permitsPerSecond, double maxBurstSeconds, Long nextFreeTicketMillis) { this.maxPermits = permitsPerSecond * maxBurstSeconds; this.storedPermits = maxPermits; this.intervalMillis = TimeUnit.SECONDS.toMillis(1) / permitsPerSecond; this.nextFreeTicketMillis = nextFreeTicketMillis; } /** * 基於當前時間,若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新資料 */ public void resync(long nowMillis) { if (nowMillis > nextFreeTicketMillis) { double newPermits = (nowMillis - nextFreeTicketMillis) / intervalMillis; storedPermits = Math.min(maxPermits, storedPermits + newPermits); nextFreeTicketMillis = nowMillis; } } /** * 保留指定數量令牌,並返回需要等待的時間 */ public long reserveAndGetWaitLength(long nowMillis, int permits) { resync(nowMillis); double storedPermitsToSpend = Math.min(permits, storedPermits); // 可以消耗的令牌數 double freshPermits = permits - storedPermitsToSpend; // 需要等待的令牌數 long waitMillis = (long) (freshPermits * intervalMillis); // 需要等待的時間 nextFreeTicketMillis = LongMath.saturatedAdd(nextFreeTicketMillis, waitMillis); storedPermits -= storedPermitsToSpend; return waitMillis; } /** * 在超時時間內,是否有指定數量的令牌可用 */ public boolean canAcquire(long nowMillis, int permits, long timeoutMillis) { return queryEarliestAvailable(nowMillis, permits) <= timeoutMillis; } /** * 指定數量令牌數可用需等待的時間 * * @param permits 需保留的令牌數 * @return 指定數量令牌可用的等待時間,如果為0或負數,表示當前可用 */ private long queryEarliestAvailable(long nowMillis, int permits) { resync(nowMillis); double storedPermitsToSpend = Math.min(permits, storedPermits); // 可以消耗的令牌數 double freshPermits = permits - storedPermitsToSpend; // 需要等待的令牌數 long waitMillis = (long) (freshPermits * intervalMillis); // 需要等待的時間 return LongMath.saturatedAdd(nextFreeTicketMillis - nowMillis, waitMillis); } ``` ### 2. 令牌桶控制類 Guava RateLimiter中的控制都在RateLimiter及其子類中(如SmoothBursty),本處涉及到分散式環境下的同步,因此將其解耦,令牌桶模型儲存於Redis中,對其同步操作的控制放置在如下控制類,其中同步控制使用到了前面介紹的分散式鎖(參考[基於Redis分散式鎖的正確開啟方式](http://blog.jboost.cn/distributedlock.html)) ```java @Slf4j public class RedisRateLimiter { /** * 獲取一個令牌,阻塞一直到獲取令牌,返回阻塞等待時間 * * @return time 阻塞等待時間/毫秒 */ public long acquire(String key) throws IllegalArgumentException { return acquire(key, 1); } /** * 獲取指定數量的令牌,如果令牌數不夠,則一直阻塞,返回阻塞等待的時間 * * @param permits 需要獲取的令牌數 * @return time 等待的時間/毫秒 * @throws IllegalArgumentException tokens值不能為負數或零 */ public long acquire(String key, int permits) throws IllegalArgumentException { long millisToWait = reserve(key, permits); log.info("acquire {} permits for key[{}], waiting for {}ms", permits, key, millisToWait); try { Thread.sleep(millisToWait); } catch (InterruptedException e) { log.error("Interrupted when trying to acquire {} permits for key[{}]", permits, key, e); } return millisToWait; } /** * 在指定時間內獲取一個令牌,如果獲取不到則一直阻塞,直到超時 * * @param timeout 最大等待時間(超時時間),為0則不等待立即返回 * @param unit 時間單元 * @return 獲取到令牌則true,否則false * @throws IllegalArgumentException */ public boolean tryAcquire(String key, long timeout, TimeUnit unit) throws IllegalArgumentException { return tryAcquire(key, 1, timeout, unit); } /** * 在指定時間內獲取指定數量的令牌,如果在指定時間內獲取不到指定數量的令牌,則直接返回false, * 否則阻塞直到能獲取到指定數量的令牌 * * @param permits 需要獲取的令牌數 * @param timeout 最大等待時間(超時時間) * @param unit 時間單元 * @return 如果在指定時間內能獲取到指定令牌數,則true,否則false * @throws IllegalArgumentException tokens為負數或零,丟擲異常 */ public boolean tryAcquire(String key, int permits, long timeout, TimeUnit unit) throws IllegalArgumentException { long timeoutMillis = Math.max(unit.toMillis(timeout), 0); checkPermits(permits); long millisToWait; boolean locked = false; try { locked = lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS); if (locked) { long nowMillis = getNowMillis(); RedisPermits permit = getPermits(key, nowMillis); if (!permit.canAcquire(nowMillis, permits, timeoutMillis)) { return false; } else { millisToWait = permit.reserveAndGetWaitLength(nowMillis, permits); permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS); } } else { return false; //超時獲取不到鎖,也返回false } } finally { if (locked) { lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId()); } } if (millisToWait > 0) { try { Thread.sleep(millisToWait); } catch (InterruptedException e) { } } return true; } /** * 保留指定的令牌數待用 * * @param permits 需保留的令牌數 * @return time 令牌可用的等待時間 * @throws IllegalArgumentException tokens不能為負數或零 */ private long reserve(String key, int permits) throws IllegalArgumentException { checkPermits(permits); try { lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS); long nowMillis = getNowMillis(); RedisPermits permit = getPermits(key, nowMillis); long waitMillis = permit.reserveAndGetWaitLength(nowMillis, permits); permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS); return waitMillis; } finally { lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId()); } } /** * 獲取令牌桶 * * @return */ private RedisPermits getPermits(String key, long nowMillis) { RedisPermits permit = permitsRedisTemplate.opsForValue().get(key); if (permit == null) { permit = new RedisPermits(permitsPerSecond, maxBurstSeconds, nowMillis); } return permit; } /** * 獲取redis伺服器時間 */ private long getNowMillis() { String luaScript = "return redis.call('time')"; DefaultRedisScript