1. 程式人生 > 其它 >Redis實現訪問次數限流,這有難點嗎?

Redis實現訪問次數限流,這有難點嗎?

大家好,我是【架構擺渡人】,一隻十年的程式猿,這是流量治理系列的第11篇原創文章,如果有收穫,還請分享給更多的朋友。

假設我們要做一個業務需求,這個需求就是限制使用者的訪問頻次。比如1分鐘內只能訪問20次,10分鐘內只能訪問200次。因為是使用者維度的場景,效能肯定是要首先考慮,那麼適合這個場景的非Redis莫屬。

最簡單的實現,莫過於只是用incr進行計數操作,於是有了下面的程式碼:

long count = redisTemplate.opsForValue().increment("user:1:60");
if (count > maxLimitCount) {
   throw new LimitException("訪問太頻繁");
}

count = redisTemplate.opsForValue().increment("user:1:600");
if (count > maxLimitCount) {
   throw new LimitException("訪問太頻繁");
}

來,我們對上面這段程式碼解讀一下。需求有2個時間維度的限制,所以這邊基於使用者和時間維度構建了Redis的Key。然後對每個Key進行計數,計數後的結果用於跟限制的值進行判斷,如果超出了限制的值就丟擲異常。

假設限制的時間場景有10個呢?那上面的程式碼是不是得寫10遍才可以。有人可能會說,這還不簡單嗎?迴圈呀,迴圈確實能夠解決這個問題。但是大家有沒有去思考,這是使用者維度的請求,如果每個請求裡面都去操作10次Redis的話,這耗時至少也得10來毫秒吧。所以問題在這,並不是說這個邏輯實現的有問題。

那我們就改成批量的吧,用pipeline來批量執行。

redisTemplate.execute(new RedisCallback<Long>() {
  @Override
  public Long doInRedis(RedisConnection connection) throws DataAccessException {
    connection.openPipeline();
    connection.incr("user:1:60".getBytes());
        connection.incr("user:1:600".getBytes());
        onnection.closePipeline();
    return null;
  }
});

用pipeline也有一個問題,那就是拿不到返回值,也就只能增加,但是沒辦法判斷是否超過了限制的閥值。

所以需要在第一步先查詢下,用查到的值進行判斷,這樣也就是隻需要和Redis互動兩次就可以了。

上面的程式碼在單節點下沒問題,但是如果在叢集下,其實每個Key都可能分配到不同的節點上去,只不過是底層幫你遮蔽掉了細節,併發執行,拿到了所有結果後合併返回的。所以我們需要讓所有的Key都路由到一個節點上,本來就是使用者維度的,直接使用userId路由即可。

這個時候Redis的HashTag功能就排上用場了,將Key user:1:600改寫成user:{1}:600 。

雖然已經優化了,但是還是要發起兩次網路請求才能完成這個邏輯,有沒有可能再進一步優化下呢?一次請求行不行。

這個時候要放大招了,Lua指令碼走起,將所有邏輯都放入Lua指令碼中,一次網路互動即可完成。

local current
current = redis.call("incr",KEYS[1])
if current == 1 then
    redis.call("expire",KEYS[1],1)
end

if current > ARGV[1]
  return 1
end

return 0

上面指令碼演示瞭如何對一個Key進行處理,返回1表示限流,返回0表示通過。不過使用lua指令碼的時候要注意,某些雲服務的Redis會對指令碼進行校驗,像Redis的Key不能使用變數,必須用KEYS[下標]的方式,所以這裡操作多個Key還不能用迴圈,程式碼得寫多遍,這是一個噁心的點。

原創:架構擺渡人(公眾號ID:jiagoubaiduren),歡迎分享,轉載請保留出處。

本文已收錄至學習網站 http://cxytiandi.com/ ,裡面有Spring Boot, Spring Cloud,分庫分表,微服務,面試等相關內容。