Spring Cloud Gateway 結合配置中心限流
假設你領導給你安排了一個任務,具體需求如下:
- 針對具體的接口做限流
- 不同接口限流的力度可以不同
- 可以動態調整限流配置,實時生效
如果你接到上面的任務,你會怎麽去設計+實現呢?
每個人看待問題的角度不同,自然思考出來的方案也不同,正所謂條條大路通羅馬,能到達目的地的路那就是一條好路。
如何分析需求
下面我給出我的實現方式,僅供各位參考,大牛請忽略。
具體問題具體分析,針對需求點,分別去做分析。
需求一 “如何針對具體的接口做限流” 這個在上篇文章中也有講過,只需要讓KeyResolver返回的是接口的URI即可,這樣限流的維度那就是對這個接口進行限流。
需求二 “不同接口限流的力度可以不同” 這個通過配置的方式明顯實現不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成動態的那麽必須的自己通過擴展RedisRateLimiter來實現。
前提是必須有一個配置列表,這個配置列表就是每個接口對應的限流數值。有了這個配置我們就可以通過請求的接口獲取這個接口對應的限流值。
需求三“可以動態調整限流配置,實時生效” 這個的話也比較容易,無論你是存文件,存數據庫,存緩存只要每次都去讀取,必然是實時生效的,但是性能問題我們不得不考慮啊。
存文件,讀取文件,耗IO,主要是不方便修改
存數據庫,可以通過web界面去修改,也可以直接改數據庫,每次都要查詢,性能不行
存分布式緩存(redis),性能比數據庫有提高
對比下來肯定是緩存是最優的方案,還有更好的方案嗎?
有,結合配置中心來做,我這邊用自己的配置中心(https://github.com/yinjihuan/smconf)來講解,換成其他的配置中心也是一樣的思路。
配置中心的優點在於它本來就是用來存儲配置的,配置在項目啟動時加載完畢,當有修改時推送更新,每次讀取都在本地對象中,性能好。
具體方案有了之後我們就可以開始擼代碼了,但是你有想過這麽多接口的限流值怎麽初始化嗎?手動一個個去加?
不同的服務維護的小組不同,當然也有可能是一個小組維護,從設計者的角度來思考,應該把設置的權利交給用戶,交給我們的接口開發者,每個接口能夠承受多少並發讓用戶來定,你的職責就是在網關進行限流。當然在公司中具體的限制量也不一定會由開發人員來定哈,這個得根據壓測結果,做最好的調整。
話不多說-開始擼碼
首先我們定義自己的RedisRateLimiter,復制源碼稍微改造下即可, 這邊只貼核心代碼。
public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
implements ApplicationContextAware {
public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
Validator validator) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
this.redisTemplate = redisTemplate;
this.script = script;
initialized.compareAndSet(false, true);
}
public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
}
// 限流配置
private RateLimitConf rateLimitConf;
@Override
@SuppressWarnings("unchecked")
public void setApplicationContext(ApplicationContext context) throws BeansException {
** // 加載配置**
this.rateLimitConf = context.getBean(RateLimitConf.class);
}
/**
* This uses a basic token bucket algorithm and relies on the fact that
* Redis scripts execute atomically. No other operations can run between
* fetching the count and writing the new count.
*/
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
if (rateLimitConf == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();
// Key的格式:服務名稱.接口URI.類型
String replenishRateKey = routeId + "." + id + ".replenishRate";
int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);
String burstCapacityKey = routeId + "." + id + ".burstCapacity";
int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);
try {
List<String> keys = getKeys(id);
// The arguments to the LUA script. time() returns unixtime in
// seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
} catch (Exception e) {
/*
* We don‘t want a hard dependency on Redis to allow traffic. Make
* sure to set an alert so you know if this is happening too much.
* Stripe‘s observed failure rate is 0.01%.
*/
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
}
public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
HashMap<String, String> headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
return headers;
}
}
需要在setApplicationContext中加載我們的配置類,配置類的定義如下:
@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
// 限流配置
@ConfField(value = "limitMap")
private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
put("default.replenishRate", 100);
put("default.burstCapacity", 1000);
}};
public void setLimitMap(Map<String, Integer> limitMap) {
this.limitMap = limitMap;
}
public Map<String, Integer> getLimitMap() {
return limitMap;
}
}
所有的接口對應的限流信息都在map中,有默認值,如果沒有對應的配置就用默認的值對接口進行限流。
isAllowed方法中通過‘服務名稱.接口URI.類型’組成一個Key, 通過這個Key去Map中獲取對應的值。
類型的作用主要是用來區分replenishRate和burstCapacity兩個值。
接下來就是配置CustomRedisRateLimiter:
@Bean@Primary
br/>@Primary
ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}
網關這邊的邏輯已經實現好了,接下來就是需要在具體的服務中自定義註解,然後將限流的參數初始化到我們的配置中心就可以了。
定義註解
@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)
br/>@Retention(RetentionPolicy.RUNTIME)
public @interface ApiRateLimit {
/**
* 速率
* @return
*/
int replenishRate() default 100;
/**
* 容積
* @return
*/
int burstCapacity() default 1000;
}
啟動監聽器,讀取註解,初始化配置
/**
- 初始化API網關需要進行並發限制的API
- @author yinjihuan
-
*/
public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {// Controller包路徑
private String controllerPath;private RateLimitConf rateLimitConf;
private ConfInit confInit;
private String applicationName;
public InitGatewayApiLimitRateListener(String controllerPath) {
this.controllerPath = controllerPath;
}@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
this.confInit = event.getApplicationContext().getBean(ConfInit.class);
this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
try {
initLimitRateAPI();
} catch (Exception e) {
throw new RuntimeException("初始化需要進行並發限制的API異常", e);
}
}/**
- 初始化需要進行並發限制的API
- @throws IOException
-
@throws ClassNotFoundException
*/
private void initLimitRateAPI() throws IOException, ClassNotFoundException {
Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
List<String> classList = scan.getFullyQualifiedClassNameList();
for (String clazz : classList) {
Class<?> clz = Class.forName(clazz);
if (!clz.isAnnotationPresent(RestController.class)) {
continue;
}
Method[] methods = clz.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(ApiRateLimit.class)) {
ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
}
}
}
rateLimitConf.setLimitMap(limitMap);
// 初始化值到配置中心
confInit.init(rateLimitConf);
}private String getApiUri(Class<?> clz, Method method) {
StringBuilder uri = new StringBuilder();
uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
if (method.isAnnotationPresent(GetMapping.class)) {
uri.append(method.getAnnotation(GetMapping.class).value()[0]);
} else if (method.isAnnotationPresent(PostMapping.class)) {
uri.append(method.getAnnotation(PostMapping.class).value()[0]);
} else if (method.isAnnotationPresent(RequestMapping.class)) {
uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
}
return uri.toString();
}
}
配置監聽器
SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);
最後使用就很簡單了,只需要增加註解就可以了
@ApiRateLimit(replenishRate=10, burstCapacity=100)@GetMapping("/data")
br/>@GetMapping("/data")
return new HouseInfo(1L, "上海", "虹口", "東體小區");
}
總結
我這邊只是給大家提供一種去實現的思路,也許大家還有更好的方案。
我覺得只要不讓每個開發都去關心這種非業務性質的功能,那就可以了,都在框架層面處理掉。當然實現原理可以跟大家分享下,會用很好,既會用又了解原理那就更好了。
Spring Cloud Gateway 結合配置中心限流