[Redis]Redis做簡單的分散式限流
阿新 • • 發佈:2020-11-01
分散式限流
上一篇部落格中提到的單個應用的限流存在侷限性,那就是隻能對單個例項進行限流,而面對現在大型的應用,這種限流肯定是不夠的,那麼就要進行對整個分散式系統。對於分散式限流,我從個人的理解上有兩個途徑,第一種就是在應用之外,也就是比如用Nginx來做限流,第二種就是系統自己利用第三方的中介軟體來自己實現限流。這裡瞭解一下第二種途徑吧,利用Redis來做限流。
演算法
之前有提過幾個限流演算法,分別是計數器,漏桶,滑動視窗,令牌桶。這裡採用令牌桶演算法來實現。具體怎麼實現呢。
初始化令牌桶
令牌桶演算法需要初始化桶的最大能夠盛放的令牌個數,以及勻速放入令牌桶的令牌的放入速率。這裡初始化令牌桶的最大盛放令牌個數為100個,放入速率為5,如下圖,limitkey1是統一的key。
時間的問題
之前的分散式鎖的文章就有提過,多個系統請求的時候,他們之間的時間可能是不一致的,當然那篇文章中的時間和此處的時間並無關聯。主要是這裡需要進行取令牌和放令牌的操作,需要獲取最後一次放入令牌的時間。所以這裡統一採用redis中的time命令來獲取統一的時間。lua指令碼如下:
local times = redis.pcall("TIME")
return tonumber(times[1])*1000+tonumber(times[2])/1000
獲取令牌
獲取令牌可以結合令牌桶演算法畫一個簡單流程圖。
對應的lua指令碼如下
local local_key = KEYS[1] local permits = ARGV[1] local curr_mill_second = ARGV[2] if tonumber(redis.pcall("EXISTS", local_key)) < 1 then return 0 end --- 令牌桶內資料: --- last_mill_second 最後一次放入令牌時間 --- curr_permits 當前桶內令牌 --- max_permits 桶內令牌最大數量 --- rate 令牌放置速度 local rate_limit_info = redis.pcall("HMGET", local_key, "last_mill_second", "curr_permits", "max_permits", "rate") local last_mill_second = rate_limit_info[1] local curr_permits = tonumber(rate_limit_info[2]) local max_permits = tonumber(rate_limit_info[3]) local rate = rate_limit_info[4] --- 標識沒有配置令牌桶 if type(max_permits) == 'boolean' or max_permits == nil then return 0 end --- 若令牌桶引數沒有配置,則返回0 if type(rate) == 'boolean' or rate == nil then return 0 end local local_curr_permits = max_permits; --- 令牌桶剛剛建立,上一次獲取令牌的毫秒數為空 --- 根據和上一次向桶裡新增令牌的時間和當前時間差,觸發式往桶裡新增令牌,並且更新上一次向桶裡新增令牌的時間 --- 如果向桶裡新增的令牌數不足一個,則不更新上一次向桶裡新增令牌的時間 --- ~=號在Lua指令碼的含義就是不等於!= if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then if(curr_mill_second - last_mill_second < 0) then return -1 end --- 生成令牌操作 local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate) --- 最關鍵程式碼:根據時間差計算令牌數量並勻速的放入令牌 local expect_curr_permits = reverse_permits + curr_permits; local_curr_permits = math.min(expect_curr_permits, max_permits); --- 如果期望令牌數大於桶容量,則設為桶容量 --- 大於0表示這段時間產生令牌,則更新最新令牌放入時間 if (reverse_permits > 0) then redis.pcall("HSET", local_key, "last_mill_second", curr_mill_second) end else redis.pcall("HSET", local_key, "last_mill_second", curr_mill_second) end --- 取出令牌操作 local result = -1 if (local_curr_permits - permits >= 0) then result = 1 redis.pcall("HSET", local_key, "curr_permits", local_curr_permits - permits) else redis.pcall("HSET", local_key, "curr_permits", local_curr_permits) end return result
呼叫
- 建立註解,在介面對傳入引數進行初始化
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface FlowLimit {
/**
* 請求令牌數
* */
int permit() default 1;
/**
* 請求的key
* */
String key();
}
- 建立一個切面,進行取令牌操作
切面程式設計我也是第一次用,有些寫的不太好的地方還請見諒。有了前面的指令碼,事實上程式碼就不怎麼複雜了,主要就是載入兩次lua指令碼,然後完成相應的邏輯就好了。
@Aspect
@Configuration
public class RedisLimitInterceptor {
private static final Logger logger = LoggerFactory.getLogger(RedisLimitInterceptor.class);
private final RedisTemplate<String,Serializable> redisTemplate;
@Autowired
public RedisLimitInterceptor(RedisTemplate<String,Serializable> template){
this.redisTemplate = template;
}
@Around("execution(public * *(..))&&@annotation(cn.izzer.flow_limit_redis.anonation.FlowLimit)")
public Object interceptor(ProceedingJoinPoint point){
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
FlowLimit annotation = method.getAnnotation(FlowLimit.class);
int permit = annotation.permit();
ImmutableList<String> keys = ImmutableList.of(annotation.key());
logger.info(String.format("欲取出的令牌數:%d key:%s ",permit,keys.get(0)));
logger.info("開始嘗試取出令牌...");
try {
//先獲取系統時間
logger.info("嘗試獲取當前時間...");
DefaultRedisScript<Number> redisScript = new DefaultRedisScript<Number>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/redis_currentTimeMillis")));
redisScript.setResultType(Number.class);
Number currentTimeMillies = redisTemplate.execute(redisScript,keys);
logger.info(String.format("當前時間:%d",currentTimeMillies));
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/redis_acquire")));
//請求令牌
Number reqResult = redisTemplate.execute(redisScript,keys,annotation.permit(),currentTimeMillies.intValue());
logger.info(String.format("請求結果:%d",reqResult.intValue()));
if(reqResult.intValue()==1){
logger.info("請求成功");
return point.proceed();
}
else{
logger.info("限流了");
throw new RuntimeException("超限限流");
}
}catch (Throwable e){
if(e instanceof RuntimeException){
e.printStackTrace();
throw new RuntimeException(e.getLocalizedMessage());
}
throw new RuntimeException("伺服器錯誤");
}
}
}
- Controller層進行呼叫
@RestController
public class RedisLimitController {
private static final Logger logger = LoggerFactory.getLogger(RedisLimitController.class);
@FlowLimit(permit = 2,key = "limitkey1")
@GetMapping("/limit")
public String limit(){
return "ok";
}
}
- 測試環節
老樣子,用JMeter進行併發呼叫測試。用十個執行緒迴圈十次去請求介面,每次取出兩個令牌。
總結
這次的限流實際上是一個比較初級的版本,在生產中還是要結合業務進行設計,指令碼有肯定要更加複雜,因為要面對更復雜的場景。