使用reids實現限流
1.今天我們就基於Redis元件的特性,實現一個分散式限流元件,
原理
首先解釋下為何採用Redis作為限流元件的核心。
通俗地講,假設一個使用者(用IP判斷)每秒訪問某服務介面的次數不能超過10次,那麼我們可以在Redis中建立一個鍵,並設定鍵的過期時間為60秒。
當一個使用者對此服務介面發起一次訪問就把鍵值加1,在單位時間(此處為1s)內當鍵值增加到10的時候,就禁止訪問服務介面。PS:在某種場景中新增訪問時間間隔還是很有必要的。我們本次不考慮間隔時間,只關注單位時間內的訪問次數。
2. 開發核心
2.1 基於Redis的incr及過期機制開發呼叫方便。
2.2宣告式Spring支援
另外,在本次開發中,我們不通過簡單的呼叫Redis的java類庫API實現對Redis的incr操作。
原因在於,我們要保證整個限流的操作是原子性的,如果用Java程式碼去做操作及判斷,會有併發問題。這裡我決定採用Lua指令碼進行核心邏輯的定義。
為何使用Lua
在正式開發前,我簡單介紹下對Redis的操作中,為何推薦使用Lua指令碼。
減少網路開銷: 不使用 Lua 的程式碼需要向 Redis 傳送多次請求, 而指令碼只需一次即可, 減少網路傳輸;
原子操作: Redis 將整個指令碼作為一個原子執行, 無需擔心併發, 也就無需事務;
複用: 指令碼會永久儲存 Redis 中, 其他客戶端可繼續使用.
Redis添加了對Lua的支援,能夠很好的滿足原子性、事務性的支援,讓我們免去了很多的異常邏輯處理。對於Lua的語法不是本文的主要內容,感興趣的可以自行查詢資料。
3. 正式開發
3.1 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>1.4.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
3.2新建一個Redis的配置類,命名為RedisCacheConfig,使用javaconfig形式注入RedisTemplate
/** * @author wangbs * @version 1.0 * @date 2019/12/17 1:15 * @className RedisCacheConfig * @desc Redis配置 */ @Configuration public class RedisCacheConfig { private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class); /** * 配置自定義序列化器 * @return */ @Bean public RedisCacheConfiguration redisCacheConfiguration() { return RedisCacheConfiguration .defaultCacheConfig() .serializeKeysWith( RedisSerializationContext .SerializationPair .fromSerializer(new StringRedisSerializer())) .serializeValuesWith( RedisSerializationContext .SerializationPair .fromSerializer(new GenericJackson2JsonRedisSerializer())); } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(預設使用JDK的序列化方式) Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); LOGGER.info("Springboot RedisTemplate 載入完成"); return template; } }
3.3呼叫方application.propertie需要增加Redis配置
#單機模式redis spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.pool.maxActive=8 spring.redis.pool.maxWait=-1 spring.redis.pool.maxIdle=8 spring.redis.pool.minIdle=0 spring.redis.timeout=10000 spring.redis.password=
3.4 自定義限流使用的註解RateLimiter
該註解明確只用於方法,主要有三個屬性。
key--表示限流模組名,指定該值用於區分不同應用,不同場景,推薦格式為:應用名:模組名:ip:介面名:方法名
limit--表示單位時間允許通過的請求數
expire--incr的值的過期時間,業務中表示限流的單位時間。
/** * @author wangbs * @version 1.0 * @date 2019/12/16 1:25 * @className RateLimiter * @desc 限流注解 */ //註解作用域 // ElementType.TYPE:允許被修飾的註解作用在類、介面和列舉上 // ElementType.FIELD:允許作用在屬性欄位上 // ElementType.METHOD:允許作用在方法上 // ElementType.PARAMETER:允許作用在方法引數上 // ElementType.CONSTRUCTOR:允許作用在構造器上 // ElementType.LOCAL_VARIABLE:允許作用在本地區域性變數上 // ElementType.ANNOTATION_TYPE:允許作用在註解上 // ElementType.PACKAGE:允許作用在包上 // // 註解的生命週期 // RetentionPolicy.SOURCE:當前註解編譯期可見,不會寫入 class 檔案 // RetentionPolicy.CLASS:類載入階段丟棄,會寫入 class 檔案 // RetentionPolicy.RUNTIME:永久儲存,可以反射獲取 // 註解的作用域 @Target(ElementType.METHOD) // 註解的生命週期 @Retention(RetentionPolicy.RUNTIME) // 允許子類繼承 @Inherited // 生成javadoc的時候生成註解的資訊 @Documented public @interface RateLimiter { /** * 限流key * @return */ String key() default "rate:limiter"; /** * 單位時間限制通過請求數 * @return */ long limit() default 10; /** * 過期時間,單位秒 * @return */ long expire() default 1; /** * 限流提示語 * @return */ String message() default "false"; }
3.5 定義開發註解使用的切面,這裡我們直接使用aspectj進行切面的開發
/**
* @author wangbs
* @version 1.0
* @date 2019/12/16 1:17
* @className RateLimterHandler
* @desc 限流處理器
*/
@Aspect
@Component
public class RateLimterHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class);
@Autowired
RedisTemplate redisTemplate;
private DefaultRedisScript<Long> getRedisScript;
/**
* 這裡是注入了RedisTemplate,使用其API進行Lua指令碼的呼叫。
*
* init() 方法在應用啟動時會初始化DefaultRedisScript,並載入Lua指令碼,方便進行呼叫。
*
* PS: Lua指令碼放置在classpath下,通過ClassPathResource進行載入。
*/
@PostConstruct
public void init() {
getRedisScript = new DefaultRedisScript<>();
getRedisScript.setResultType(Long.class);
getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua")));
LOGGER.info("RateLimterHandler[分散式限流處理器]指令碼載入完成");
}
/**
* 這裡我們定義了一個切點,表示只要註解了 @RateLimiter 的方法,均可以觸發限流操作。
*/
@Pointcut("@annotation(com.vx.servicehi.annotation.RateLimiter)")
public void rateLimiter() {}
/**
* 這段程式碼的邏輯為,獲取 @RateLimiter 註解配置的屬性:key、limit、expire,並通過 redisTemplate.execute(RedisScript script, List keys, Object... args) 方法傳遞給Lua指令碼進行限流相關操作,邏輯很清晰。
*
* 這裡我們定義如果指令碼返回狀態為0則為觸發限流,1表示正常請求。
* @param proceedingJoinPoint
* @param rateLimiter
* @return
* @throws Throwable
*/
@Around("@annotation(rateLimiter)")
public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]開始執行限流操作");
}
Signature signature = proceedingJoinPoint.getSignature();
if (!(signature instanceof MethodSignature)) {
throw new IllegalArgumentException("the Annotation @RateLimter must used on method!");
}
/**
* 獲取註解引數
*/
// 限流模組key
String limitKey = rateLimiter.key();
if(StringUtils.isBlank(limitKey)){
throw new NullPointerException();
}
// 限流閾值
long limitTimes = rateLimiter.limit();
// 限流超時時間
long expireTime = rateLimiter.expire();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]引數值為-limitTimes={},limitTimeout={}", limitTimes, expireTime);
}
// 限流提示語
String message = rateLimiter.message();
if (StringUtils.isBlank(message)) {
message = "false";
}
/**
* 執行Lua指令碼
*/
List<String> keyList = new ArrayList();
// 設定key值為註解中的值
keyList.add(limitKey);
/**
* 呼叫指令碼並執行
*/
Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes);
if (result == 0) {
String msg = "由於超過單位時間=" + expireTime + "-允許的請求次數=" + limitTimes + "[觸發限流]";
LOGGER.debug(msg);
return message;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]限流執行結果-result={},請求[正常]響應", result);
}
return proceedingJoinPoint.proceed();
}
}
3.6Lua指令碼這裡是我們整個限流操作的核心,通過執行一個Lua指令碼進行限流的操作。指令碼內容如下
--獲取KEY local key1 = KEYS[1] --給指定的key 值增加一,如果 key 不存在,那麼 key 的值會先被初始化為 0 ,然後再執行 INCR 操作 local val = redis.call('incr', key1) --以秒為單位返回 key 的剩餘過期時間 local ttl = redis.call('ttl', key1) --獲取ARGV內的引數並列印 local expire = ARGV[1] local times = ARGV[2] redis.log(redis.LOG_DEBUG,tostring(times)) redis.log(redis.LOG_DEBUG,tostring(expire)) redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val); if val == 1 then redis.call('expire', key1, tonumber(expire)) else if ttl == -1 then --expire當key不存在或者不能為key設定生存時間時返回 0 redis.call('expire', key1, tonumber(expire)) end end if val > tonumber(times) then return 0 end return 1
邏輯很通俗,我簡單介紹下。
首先指令碼獲取Java程式碼中傳遞而來的要限流的模組的key,不同的模組key值一定不能相同,否則會覆蓋!
redis.call('incr', key1)對傳入的key做incr操作,如果key首次生成,設定超時時間ARGV[1];(初始值為1)
ttl是為防止某些key在未設定超時時間並長時間已經存在的情況下做的保護的判斷;
每次請求都會做+1操作,當限流的值val大於我們註解的閾值,則返回0表示已經超過請求限制,觸發限流。否則為正常請求。
當過期後,又是新的一輪迴圈,整個過程是一個原子性的操作,能夠保證單位時間不會超過我們預設的請求閾值。
3.7 測試
這裡我貼一下核心程式碼,我們定義一個介面,並註解 @RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100) 表示模組ratedemo:sendPayment:1.0.0 在100s內允許通過5個請求,這裡的引數設定是為了方便看結果。實際中,我們通常會設定1s內允許通過的次數。
@Controller
public class TestController {
private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);
@ResponseBody
@RequestMapping("ratelimiter")
@RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100)
public String sendPayment(HttpServletRequest request) throws Exception {
return "正常請求";
}
}