redis實現分散式介面限流
阿新 • • 發佈:2021-01-07
技術標籤:日常開發遇到的問題
限流的目的是通過對併發訪問/請求進行限速或者一個時間視窗內的的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務。
限流包括兩種:
1.Nginx接入層限流
按照一定的規則如帳號、IP、系統呼叫邏輯等在Nginx層面做限流
2.業務應用系統限流
通過業務程式碼控制流量這個流量可以被稱為訊號量,可以理解成是一種鎖,它可以限制一項資源最多能同時被多少程序訪問。
這裡主要說說第二種使用redis在業務層進行限流。
公司做電商業務的,經常會發放優惠券,在某個點優惠券領取開放後,會有大量的併發流量進入,導致系統壓力過大,為了限制併發流量,這裡使用redis在業務層做了流量控制。
/**
* 如果使用者在限流之外,則提示使用者稍後再試
*/
int limitNum = 100;
long limitSecond = 1000;
if (redisUtil.acquireTokenFromBucket(limitNum, limitSecond) == null) {
logger.error("使用者在限流之外,稍後再試");
return CommonResult.failed(-3,"使用者在限流之外,稍後再試" );
}
下面主要介紹一下redisUtil類中的這個方法(這個方法參考了別人的,原文請點選)
private static final String BUCKET = "BUCKET";
private static final String BUCKET_COUNT = "BUCKET_COUNT";
private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
/**
* Redis實現分散式應用限流的方法
* @param limit 請求訊號量
* @param timeout 請求時間
* @return
*/
public String acquireTokenFromBucket(int limit, long timeout) {
//這裡獲取jeids連線池,具體方法,可以自己實現
Jedis jedis = getJedis();
if(jedis == null){
return null;
}
//這裡開始,是限流的方法
try{
String identifier = UUIDGenerator.generate32UUID();
long now = System.currentTimeMillis();
Transaction transaction = jedis.multi();
//刪除訊號量
transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(),
String.valueOf(now - timeout).getBytes());
ZParams params = new ZParams();
params.weightsByDouble(1.0,0.0);
transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
//計數器自增
transaction.incr(BUCKET_COUNT);
List<Object> results = transaction.exec();
long counter = (Long) results.get(results.size() - 1);
transaction = jedis.multi();
transaction.zadd(BUCKET_MONITOR, now, identifier);
transaction.zadd(BUCKET, counter, identifier);
transaction.zrank(BUCKET, identifier);
results = transaction.exec();
//獲取排名,判斷請求是否取得了訊號量
long rank = (Long) results.get(results.size() - 1);
if (rank < limit) {
return identifier;
} else {//沒有獲取到訊號量,清理之前放入redis 中垃圾資料
transaction = jedis.multi();
transaction.zrem(BUCKET_MONITOR, identifier);
transaction.zrem(BUCKET, identifier);
transaction.exec();
}
} catch (Exception e) {
log.error("在列表key的尾部插入元素失敗:" + e.getMessage(), e);
returnBrokenResource(jedis);
} finally {
returnResource(jedis);
}
return null;
}
/**
* 釋放jedis資源
*
* @param jedis
*/
@SuppressWarnings("deprecation")
public void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}
@SuppressWarnings("deprecation")
public void returnBrokenResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnBrokenResource(jedis);
}
}
業務層用法,如最開始的程式碼,通過返回判斷是否為空,判斷當前流量是否限流
以下是從網上找到的詳細方法和測試
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;
/**
* 實現類
*/
public class RedisRateLimiter {
private static final String BUCKET = "BUCKET";
private static final String BUCKET_COUNT = "BUCKET_COUNT";
private static final String BUCKET_MONITOR = "BUCKET_MONITOR";
static String acquireTokenFromBucket(
Jedis jedis, int limit, long timeout) {
String identifier = UUID.randomUUID().toString();
long now = System.currentTimeMillis();
Transaction transaction = jedis.multi();
//刪除訊號量
transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
ZParams params = new ZParams();
params.weightsByDouble(1.0,0.0);
transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);
//計數器自增
transaction.incr(BUCKET_COUNT);
List<Object> results = transaction.exec();
long counter = (Long) results.get(results.size() - 1);
transaction = jedis.multi();
transaction.zadd(BUCKET_MONITOR, now, identifier);
transaction.zadd(BUCKET, counter, identifier);
transaction.zrank(BUCKET, identifier);
results = transaction.exec();
//獲取排名,判斷請求是否取得了訊號量
long rank = (Long) results.get(results.size() - 1);
if (rank < limit) {
return identifier;
} else {//沒有獲取到訊號量,清理之前放入redis 中垃圾資料
transaction = jedis.multi();
transaction.zrem(BUCKET_MONITOR, identifier);
transaction.zrem(BUCKET, identifier);
transaction.exec();
}
return null;
}
}
測試呼叫類
@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
Jedis jedis = jedisPool.getResource();
String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
if (token == null) {
response.sendError(500);
}else{
//TODO 你的業務邏輯
}
jedisPool.returnResource(jedis);
}
此外還可以使用註解類+切面攔截來處理
@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
@Autowired
private JedisPool jedisPool;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptorAdapter() {
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
if (rateLimiter != null){
int limit = rateLimiter.limit();
int timeout = rateLimiter.timeout();
Jedis jedis = jedisPool.getResource();
String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
if (token == null) {
response.sendError(500);
return false;
}
logger.debug("token -> {}",token);
jedis.close();
}
return true;
}
}).addPathPatterns("/*");
}
}
定義註解類
/**
* 限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
int limit() default 5; //限流數量
int timeout() default 1000; //限流時間
}
介面使用,直接使用註解
@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}
測試結果
併發測試
工具:apache-jmeter-3.2
說明: 沒有獲取到訊號量的介面返回500,status是紅色,獲取到訊號量的介面返回200,status是綠色。
當限制請求訊號量為2,併發5個執行緒:
當限制請求訊號量為5,併發10個執行緒:
總結:
1.對於訊號量的操作,使用事務操作。
2.不要使用時間戳作為訊號量的排序分數,因為在分散式環境中,各個節點的時間差的原因,會出現不公平訊號量的現象。
3.可以使用把這塊程式碼抽成@rateLimiter註解,然後再方法上使用就會很方便
4.不同介面的流控,可以參考原始碼的裡面RedisRateLimiterPlus,無非是每個介面生成一個監控引數