1. 程式人生 > 實用技巧 >使用reids實現限流

使用reids實現限流

1.今天我們就基於Redis元件的特性,實現一個分散式限流元件,

原理
首先解釋下為何採用Redis作為限流元件的核心。

通俗地講,假設一個使用者(用IP判斷)每秒訪問某服務介面的次數不能超過10次,那麼我們可以在Redis中建立一個鍵,並設定鍵的過期時間為60秒。

當一個使用者對此服務介面發起一次訪問就把鍵值加1,在單位時間(此處為1s)內當鍵值增加到10的時候,就禁止訪問服務介面。PS:在某種場景中新增訪問時間間隔還是很有必要的。我們本次不考慮間隔時間,只關注單位時間內的訪問次數。

2. 開發核心

2.1 基於Redis的incr及過期機制開發呼叫方便。

2.2宣告式Spring支援

另外,在本次開發中,我們不通過簡單的呼叫Redis的java類庫API實現對Redis的incr操作。

原因在於,我們要保證整個限流的操作是原子性的,如果用Java程式碼去做操作及判斷,會有併發問題。這裡我決定採用Lua指令碼進行核心邏輯的定義。

為何使用Lua
在正式開發前,我簡單介紹下對Redis的操作中,為何推薦使用Lua指令碼。

減少網路開銷: 不使用 Lua 的程式碼需要向 Redis 傳送多次請求, 而指令碼只需一次即可, 減少網路傳輸;
原子操作: Redis 將整個指令碼作為一個原子執行, 無需擔心併發, 也就無需事務;
複用: 指令碼會永久儲存 Redis 中, 其他客戶端可繼續使用.
Redis添加了對Lua的支援,能夠很好的滿足原子性、事務性的支援,讓我們免去了很多的異常邏輯處理。對於Lua的語法不是本文的主要內容,感興趣的可以自行查詢資料。

3. 正式開發

3.1 引入依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
  <version>1.4.2.RELEASE</version>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-redis</artifactId>

  <version>1.4.2.RELEASE</version>
</dependency>

 3.2新建一個Redis的配置類,命名為RedisCacheConfig,使用javaconfig形式注入RedisTemplate

/**
 * @author wangbs
 * @version 1.0
 * @date 2019/12/17 1:15
 * @className RedisCacheConfig
 * @desc Redis配置
 */
@Configuration
public class RedisCacheConfig {


    private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class);

    /**
     * 配置自定義序列化器
     * @return
     */
    @Bean
    public RedisCacheConfiguration redisCacheConfiguration() {
        return RedisCacheConfiguration
                .defaultCacheConfig()
                .serializeKeysWith(
                        RedisSerializationContext
                                .SerializationPair
                                .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(
                        RedisSerializationContext
                                .SerializationPair
                                .fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(預設使用JDK的序列化方式)
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(serializer);
        //使用StringRedisSerializer來序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        LOGGER.info("Springboot RedisTemplate 載入完成");
        return template;
    }
}

  3.3呼叫方application.propertie需要增加Redis配置

 #單機模式redis
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    spring.redis.pool.maxActive=8
    spring.redis.pool.maxWait=-1
    spring.redis.pool.maxIdle=8
    spring.redis.pool.minIdle=0
    spring.redis.timeout=10000
    spring.redis.password=

  3.4 自定義限流使用的註解RateLimiter

  該註解明確只用於方法,主要有三個屬性。

  key--表示限流模組名,指定該值用於區分不同應用,不同場景,推薦格式為:應用名:模組名:ip:介面名:方法名
  limit--表示單位時間允許通過的請求數
  expire--incr的值的過期時間,業務中表示限流的單位時間。

/**
 * @author wangbs
 * @version 1.0
 * @date 2019/12/16 1:25
 * @className RateLimiter
 * @desc 限流注解
 */

//註解作用域
//        ElementType.TYPE:允許被修飾的註解作用在類、介面和列舉上
//        ElementType.FIELD:允許作用在屬性欄位上
//        ElementType.METHOD:允許作用在方法上
//        ElementType.PARAMETER:允許作用在方法引數上
//        ElementType.CONSTRUCTOR:允許作用在構造器上
//        ElementType.LOCAL_VARIABLE:允許作用在本地區域性變數上
//        ElementType.ANNOTATION_TYPE:允許作用在註解上
//        ElementType.PACKAGE:允許作用在包上
//
//        註解的生命週期
//        RetentionPolicy.SOURCE:當前註解編譯期可見,不會寫入 class 檔案
//	RetentionPolicy.CLASS:類載入階段丟棄,會寫入 class 檔案
//	RetentionPolicy.RUNTIME:永久儲存,可以反射獲取

// 註解的作用域
@Target(ElementType.METHOD)
// 註解的生命週期
@Retention(RetentionPolicy.RUNTIME)
// 允許子類繼承
@Inherited
// 生成javadoc的時候生成註解的資訊
@Documented
public @interface RateLimiter {

    /**
     * 限流key
     * @return
     */
    String key() default "rate:limiter";
    /**
     * 單位時間限制通過請求數
     * @return
     */
    long limit() default 10;

    /**
     * 過期時間,單位秒
     * @return
     */
    long expire() default 1;

    /**
     * 限流提示語
     * @return
     */
    String message() default "false";
}

3.5 定義開發註解使用的切面,這裡我們直接使用aspectj進行切面的開發

/**
* @author wangbs
* @version 1.0
* @date 2019/12/16 1:17
* @className RateLimterHandler
* @desc 限流處理器
*/
@Aspect
@Component
public class RateLimterHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class);

