1. 程式人生 > 其它 >redis實現分散式介面限流

redis實現分散式介面限流

技術標籤:日常開發遇到的問題

限流的目的是通過對併發訪問/請求進行限速或者一個時間視窗內的的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務。

限流包括兩種:
1.Nginx接入層限流
按照一定的規則如帳號、IP、系統呼叫邏輯等在Nginx層面做限流
2.業務應用系統限流
通過業務程式碼控制流量這個流量可以被稱為訊號量,可以理解成是一種鎖,它可以限制一項資源最多能同時被多少程序訪問。

這裡主要說說第二種使用redis在業務層進行限流。

公司做電商業務的,經常會發放優惠券,在某個點優惠券領取開放後,會有大量的併發流量進入,導致系統壓力過大,為了限制併發流量,這裡使用redis在業務層做了流量控制。

  /**
         * 如果使用者在限流之外,則提示使用者稍後再試
         */
        int limitNum = 100;
        long limitSecond = 1000;
        if (redisUtil.acquireTokenFromBucket(limitNum, limitSecond) == null) {
            logger.error("使用者在限流之外,稍後再試");
            return CommonResult.failed(-3,"使用者在限流之外,稍後再試"
); }

下面主要介紹一下redisUtil類中的這個方法(這個方法參考了別人的,原文請點選


    private static final String BUCKET = "BUCKET";
    private static final String BUCKET_COUNT = "BUCKET_COUNT";
    private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
    
 /**
     * Redis實現分散式應用限流的方法
     * @param limit     請求訊號量
     * @param timeout   請求時間
     * @return
     */
public String acquireTokenFromBucket(int limit, long timeout) { //這裡獲取jeids連線池,具體方法,可以自己實現 Jedis jedis = getJedis(); if(jedis == null){ return null; } //這裡開始,是限流的方法 try{ String identifier = UUIDGenerator.generate32UUID(); long now = System.currentTimeMillis(); Transaction transaction = jedis.multi(); //刪除訊號量 transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes()); ZParams params = new ZParams(); params.weightsByDouble(1.0,0.0); transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR); //計數器自增 transaction.incr(BUCKET_COUNT); List<Object> results = transaction.exec(); long counter = (Long) results.get(results.size() - 1); transaction = jedis.multi(); transaction.zadd(BUCKET_MONITOR, now, identifier); transaction.zadd(BUCKET, counter, identifier); transaction.zrank(BUCKET, identifier); results = transaction.exec(); //獲取排名,判斷請求是否取得了訊號量 long rank = (Long) results.get(results.size() - 1); if (rank < limit) { return identifier; } else {//沒有獲取到訊號量,清理之前放入redis 中垃圾資料 transaction = jedis.multi(); transaction.zrem(BUCKET_MONITOR, identifier); transaction.zrem(BUCKET, identifier); transaction.exec(); } } catch (Exception e) { log.error("在列表key的尾部插入元素失敗:" + e.getMessage(), e); returnBrokenResource(jedis); } finally { returnResource(jedis); } return null; } /** * 釋放jedis資源 * * @param jedis */ @SuppressWarnings("deprecation") public void returnResource(final Jedis jedis) { if (jedis != null && jedisPool != null) { jedisPool.returnResource(jedis); } } @SuppressWarnings("deprecation") public void returnBrokenResource(final Jedis jedis) { if (jedis != null && jedisPool != null) { jedisPool.returnBrokenResource(jedis); } }

業務層用法,如最開始的程式碼,通過返回判斷是否為空,判斷當前流量是否限流


以下是從網上找到的詳細方法和測試

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;
 
/**
 * 實現類
 */
public class RedisRateLimiter {
  private static final String BUCKET = "BUCKET";
  private static final String BUCKET_COUNT = "BUCKET_COUNT";
  private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
 
  static String acquireTokenFromBucket(
      Jedis jedis, int limit, long timeout) {
    String identifier = UUID.randomUUID().toString();
    long now = System.currentTimeMillis();
    Transaction transaction = jedis.multi();
 
    //刪除訊號量
    transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
    ZParams params = new ZParams();
    params.weightsByDouble(1.0,0.0);
    transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
 
    //計數器自增
    transaction.incr(BUCKET_COUNT);
    List<Object> results = transaction.exec();
    long counter = (Long) results.get(results.size() - 1);
 
    transaction = jedis.multi();
    transaction.zadd(BUCKET_MONITOR, now, identifier);
    transaction.zadd(BUCKET, counter, identifier);
    transaction.zrank(BUCKET, identifier);
    results = transaction.exec();
    //獲取排名,判斷請求是否取得了訊號量
    long rank = (Long) results.get(results.size() - 1);
    if (rank < limit) {
      return identifier;
    } else {//沒有獲取到訊號量,清理之前放入redis 中垃圾資料
      transaction = jedis.multi();
      transaction.zrem(BUCKET_MONITOR, identifier);
      transaction.zrem(BUCKET, identifier);
      transaction.exec();
    }
    return null;
  }
}

測試呼叫類

@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
  Jedis jedis = jedisPool.getResource();
  String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
  if (token == null) {
    response.sendError(500);
  }else{
    //TODO 你的業務邏輯
  }
  jedisPool.returnResource(jedis);
}

此外還可以使用註解類+切面攔截來處理

@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
  private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
  @Autowired
  private JedisPool jedisPool;
 
  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(new HandlerInterceptorAdapter() {
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                   Object handler) throws Exception {
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
        if (rateLimiter != null){
          int limit = rateLimiter.limit();
          int timeout = rateLimiter.timeout();
          Jedis jedis = jedisPool.getResource();
          String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
          if (token == null) {
            response.sendError(500);
            return false;
          }
          logger.debug("token -> {}",token);
          jedis.close();
        }
        return true;
      }
    }).addPathPatterns("/*");
  }
}

定義註解類

/**
 * 限流注解
 */
 
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
  int limit() default 5; //限流數量
  int timeout() default 1000; //限流時間
}

介面使用,直接使用註解

@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}

測試結果
併發測試

工具:apache-jmeter-3.2
說明: 沒有獲取到訊號量的介面返回500,status是紅色,獲取到訊號量的介面返回200,status是綠色。
當限制請求訊號量為2,併發5個執行緒:
在這裡插入圖片描述

當限制請求訊號量為5,併發10個執行緒:
在這裡插入圖片描述

總結:
1.對於訊號量的操作,使用事務操作。
2.不要使用時間戳作為訊號量的排序分數,因為在分散式環境中,各個節點的時間差的原因,會出現不公平訊號量的現象。
3.可以使用把這塊程式碼抽成@rateLimiter註解,然後再方法上使用就會很方便
4.不同介面的流控,可以參考原始碼的裡面RedisRateLimiterPlus,無非是每個介面生成一個監控引數