Redis-漏斗限流
漏斗限流
基本思路
漏斗限流是最常用的限流方法之一, 顧名思義, 這個演算法的靈感來源於漏斗的結構。
漏斗的容量是優先的, 如果將漏斗嘴堵住, 然後一直往裡面灌水, 它就會變滿, 直至再也裝不進去。如果將漏斗嘴放開, 水就會往下流, 水流走一部分後, 就又可以繼續往裡面灌水。 如果漏斗的流水速率大於灌水的速率, 那麼漏斗永遠都裝不滿。如果漏斗流水速率小於灌水速率, 那麼一旦漏斗滿了, 灌水就需要暫停並等待漏斗騰空。
所以漏斗的剩餘空間就代表這當前行為可以持續進行的數量, 漏斗的流水速率代表著系統允許該行為的最大頻率。 下面使用程式碼來描述單機漏斗演算法:
class Funnel(object): def __init__(self, capacity, leaking_rate): # 漏斗容量 self.capacity = capacity # 漏斗流水速率 self.leaking_rage = leaking_rate # 漏斗剩餘空間 self.left_quota = capacity # self.leaking_ts = time.time() def make_space(self): not_ts = time.time() # 距離上一次漏斗漏水過去了多久 delta_ts = now_ts - self.leaking_ts # 又可以騰出不少空間了 delta_quota = delta_ts * self.leaking_rate # 騰出的空間太少了, 那就等下次吧 if delta_quota < 1: return # 增加剩餘空間 self.left_quota += delta_quota # 記錄漏水時間 self.leaking_ts = now_ts # 剩餘空間不能高於容量 if self.left_quota > self.capacity: self.left_quota = self.capacity def watering(self, quota): self.make_space() # 判斷剩餘空間是否足夠 if self.left_quota >= quota: self.left_quota -= quota return True return False # 所有的漏斗 funnels = {} def is_action_allowed(user_id, action_key, capacity, leaking_rate): key = '%s:%s' %(user_id, action_key) funnel = funnels.get(key) if not funnel: funnel = Funnel(capacity, leaking_rate) funnels[key] = funnel return funnel.watering(1) for i in range(20): print(is_allow_aoolwed('david', 'reply', 15, 0.5))
Funnel物件的make_space方法是漏斗演算法的核心, 其在每次灌水前都會被呼叫以觸發漏水, 給漏斗騰出空間來。能騰出多少空間取決於過去了多久以及流水的速率。 Funnel物件佔據的空間大小不在和行為的頻率成正比, 它的空間佔用是一個常量。
問題來了, 分散式的漏斗演算法該如何實現? 能不能使用Redis的基礎資料結構來搞定?
我們觀察Funnel物件的幾個欄位, 我們發現可以將Funnel物件的內容按欄位儲存到一個hash結構中, 灌水的時候將hash結構的欄位取出來進行邏輯運算後, 再將新值回填到hash結構中就完成了一次行為頻度的檢測。
但是有個問題, 我們無法保證整個過程是原子性的。 從hash結構中取值, 然後在記憶體中運算, 在回填到hash結構, 這三個過程無法原子化, 意味著需要進行適當的加鎖控制。而一旦加鎖, 就意味著會有加鎖失敗,加鎖失敗就需要重新選擇重試或者放棄。
如果重試的話, 就會導致效能下降。 如果放棄的話,就會影響使用者體驗。 同時, 程式碼的複雜度也跟著升高很多。 這是個艱難的選擇, 該如何解決這個問題呢? Redis-Cell救星來了
Redis-Cell
Redis4.0提供了一個限流Redis模組, 它叫redis-cell。 該模組也使用了漏斗演算法, 並提供了原子的限流指令。 有了這個模組, 限流問題就非常簡單了。
該模組只有1條指令cl.throttle,它的引數和返回值都略顯複雜, 接下來看看這個指令具體該如何使用。
127.0.0.1:6379>cl.throttle david:reply 15 30 60 # 0表示允許, 1表示拒絕 1) (integer) 0 # 漏斗容量capacity 2) (integer) 15 # 漏斗剩餘空間left_quota 3) (integer) 14 # 如果拒絕了, 需要多長時間後再試(漏斗有空間了, 單位秒) 4) (integer) -1 # 多長時間後, 漏斗完全空出來(left_quota==capacity, 單位秒) 5) (integer) 2
在執行指令時, 如果被拒絕了, 就需要丟棄或重試。cl.throttle指令考慮的非常周到, 連重試時間都幫你算好了, 直接取返回結果陣列的第四個值進行sleep即可, 不過不想阻塞執行緒, 也可以非同步定時任務來重試。