基於redis的分散式RateLimiter(限流)實現
業務背景
系統需要對接某IM廠商rest介面,向客戶端推送訊息(以及其他IM業務)
該廠商對rest介面呼叫有頻率限制:總rest呼叫9000次/30s;訊息推送600次/30s
系統為分散式叢集,需要控制整個分散式叢集總的介面呼叫頻率滿足以上限制
Guava RateLimiter
上篇文章 《Guava RateLimiter原始碼解析》中介紹了Guava RateLimiter的用法及原理,但為什麼不直接使用Guava RateLimiter?原因有二:
- Guava RateLimiter只能應用於單程序,多程序間協同控制便無能為力
- Guava RateLimiter能夠處理突發請求(預消費),這裡rest介面呼叫頻率限制是固定的,不需要更不能使用預消費能力,否則將會導致介面呼叫失敗
Redis
為什麼說選用redis是合理的?
- redis效率高,易擴充套件
- redis對語言無關,可以更好的接入不同語言開發的系統(異構)
- redis單程序單執行緒的特點可以更好的解決最終一致性,多程序間協同控制更為容易
基於Redis實現RateLimiter
這裡完全參考Guava RateLimiter實現思路,不同的是,Guava將令牌桶資料存放於物件(記憶體)中,這裡講令牌桶資料存放在redis中,奉上原始碼 https://github.com/manerfan/m...
首先建立令牌桶資料模型
class RedisPermits(
val maxPermits: Long,
var storedPermits: Long,
val intervalMillis: Long,
var nextFreeTicketMillis: Long = System.currentTimeMillis()
) {
constructor(permitsPerSecond: Double, maxBurstSeconds: Int = 60, nextFreeTicketMillis: Long = System.currentTimeMillis()) :
this ((permitsPerSecond * maxBurstSeconds).toLong(), permitsPerSecond.toLong(), (TimeUnit.SECONDS.toMillis(1) / permitsPerSecond).toLong(), nextFreeTicketMillis)
fun expires(): Long {
val now = System.currentTimeMillis()
return 2 * TimeUnit.MINUTES.toSeconds(1) + TimeUnit.MILLISECONDS.toSeconds(max(nextFreeTicketMillis, now) - now)
}
fun reSync(now: Long): Boolean {
if (now > nextFreeTicketMillis) {
storedPermits = min(maxPermits, storedPermits + (now - nextFreeTicketMillis) / intervalMillis)
nextFreeTicketMillis = now
return true
}
return false
}
}
各屬性欄位含義與Guava相同(參見《Guava RateLimiter原始碼解析》),且預設最多儲存maxBurstSeconds秒生成的令牌
reSync
函式同樣是為了解決令牌桶資料更新問題,在每次獲取令牌之前呼叫,這裡不多介紹
expires
函式計算redis資料過期時間。同樣的例子,某介面需要分別對每個使用者做訪問頻率限制,假設系統中存在6W使用者,則至多需要在redis中建立6W條資料,對於長期執行的系統,這個數字會只增不減,這對於redis來說也是一個不小的挑戰(雖然示例中的數字相對較小)。為了減輕redis壓力,需要對令牌桶資料做過期處理,對於使用頻率不是很高的業務場景,可以及時清理。
為了更好的操作,這裡建立一個操作RedisPermits的Redis模板
@Configuration
class RateLimiterConfiguration {
@Bean
fun permitsTemplate(redisConnectionFactory: RedisConnectionFactory): PermitsTemplate {
val template = PermitsTemplate()
template.connectionFactory = redisConnectionFactory
return template
}
}
class PermitsTemplate : RedisTemplate<String, RedisPermits>() {
private val objectMapper = jacksonObjectMapper()
init {
keySerializer = StringRedisSerializer()
valueSerializer = object : RedisSerializer<RedisPermits> {
override fun serialize(t: RedisPermits) = objectMapper.writeValueAsBytes(t)
override fun deserialize(bytes: ByteArray?) = bytes?.let { objectMapper.readValue(it, RedisPermits::class.java) }
}
}
}
/**
* 生成並存儲預設令牌桶
*/
private fun putDefaultPermits(): RedisPermits {
val permits = RedisPermits(permitsPerSecond, maxBurstSeconds)
permitsTemplate.opsForValue().set(key, permits, permits.expires(), TimeUnit.SECONDS)
return permits
}
/**
* 獲取/更新令牌桶
*/
private var permits: RedisPermits
get() = permitsTemplate.opsForValue()[key] ?: putDefaultPermits()
set(permits) = permitsTemplate.opsForValue().set(key, permits, permits.expires(), TimeUnit.SECONDS)
/**
* 獲取redis伺服器時間
*/
private val now get() = permitsTemplate.execute { it.time() } ?: System.currentTimeMillis()
putDefaultPermits
用於生成預設令牌桶並存入redis
permits
的getter
setter
方法實現了redis中令牌桶的獲取及更新
now
用於獲取redis伺服器的時間,這樣便能保證分散式叢集中各節點對資料處理的一致性
private fun reserveAndGetWaitLength(tokens: Long): Long {
val n = now
var permit = permits
permit.reSync(n)
val storedPermitsToSpend = min(tokens, permit.storedPermits) // 可以消耗的令牌數
val freshPermits = tokens - storedPermitsToSpend // 需要等待的令牌數
val waitMillis = freshPermits * permit.intervalMillis // 需要等待的時間
permit.nextFreeTicketMillis = LongMath.saturatedAdd(permit.nextFreeTicketMillis, waitMillis)
permit.storedPermits -= storedPermitsToSpend
permits = permit
return permit.nextFreeTicketMillis - n
}
該函式用於獲取tokens個令牌,並返回需要等待到的時長(毫秒)
其中,storedPermitsToSpend為桶中可以消費的令牌數,freshPermits為還需要的(需要補充的)令牌數,根據該值計算需要等待的時間,追加並更新到nextFreeTicketMillis
需要注意,這裡與Guava RateLimiter不同的是,Guava中的返回是更新前的(上次請求計算的)nextFreeTicketMicros,本次請求通過為上次請求的預消費行為埋單而實現突發請求的處理;這裡返回的是由於桶中令牌不足而需要真真切切等待的時間
通俗來講
- Guava為寅吃卯糧,本次請求需要為上次請求的預消費行為埋單
- 這裡為自力更生,誰消費誰埋單,為自己的行為負責
private fun reserve(tokens: Long): Long {
checkTokens(tokens)
try {
syncLock.lock()
return reserveAndGetWaitLength(tokens)
} finally {
syncLock.unLock()
}
}
該函式與reserveAndGetWaitLength
等同,只是為了避免併發問題而添加了同步鎖(分散式同步鎖的介紹請參見《基於redis的分散式鎖實現》)
private fun queryEarliestAvailable(tokens: Long): Long {
val n = now
var permit = permits
permit.reSync(n)
val storedPermitsToSpend = min(tokens, permit.storedPermits) // 可以消耗的令牌數
val freshPermits = tokens - storedPermitsToSpend // 需要等待的令牌數
val waitMillis = freshPermits * permit.intervalMillis // 需要等待的時間
return LongMath.saturatedAdd(permit.nextFreeTicketMillis - n, waitMillis)
}
該函式用於計算,獲取tokens個令牌需要等待的時長(毫秒)
private fun canAcquire(tokens: Long, timeoutMillis: Long): Boolean {
return queryEarliestAvailable(tokens) - timeoutMillis <= 0
}
該函式用於計算,timeoutMillis時間內是否可以獲取tokens個令牌
通過以上幾個函式的瞭解,我們便可以很輕鬆的實現Guava RateLimiter中的acquire
及tryAcquire
功能
fun acquire(tokens: Long): Long {
var milliToWait = reserve(tokens)
logger.info("acquire for {}ms {}", milliToWait, Thread.currentThread().name)
Thread.sleep(milliToWait)
return milliToWait
}
fun acquire() = acquire(1)
fun tryAcquire(tokens: Long, timeout: Long, unit: TimeUnit): Boolean {
val timeoutMicros = max(unit.toMillis(timeout), 0)
checkTokens(tokens)
var milliToWait: Long
try {
syncLock.lock()
if (!canAcquire(tokens, timeoutMicros)) {
return false
} else {
milliToWait = reserveAndGetWaitLength(tokens)
}
} finally {
syncLock.unLock()
}
Thread.sleep(milliToWait)
return true
}
fun tryAcquire(timeout: Long, unit: TimeUnit) = tryAcquire(1, timeout, unit)
回顧問題
至此,基於redis的分散式RateLimiter(限流)控制功能便完成了
回到文件起始處提出的問題,接某IM廠商rest介面,我們可以針對不同的頻率限制建立不同的RateLimiter
val restRateLimiter = rateLimiterFactory.build("ratelimiter:im:rest", 9000 /30, 30)
val msgRateLimiter = rateLimiterFactory.build("ratelimiter:im:msg", 600 /30, 30)
推送訊息時,可以如下呼叫
restRateLimiter.acquire()
msgRateLimiter.acquire(msgs.size)
msgUtil.push(msgs)
對於介面提供方限制介面訪問頻次,可以如下實現
val msgRateLimiter = rateLimiterFactory.build("ratelimiter:im:msg", 600 /30, 30)
fun receiveMsg(msgs: Array<Message>): Boolean {
return when(msgRateLimiter.tryAcquire(msgs.size, 2, TimeUnit.SECONDS)) {
true -> {
thread(true) { msgUtil.receive(msgs) }
true
}
else -> false
}