1. 程式人生 > 實用技巧 >[Redis]Redis做簡單的分散式限流

[Redis]Redis做簡單的分散式限流

分散式限流

上一篇部落格中提到的單個應用的限流存在侷限性,那就是隻能對單個例項進行限流,而面對現在大型的應用,這種限流肯定是不夠的,那麼就要進行對整個分散式系統。對於分散式限流,我從個人的理解上有兩個途徑,第一種就是在應用之外,也就是比如用Nginx來做限流,第二種就是系統自己利用第三方的中介軟體來自己實現限流。這裡瞭解一下第二種途徑吧,利用Redis來做限流。

演算法

之前有提過幾個限流演算法,分別是計數器,漏桶,滑動視窗,令牌桶。這裡採用令牌桶演算法來實現。具體怎麼實現呢。

初始化令牌桶

令牌桶演算法需要初始化桶的最大能夠盛放的令牌個數,以及勻速放入令牌桶的令牌的放入速率。這裡初始化令牌桶的最大盛放令牌個數為100個,放入速率為5,如下圖,limitkey1是統一的key。

時間的問題

之前的分散式鎖的文章就有提過,多個系統請求的時候,他們之間的時間可能是不一致的,當然那篇文章中的時間和此處的時間並無關聯。主要是這裡需要進行取令牌和放令牌的操作,需要獲取最後一次放入令牌的時間。所以這裡統一採用redis中的time命令來獲取統一的時間。lua指令碼如下:

local times = redis.pcall("TIME")
return tonumber(times[1])*1000+tonumber(times[2])/1000

獲取令牌

獲取令牌可以結合令牌桶演算法畫一個簡單流程圖。

對應的lua指令碼如下

local local_key =  KEYS[1]
local permits = ARGV[1]
local curr_mill_second = ARGV[2]
if tonumber(redis.pcall("EXISTS", local_key)) < 1 then
   return 0
end

--- 令牌桶內資料:
---             last_mill_second  最後一次放入令牌時間
---             curr_permits  當前桶內令牌
---             max_permits   桶內令牌最大數量
---             rate  令牌放置速度
local rate_limit_info = redis.pcall("HMGET", local_key, "last_mill_second", "curr_permits", "max_permits", "rate")
local last_mill_second = rate_limit_info[1]
local curr_permits = tonumber(rate_limit_info[2])
local max_permits = tonumber(rate_limit_info[3])
local rate = rate_limit_info[4]
--- 標識沒有配置令牌桶
if type(max_permits) == 'boolean' or max_permits == nil then
   return 0
end
--- 若令牌桶引數沒有配置,則返回0
if type(rate) == 'boolean' or rate == nil then
   return 0
end

local local_curr_permits = max_permits;

--- 令牌桶剛剛建立,上一次獲取令牌的毫秒數為空
--- 根據和上一次向桶裡新增令牌的時間和當前時間差,觸發式往桶裡新增令牌,並且更新上一次向桶裡新增令牌的時間
--- 如果向桶裡新增的令牌數不足一個,則不更新上一次向桶裡新增令牌的時間
--- ~=號在Lua指令碼的含義就是不等於!=
if (type(last_mill_second) ~= 'boolean'  and last_mill_second ~= nil) then
    if(curr_mill_second - last_mill_second < 0) then
       return -1
    end

    --- 生成令牌操作
    local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate) --- 最關鍵程式碼:根據時間差計算令牌數量並勻速的放入令牌
    local expect_curr_permits = reverse_permits + curr_permits;
    local_curr_permits = math.min(expect_curr_permits, max_permits);  --- 如果期望令牌數大於桶容量,則設為桶容量
    --- 大於0表示這段時間產生令牌,則更新最新令牌放入時間
    if (reverse_permits > 0) then
        redis.pcall("HSET", local_key, "last_mill_second", curr_mill_second)
    end
else
    redis.pcall("HSET", local_key, "last_mill_second", curr_mill_second)
end
--- 取出令牌操作
    local result = -1
    if (local_curr_permits - permits >= 0) then
        result = 1
        redis.pcall("HSET", local_key, "curr_permits", local_curr_permits - permits)
    else
        redis.pcall("HSET", local_key, "curr_permits", local_curr_permits)
    end
return result

呼叫

  • 建立註解,在介面對傳入引數進行初始化
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface FlowLimit {
    /**
     * 請求令牌數
     * */
    int permit() default 1;

    /**
     * 請求的key
     * */
    String key();
}

  • 建立一個切面,進行取令牌操作

切面程式設計我也是第一次用,有些寫的不太好的地方還請見諒。有了前面的指令碼,事實上程式碼就不怎麼複雜了,主要就是載入兩次lua指令碼,然後完成相應的邏輯就好了。

@Aspect
@Configuration
public class RedisLimitInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(RedisLimitInterceptor.class);


    private final RedisTemplate<String,Serializable> redisTemplate;


    @Autowired
    public RedisLimitInterceptor(RedisTemplate<String,Serializable> template){
        this.redisTemplate = template;
    }

    @Around("execution(public * *(..))&&@annotation(cn.izzer.flow_limit_redis.anonation.FlowLimit)")
    public Object interceptor(ProceedingJoinPoint point){

        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        FlowLimit annotation = method.getAnnotation(FlowLimit.class);
        int permit = annotation.permit();

        ImmutableList<String> keys = ImmutableList.of(annotation.key());
        logger.info(String.format("欲取出的令牌數:%d key:%s ",permit,keys.get(0)));

        logger.info("開始嘗試取出令牌...");

        try {
            //先獲取系統時間
            logger.info("嘗試獲取當前時間...");
            DefaultRedisScript<Number> redisScript = new DefaultRedisScript<Number>();
            redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/redis_currentTimeMillis")));
            redisScript.setResultType(Number.class);
            Number currentTimeMillies = redisTemplate.execute(redisScript,keys);
            logger.info(String.format("當前時間:%d",currentTimeMillies));

            redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/redis_acquire")));
            //請求令牌
            Number reqResult = redisTemplate.execute(redisScript,keys,annotation.permit(),currentTimeMillies.intValue());

            logger.info(String.format("請求結果:%d",reqResult.intValue()));

            if(reqResult.intValue()==1){
                logger.info("請求成功");
                return point.proceed();
            }
            else{
                logger.info("限流了");
                throw new RuntimeException("超限限流");
            }

        }catch (Throwable e){
            if(e instanceof RuntimeException){
                e.printStackTrace();
                throw new RuntimeException(e.getLocalizedMessage());
            }
            throw new RuntimeException("伺服器錯誤");
        }
    }

}

  • Controller層進行呼叫
@RestController
public class RedisLimitController {

    private static final Logger logger = LoggerFactory.getLogger(RedisLimitController.class);

    @FlowLimit(permit = 2,key = "limitkey1")
    @GetMapping("/limit")
    public String limit(){
        return "ok";
    }

}
  • 測試環節

老樣子,用JMeter進行併發呼叫測試。用十個執行緒迴圈十次去請求介面,每次取出兩個令牌。

總結

這次的限流實際上是一個比較初級的版本,在生產中還是要結合業務進行設計,指令碼有肯定要更加複雜,因為要面對更復雜的場景。