1. 程式人生 > 實用技巧 >也來說說redis+lua實現高併發限流

也來說說redis+lua實現高併發限流

我們的靈活用工系統呼叫優付渠道介面做使用者簽約或資金下發時,優付系統增加了API介面請求的限流策略。

針對每一個商戶的每種型別的介面請求做限流。比如:同一商戶,每秒鐘只允許20次簽約請求。當超過20次時,會提示“客戶請求**介面次數超限”。

那麼,作為下游系統,我們就要對併發進行控制,以防出現無效請求。

最常用的併發限流方案是藉助Redis計數器(例如透過Jedis等客戶端存取)。為了保證「讀取計數、自增、設定過期時間」這幾步操作的原子性,這裡,我使用Redis+Lua指令碼的方式來控制。

那麼,

對於服務提供方來說,當請求量超出設定的限流閾值,則直接返回錯誤碼/錯誤提示,並終止對請求的處理。

而對於呼叫方來說呢,我們要做的是,當併發請求超出了限定閾值時,要延遲請求,而不是直接丟棄。

話不多說,上程式碼吧。

如下RedisLimiter類,服務提供方使用limit方法實現限流,服務呼叫方使用limitWait方法實現限流等待(如需)。

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; /** * Redis+Lua實現高併發的限流 * https://mp.weixin.qq.com/s/sP4Hu8HdbVrE-Cm0uPPotg */ @Slf4j @Component public
class RedisLimiter { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 達到限流時,則等待,直到新的間隔。 * * @param key * @param limitCount * @param limitSecond */ public void limitWait(String key, int limitCount, int limitSecond) { boolean ok;//放行標誌 do { ok = limit(key, limitCount, limitSecond); log.info("放行標誌={}", ok); if (!ok) { Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS); if (null != ttl && ttl > 0) { try { Thread.sleep(ttl); log.info("sleeped:{}", ttl); } catch (InterruptedException e) { e.printStackTrace(); } } } } while (!ok); } /** * 限流方法 true-放行;false-限流 * * @param key * @param limitCount * @param limitSecond * @return */ public boolean limit(String key, int limitCount, int limitSecond) { List<String> keys = Collections.singletonList(key); String luaScript = buildLuaScript(); RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class); Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond); log.info("Access try count is {} for key = {}", count, key); if (count != null && count.intValue() <= limitCount) { return true;//放行 } else { return false;//限流 // throw new RuntimeException("You have been dragged into the blacklist"); } } /** * 編寫 redis Lua 限流指令碼 */ public String buildLuaScript() { StringBuilder lua = new StringBuilder(); lua.append("local c"); lua.append("\nc = redis.call('get',KEYS[1])"); // 呼叫不超過最大值,則直接返回 lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then"); lua.append("\nreturn c;"); lua.append("\nend"); // 執行計算器自加 lua.append("\nc = redis.call('incr',KEYS[1])"); lua.append("\nif tonumber(c) == 1 then"); // 從第一次呼叫開始限流,設定對應鍵值的過期 lua.append("\nredis.call('expire',KEYS[1],ARGV[2])"); lua.append("\nend"); lua.append("\nreturn c;"); return lua.toString(); } }

springboot自動注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定義如下:

package jstudy.redislimit;

import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching // enable caching support
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * Declares the {@code RedisTemplate<String, Object>} bean: keys and hash keys are
     * written as plain strings, values and hash values as JSON via Jackson.
     *
     * @param lettuceConnectionFactory connection factory the template is bound to
     * @return the fully initialised template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisSerializer<?> keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Object> valueSerializer = newJsonSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory);
        // Plain-string serialization for keys and hash keys.
        template.setKeySerializer(keySerializer);
        template.setHashKeySerializer(keySerializer);
        // JSON serialization for values and hash values.
        template.setValueSerializer(valueSerializer);
        template.setHashValueSerializer(valueSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Creates the Jackson-based value serializer with all members visible to the mapper
     * and default typing enabled for non-final types.
     */
    private static Jackson2JsonRedisSerializer<Object> newJsonSerializer() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        // NOTE(review): default typing drives polymorphic deserialization from the stored
        // JSON; confirm the Redis contents are trusted and check whether the Jackson
        // version in use has deprecated this call.
        mapper.enableDefaultTyping(DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        serializer.setObjectMapper(mapper);
        return serializer;
    }

}
View Code

併發測試通過,如下是testcase:

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Concurrency test for {@code RedisLimiter.limitWait}: five workers compete for a limit of
 * three calls per one-second window on the same key, so some of them have to wait.
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLimiterTest {
    @Autowired
    private RedisLimiter redisLimiter;

    // Used only to inspect the counter key (current value and remaining TTL) for logging.
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Starts five concurrent workers that each pass through {@code limitWait} and then
     * simulate a short piece of business logic; fails if they do not all finish in time.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting the pool
     */
    @Test
    public void testLimitWait() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        log.info("--------{}", redisTemplate.opsForValue().get("abc"));
        for (int j = 1; j <= 5; j++) {
            final int i = j; // effectively-final copy for the lambda
            pool.execute(() -> {
                Thread.currentThread().setName(Thread.currentThread().getName().replace("-", "_"));
                redisLimiter.limitWait("abc", 3, 1);
                log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS));
                try {
                    // Simulate business logic running after the call was admitted.
                    Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the pool instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        Assert.assertTrue("workers did not finish within the timeout",
                pool.awaitTermination(5, TimeUnit.SECONDS));
    }
}
View Code