再寫一篇tps限流
阿新 • • 發佈:2019-04-02
shm request lua 長度 不同的 timeout als 會有 通用代碼
再寫一篇tps限流
各種限流算法的稱呼
網上有很多文章介紹限流算法,但是對於這些算法的稱呼與描述也是有點難以理解。不管那麽多了。我先按我理解的維度梳理一下。
主要維度是:是正向計數還是反向計數。是定點(時間點)重置當前計數器還是每次接口調用時按量調整當前還剩的可用請求數。
通俗理解
正向計數且定點(時間點)重置的流程
+-------------------------+ | init value = 0 | +-----------+-------------+ | | | +---------------------v----------------------+ | when request arrived | +------+ detect lastReqTime+interval > currentTime +------+ | | | | Y +--------------------------------------------+ | | N | | +-----v-----------------------+ +---------------v----------+ | reset value = 0 | | | +-----------------------------+ | detect value > thresold | | +-----+ +----+ | | +--------------------------+ | | N | | | | | v | | +---------+---------+ | | | | Y | | value = value + 1 | | | | | | | +--------+----------+ | | | | | +-------v---------+ +-----------v-----+ +------------------->+ return true | | return false | +-----------------+ +-----------------+
幾個參數解釋下:
- value:當前時間段有多少請求進來了,即計數器的值
- interval: 每次刷新計數器的時間間隔
- lastReqTime: 上次請求進來的時間點
- currentTime:當前時間點
- thresold:時間間隔(interval)內請求數的最大閾值
這樣設計,如果你的interval設置成1秒鐘,thresold設置成1000,那麽意思就是每秒限制1000請求數的流控。 相當於tps=1000限流。
反向計數
反向計數計數初始化value時不是初始化成0,而是初始化成thresold(你的限制請求數量的閾值)。
然後每次請求進來的時候 value不是+1而是-1,reset value的時候也是重置成thresold。
定點重置的lua腳本
-- 資源唯一標識 local key = KEYS[1] -- 時間窗口內最大並發數 local max_permits = tonumber(KEYS[2]) -- 窗口的間隔時間 local interval_milliseconds = tonumber(KEYS[3]) -- 獲取的並發數 local permits = tonumber(ARGV[1]) local current_permits = tonumber(redis.call("get", key) or 0) -- 如果超過了最大並發數,返回false if (current_permits + permits > max_permits) then return false else -- 增加並發計數 redis.call("incrby", key, permits) -- 如果key中保存的並發計數為0,說明當前是一個新的時間窗口,它的過期時間設置為窗口的過期時間 if (current_permits == 0) then redis.call("pexpire", key, interval_milliseconds) end return true end
定點重置存在的問題
不論是正向計數還是反向計數,定點重置都存在一個問題:
0r 1000r 1000r 0r
0s 0.8s 1.0s 1.2 2.0s
+----------------+-----+-----+----------------+------->timeline
^ ^ ^ ^ ^
| | | | |
| | | | |
| | | | |
| | | | |
+ + + + +
假設按上圖時間線描述,0-0.8s系統沒有收到請求,0.8-1.0s系統收到了1000個請求,1.0-1.2s系統又收到了1000個請求,1.2-2.0s系統收到了0個請求。這種場景其實是能通過上面的定點重置的流控的,但是實際在0.8s-1.2s這0.4s時間內tps達到了2000/(1.2-0.8)=5000的量,沒能達到真正意義上的tps限流的述求。
每次接口調用時按量調整當前還剩的可用請求數且反向計數
這裏先解釋一下按量,按量就是:
假設上一次接口調用到這次接口調用間隔是2s,然後我們是1000tps限流,那麽此時按量調整就是2sx1000r/s= +2000r。 也就是此時可用請求數按量加2000個。
整體邏輯比較復雜,先用java代碼描述下:
-- key
String key = "流控實例id串"; // 通用代碼,可以支持多個流控實例,控制不同的服務
-- 最大存儲的令牌數
int max_permits = 10000;
-- 每秒鐘產生的令牌數
int permits_per_second = 1000; // tps閾值
-- 請求的令牌數
int required_permits = 1; // 請求數流控每次1個請求,如果是流量流控,可以從外面把這個傳進來
// 存儲流控邏輯中的數據用, 在redis中用hset代替。 [流控實例id串--> [流程過程中需要的參數-->參數值]]
Map<String, Map<String, Object>> storeMap = new ConcurrentHashMap<String, Map<String, Object>>();
-- 下次請求可以獲取令牌的起始時間,初始值為0
long next_free_ticket_micros = storeMap.get(key).get("next_free_ticket_micros") , default: 0 // 取不到就用默認0
-- 當前時間
long now_micros = System.currentTimeMillis();
-- 查詢獲取令牌是否超時
if (ARGV[1] != null) {
-- 獲取令牌的超時時間
long timeout_micros = ARGV[1];
long micros_to_wait = next_free_ticket_micros - now_micros;
if (micros_to_wait > timeout_micros) {
return micros_to_wait
}
}
-- 當前存儲的令牌數
long stored_permits = storeMap.get(key).get("stored_permits") , default: 0 // 取不到就用默認0
-- 添加令牌的時間間隔
float stable_interval_micros = 1000 / permits_per_second;
-- 補充令牌
if (now_micros > next_free_ticket_micros) {
/**
* 當前時間 到 下次請求可以獲取令牌的起始時間 之間差多少毫秒 就補 多少毫秒/產生單個可用令牌的毫秒數
* 比如 1000tps 則產生1個令牌要1毫秒,假設 上面差50毫秒,那麽就可以有50個新令牌可以用
*
**/
long new_permits = (now_micros - next_free_ticket_micros) / stable_interval_micros;
stored_permits = math.min(max_permits, stored_permits + new_permits); // 取最大令牌數 與 存儲令牌數+新可用令牌數 小的一個
next_free_ticket_micros = now_micros; // 將當前時間更新為next_free_ticket_micros ,因為有新令牌能用了嘛
}
-- 消耗令牌
long moment_available = next_free_ticket_micros;
long stored_permits_to_spend = math.min(required_permits, stored_permits); // 將要花掉多少令牌, 請求數控制的是1 取小的是因為不能超過可用令牌數
long fresh_permits = required_permits - stored_permits_to_spend; // 這次用掉的,要減掉
long wait_micros = fresh_permits * stable_interval_micros; // fresh_permits > 0 表示申請的令牌不夠,則需要等,乘以每個令牌需要的產生時間,就是要等多久
// redis.replicate_commands() // 在redis腳本中調用time會有問題的規避
storeMap.get(key).put("stored_permits", stored_permits - stored_permits_to_spend);
storeMap.get(key).put("next_free_ticket_micros", next_free_ticket_micros + wait_micros);
// redis.call('expire', key, 10) // 每隔10s刷新一下這個流控實例
-- 返回需要等待的時間長度
return moment_available - now_micros;
最後貼一下redis的lua腳本:
-- key
local key = KEYS[1]
-- 最大存儲的令牌數
local max_permits = tonumber(KEYS[2])
-- 每秒鐘產生的令牌數
local permits_per_second = tonumber(KEYS[3])
-- 請求的令牌數
local required_permits = tonumber(ARGV[1])
-- 下次請求可以獲取令牌的起始時間
local next_free_ticket_micros = tonumber(redis.call('hget', key, 'next_free_ticket_micros') or 0)
-- 當前時間
local time = redis.call('time')
local now_micros = tonumber(time[1]) * 1000000 + tonumber(time[2])
-- 查詢獲取令牌是否超時
if (ARGV[2] ~= nil) then
-- 獲取令牌的超時時間
local timeout_micros = tonumber(ARGV[2])
local micros_to_wait = next_free_ticket_micros - now_micros
if (micros_to_wait > timeout_micros) then
return micros_to_wait
end
end
-- 當前存儲的令牌數
local stored_permits = tonumber(redis.call('hget', key, 'stored_permits') or 0)
-- 添加令牌的時間間隔
local stable_interval_micros = 1000000 / permits_per_second
-- 補充令牌
if (now_micros > next_free_ticket_micros) then
local new_permits = (now_micros - next_free_ticket_micros) / stable_interval_micros
stored_permits = math.min(max_permits, stored_permits + new_permits)
next_free_ticket_micros = now_micros
end
-- 消耗令牌
local moment_available = next_free_ticket_micros
local stored_permits_to_spend = math.min(required_permits, stored_permits)
local fresh_permits = required_permits - stored_permits_to_spend;
local wait_micros = fresh_permits * stable_interval_micros
redis.replicate_commands()
redis.call('hset', key, 'stored_permits', stored_permits - stored_permits_to_spend)
redis.call('hset', key, 'next_free_ticket_micros', next_free_ticket_micros + wait_micros)
redis.call('expire', key, 10)
-- 返回需要等待的時間長度
return moment_available - now_micros
再寫一篇tps限流