@Autowired
RedisTemplate redisTemplate;

private DefaultRedisScript<Long> getRedisScript;

/**
* 這裡是注入了RedisTemplate,使用其API進行Lua指令碼的呼叫。
*
* init() 方法在應用啟動時會初始化DefaultRedisScript,並載入Lua指令碼,方便進行呼叫。
*
* PS: Lua指令碼放置在classpath下,通過ClassPathResource進行載入。
*/
@PostConstruct
public void init() {
getRedisScript = new DefaultRedisScript<>();
getRedisScript.setResultType(Long.class);
getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua")));
LOGGER.info("RateLimterHandler[分散式限流處理器]指令碼載入完成");
}

/**
* 這裡我們定義了一個切點,表示只要註解了 @RateLimiter 的方法,均可以觸發限流操作。
*/
@Pointcut("@annotation(com.vx.servicehi.annotation.RateLimiter)")
public void rateLimiter() {}

/**
* 這段程式碼的邏輯為,獲取 @RateLimiter 註解配置的屬性:key、limit、expire,並通過 redisTemplate.execute(RedisScript script, List keys, Object... args) 方法傳遞給Lua指令碼進行限流相關操作,邏輯很清晰。
*
* 這裡我們定義如果指令碼返回狀態為0則為觸發限流,1表示正常請求。
* @param proceedingJoinPoint
* @param rateLimiter
* @return
* @throws Throwable
*/
@Around("@annotation(rateLimiter)")
public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]開始執行限流操作");
}
Signature signature = proceedingJoinPoint.getSignature();
if (!(signature instanceof MethodSignature)) {
throw new IllegalArgumentException("the Annotation @RateLimter must used on method!");
}
/**
* 獲取註解引數
*/
// 限流模組key
String limitKey = rateLimiter.key();
if(StringUtils.isBlank(limitKey)){
throw new NullPointerException();
}
// 限流閾值
long limitTimes = rateLimiter.limit();
// 限流超時時間
long expireTime = rateLimiter.expire();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]引數值為-limitTimes={},limitTimeout={}", limitTimes, expireTime);
}
// 限流提示語
String message = rateLimiter.message();
if (StringUtils.isBlank(message)) {
message = "false";
}
/**
* 執行Lua指令碼
*/
List<String> keyList = new ArrayList();
// 設定key值為註解中的值
keyList.add(limitKey);
/**
* 呼叫指令碼並執行
*/
Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes);
if (result == 0) {
String msg = "由於超過單位時間=" + expireTime + "-允許的請求次數=" + limitTimes + "[觸發限流]";
LOGGER.debug(msg);
return message;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RateLimterHandler[分散式限流處理器]限流執行結果-result={},請求[正常]響應", result);
}
return proceedingJoinPoint.proceed();
}
}

  3.6Lua指令碼這裡是我們整個限流操作的核心,通過執行一個Lua指令碼進行限流的操作。指令碼內容如下

--獲取KEY
local key1 = KEYS[1]
--給指定的key 值增加一,如果 key 不存在,那麼 key 的值會先被初始化為 0 ,然後再執行 INCR 操作
local val = redis.call('incr', key1)
--以秒為單位返回 key 的剩餘過期時間
local ttl = redis.call('ttl', key1)

--獲取ARGV內的引數並列印
local expire = ARGV[1]
local times = ARGV[2]

redis.log(redis.LOG_DEBUG,tostring(times))
redis.log(redis.LOG_DEBUG,tostring(expire))

redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val);
if val == 1 then
    redis.call('expire', key1, tonumber(expire))
else
    if ttl == -1 then
	--expire當key不存在或者不能為key設定生存時間時返回  0
        redis.call('expire', key1, tonumber(expire))
    end
end

if val > tonumber(times) then
    return 0
end

return 1

  

邏輯很通俗,我簡單介紹下。

首先指令碼獲取Java程式碼中傳遞而來的要限流的模組的key,不同的模組key值一定不能相同,否則會覆蓋!
redis.call('incr', key1)對傳入的key做incr操作,如果key首次生成,設定超時時間ARGV[1];(初始值為1)
ttl是為防止某些key在未設定超時時間並長時間已經存在的情況下做的保護的判斷;
每次請求都會做+1操作,當限流的值val大於我們註解的閾值,則返回0表示已經超過請求限制,觸發限流。否則為正常請求。
當過期後,又是新的一輪迴圈,整個過程是一個原子性的操作,能夠保證單位時間不會超過我們預設的請求閾值。

3.7 測試


這裡我貼一下核心程式碼,我們定義一個介面,並註解 @RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100) 表示模組ratedemo:sendPayment:1.0.0 在100s內允許通過5個請求,這裡的引數設定是為了方便看結果。實際中,我們通常會設定1s內允許通過的次數。

@Controller
public class TestController {

private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);

@ResponseBody
@RequestMapping("ratelimiter")
@RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100)
public String sendPayment(HttpServletRequest request) throws Exception {

return "正常請求";
}

}

原始碼地址https://github.com/wangbensen/common-parent