Spring Cloud Gateway 自定義限流
在使用Spring Cloud Gateway限流功能時官網提供的限流中的流速以及桶容量是針對所有策略的,意思是隻要配置上那麼所有的都是一樣的,不能根據不同的型別配置不同的引數,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那麼不管是A渠道還是B渠道都是這個值,如果修改那麼對應的其他渠道也會修改,如何能做到分為不同渠道進行限流呢,A渠道replenishRate:10,burstCapacity:100,B渠道:replenishRate:20,burstCapacity:1000,下面開始分析:
限流方式採用的redis,底層使用的redis的lua指令碼實現的,具體可以自行查閱,不做講解,預設限流類:RedisRateLimiter,本文也是仿照這個進行重寫的。
本文是參考“
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
自定義限流類
參照RedisRateLimiter進行自定義限流類SystemRedisRateLimiter用於渠道限流方式,實現程式碼如下:
/**
* @author : Erick
* @version : 1.0
* @Description :
* @time :2018-12-1
*/
public class SystemRedisRateLimiter extends AbstractRateLimiter<SystemRedisRateLimiter.Config> implements ApplicationContextAware {
//這些變數全部從RedisRateLimiter複製的,都會用到。
public static final String REPLENISH_RATE_KEY = "replenishRate";
public static final String BURST_CAPACITY_KEY = "burstCapacity";
public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter";
public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
//處理速度
private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
//容量
private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";
private ReactiveRedisTemplate<String, String> redisTemplate;
private RedisScript<List<Long>> script;
private AtomicBoolean initialized = new AtomicBoolean(false);
private String remainingHeader = REMAINING_HEADER;
/** The name of the header that returns the replenish rate configuration. */
private String replenishRateHeader = REPLENISH_RATE_HEADER;
/** The name of the header that returns the burst capacity configuration. */
private String burstCapacityHeader = BURST_CAPACITY_HEADER;
private Config defaultConfig;
public SystemRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
RedisScript<List<Long>> script, Validator validator) {
super(Config.class , CONFIGURATION_PROPERTY_NAME , validator);
this.redisTemplate = redisTemplate;
this.script = script;
initialized.compareAndSet(false,true);
}
public SystemRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){
super(Config.class , CONFIGURATION_PROPERTY_NAME , null);
defaultConfig = new Config()
.setReplenishRate(defaultReplenishRate)
.setBurstCapacity(defaultBurstCapacity);
}
//具體限流實現,此處呼叫的是lua指令碼
@Override
public Mono<RateLimiter.Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
if (ObjectUtils.isEmpty(rateLimiterConf) ){
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
//獲取的是自定義的map
Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap();
//快取的key
String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY;
//若map中不存在則採用預設值,存在則取值。
int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey);
//容量key
String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY;
//若map中不存在則採用預設值,存在則取值。
int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey);
try {
List<String> keys = getKeys(id);
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft));
return response;
});
} catch (Exception e) {
e.printStackTrace();
}
return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L)));
}
private RateLimiterConf rateLimiterConf;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.rateLimiterConf = applicationContext.getBean(RateLimiterConf.class);
}
public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) {
HashMap<String, String> headers = new HashMap<>();
headers.put(this.remainingHeader, tokensLeft.toString());
headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
return headers;
}
static List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster
// Make a unique key per user.
//此處可以自定義redis字首資訊
String prefix = "request_sys_rate_limiter.{" + id;
// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
@Validated
public static class Config{
@Min(1)
private int replenishRate;
@Min(1)
private int burstCapacity = 1;
public int getReplenishRate() {
return replenishRate;
}
public Config setReplenishRate(int replenishRate) {
this.replenishRate = replenishRate;
return this;
}
public int getBurstCapacity() {
return burstCapacity;
}
public Config setBurstCapacity(int burstCapacity) {
this.burstCapacity = burstCapacity;
return this;
}
@Override
public String toString() {
return "Config{" +
"replenishRate=" + replenishRate +
", burstCapacity=" + burstCapacity +
'}';
}
}
}
在繼承AbstractRateLimiter泛型使用的是自定義類中的config:SystemRedisRateLimiter.Config
配置類
配置類主要用於初始化map引數。
/**
* @author : Erick
* @version : 1.0
* @Description :
* @time :2018-12-1
*/
@Component
//使用配置檔案的方式進行初始化
@ConfigurationProperties(prefix = "ratelimiter-conf")
public class RateLimiterConf {
//處理速度
private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
//容量
private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";
private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){
{
put(DEFAULT_REPLENISHRATE , 10);
put(DEFAULT_BURSTCAPACITY , 100);
}
};
public Map<String, Integer> getRateLimitMap() {
return rateLimitMap;
}
public void setRateLimitMap(Map<String, Integer> rateLimitMap) {
this.rateLimitMap = rateLimitMap;
}
}
配置檔案主要採用配置檔案的方式進行初始化,若配置則進行新增,若沒有配置則採用預設值。
配置檔案
主要用於配置各個渠道的限流閥值,例如文章開頭舉例的A渠道和B渠道,配置如下:
//與配置類RateLimiterConf保持一致
ratelimiter-conf:
#配置限流引數與RateLimiterConf類對映
rateLimitMap:
#格式為:routeid(gateway配置routes時指定的).系統名稱.replenishRate(流速)/burstCapacity令牌桶大小
service.A.replenishRate: 10
service.A.burstCapacity: 100
service.B.replenishRate: 20
service.B.burstCapacity: 1000
到此配置限流相關的程式碼已經完成,需要在啟動類和bootstrap.yml中進行配置才能夠真正的使用。
啟動類中聲名
在啟動類中宣告使用的策略,指定自定義限流類。
@Bean
KeyResolver sysKeyResolver(){
//從請求地址中擷取sys值,進行限流。
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("sys"));
}
@Bean
@Primary
//使用自己定義的限流類
SystemRedisRateLimiter systemRedisRateLimiter(
ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(SystemRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
Validator validator){
return new SystemRedisRateLimiter(redisTemplate , script , validator);
}
採用的是從請求地址中獲取sys引數對應的值,當然也可以設定為其他值,再聲名自定義的限流類。
配置限流
spring:
cloud:
gateway:
routes:
- id: service //id名稱需要與配置限流的保持一致
uri: lb://serviceA
predicates:
- Path=/service/**/ //最後的/可以去掉,在本文中特意新增的如果去掉會把filters當成註釋內容。
filters:
- name: RequestRateLimiter
args:
//需要與上邊的方法名保持一致
rate-limiter: "#{@systemRedisRateLimiter}"
//需要與策略類的方法名保持一致。
key-resolver: "#{@sysKeyResolver}"
至此自定義限流程式碼全部完成,如有什麼不妥支援請指正,同時也希望對其他人有幫助。例項程式碼已上傳到碼雲可以點選檢視。