spring-cloud-gateway(二)es代理功能需求
專案用spring-boot-starter-webflux 實現了es6和es7的相容層閘道器,但只是做可行性驗證
真正生產使用的閘道器還需要更多功能,最基本的,限流,熔斷,負載均衡,這些都獨立往專案里加,每個點都需要花精力整合
實際這方面應用已經有了很成熟的方案,服務治理相關,k8s,ingress,istio,kong,nginx...
但對es這類bare服務,套用雲服務的方案並不合適,kong/nginx因為是c+lua的技術棧,定製的成本較高
nodejs已經過氣了,go和java,考慮生態,選擇java
java方面的閘道器,很少單獨提及,更多是做為java服務治理的一個元件來使用
java類閘道器,早些年大家都使用netty/mina原生實現
但後期有基於netty的各種封裝好的http框架,再完全用netty開發http類閘道器就比較少見了,當然tcp/udp類,自定義rpc的還是免不了直接和netty打交道
整合netty的 http高效能閘道器類服務早些年個人用過 jersey,後來用vert.x,再後來就直接上spring-boot-starter-webflux了,抽空把歷史程式碼扒出來
現在為了省去在webflux自行新增限流,熔斷,降級等功能,直接使用spring-boot-gateway
實際spring-boot-gateway,本身整合mvn/webflux的兩種方案,webflux的底層就是netty
本地代理其實可以理解為替代nginx,並實現一些和業務產品深度結合的功能
因為nginx c+lua的技術棧和開發成本較高
首先spring-cloud-gateway 原生是 spring-cloud的元件,應用場景和spring-cloud深耦合
例如,loadbanlance,依賴spring-cloud的服務發現元件,consule,nacos等
https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer
Spring Cloud Commons provides the @EnableDiscoveryClient annotation. This looks for implementations of the DiscoveryClient and ReactiveDiscoveryClient interfaces with META-INF/spring.factories. Implementations of the discovery client add a configuration class to spring.factories under the org.springframework.cloud.client.discovery.EnableDiscoveryClient key. Examples of DiscoveryClient implementations include Spring Cloud Netflix Eureka, Spring Cloud Consul Discovery, and Spring Cloud Zookeeper Discovery.
spring-cloud-gateway 基礎支援
spring:
cloud:
gateway:
routes:
- id: before_route
uri: https://example.org
predicates:
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
以該項為例,關鍵是uri 這個引數
閘道器單點1:1 uri 可以寫一個http/https的訪問地址
如果需要實現負載均衡gateway: server 1:n 則需要實現 uri("lb://backing-service:8088")
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("circuitbreaker_route", r -> r.path("/consumingServiceEndpoint")
.filters(f -> f.circuitBreaker(c -> c.name("myCircuitBreaker").fallbackUri("forward:/inCaseOfFailureUseThis").addStatusCode("INTERNAL_SERVER_ERROR"))
.rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")).uri("lb://backing-service:8088")
.build();
}
es3
這裡實際依賴了spring-cloud生態的服務發現元件,註冊服務 backing-service 至註冊中心,spring-cloud從註冊中心獲取真實的服務地址host:port,再通過客戶端負載均衡lb 路由至真實服務
這是服務治理的基本原理流程
但是對目前的場景不適用,目前的場景,基本可以理解為把spring-cloud-gateway 當nginx用
路由到後端靜態的幾個地址即可
以es為例 3個client節點
es-client-01 192.168.10.11:9200
es-client-02 192.168.10.12:9200
es-client-03 192.168.10.13:9200
並不存在一個有效的註冊中心,實際多一個註冊中心元件,專案的複雜度就更高了,對es 這類服務元件,並不需要和spring的生態完全結合
我們需要定製lb://
的解析,即,使lb://
不通過註冊中心,而是完全靜態配置在本地(先搞靜態吧,以後再考慮搞動態,此動態非彼動態)
需要注意withHealthChecks()
顧名思義 添加了withHealthChecks() 則會對 後端server進行健康檢查,檢查方式為驗證對應的後端server /actuator/health
是否可達。這個路徑是spring-boot/spring-cloud 元件的預設路徑,但後端的es服務,這個路徑並不可達。.withHealthChecks() 會返回Service Unavailable
{"timestamp":"2021-06-02T09:04:00.596+00:00","path":"/","status":503,"error":"Service Unavailable","requestId":"d3d01e2e-1"}
@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBase(new CustomServiceInstanceListSupplier())
// .withHealthChecks()
.build(context);
}
健康檢查部分程式碼,關鍵注意/actuator/health
package org.springframework.cloud.loadbalancer.core;
public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier implements InitializingBean, DisposableBean {
private static final Log LOG = LogFactory.getLog(HealthCheckServiceInstanceListSupplier.class);
private final HealthCheck healthCheck;
private final String defaultHealthCheckPath;
private final Flux<List<ServiceInstance>> aliveInstancesReplay;
private Disposable healthCheckDisposable;
private final BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction;
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, HealthCheck healthCheck, BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
super(delegate);
this.defaultHealthCheckPath = (String)healthCheck.getPath().getOrDefault("default", "/actuator/health");
this.aliveFunction = aliveFunction;
this.healthCheck = healthCheck;
Repeat<Object> aliveInstancesReplayRepeat = Repeat.onlyIf((repeatContext) -> {
return this.healthCheck.getRefetchInstances();
}).fixedBackoff(healthCheck.getRefetchInstancesInterval());
Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate).repeatWhen(aliveInstancesReplayRepeat).switchMap((serviceInstances) -> {
return this.healthCheckFlux(serviceInstances).map((alive) -> {
return Collections.unmodifiableList(new ArrayList(alive));
});
});
this.aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1).refCount(1);
}
}
我們先不採用withHealthChecks,等後期有時間再自定義或配置withHealthChecks實現
package org.springframework.cloud.loadbalancer.core;
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
private Duration timeout = Duration.ofSeconds(30L);
private final String serviceId;
private final Flux<List<ServiceInstance>> serviceInstances;
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty("loadbalancer.client.name");
this.resolveTimeout(environment);
this.serviceInstances = Flux.defer(() -> {
return Flux.just(delegate.getInstances(this.serviceId));
}).subscribeOn(Schedulers.boundedElastic()).timeout(this.timeout, Flux.defer(() -> {
this.logTimeout();
return Flux.just(new ArrayList());
})).onErrorResume((error) -> {
this.logException(error);
return Flux.just(new ArrayList());
});
}
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty("loadbalancer.client.name");
this.resolveTimeout(environment);
this.serviceInstances = Flux.defer(() -> {
return delegate.getInstances(this.serviceId).collectList().flux().timeout(this.timeout, Flux.defer(() -> {
this.logTimeout();
return Flux.just(new ArrayList());
})).onErrorResume((error) -> {
this.logException(error);
return Flux.just(new ArrayList());
});
});
}
public String getServiceId() {
return this.serviceId;
}
public Flux<List<ServiceInstance>> get() {
return this.serviceInstances;
}
private void resolveTimeout(Environment environment) {
String providedTimeout = environment.getProperty("spring.cloud.loadbalancer.service-discovery.timeout");
if (providedTimeout != null) {
this.timeout = DurationStyle.detectAndParse(providedTimeout);
}
}
private void logTimeout() {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Timeout occurred while retrieving instances for service %s.The instances could not be retrieved during %s", this.serviceId, this.timeout));
}
}
private void logException(Throwable error) {
LOG.error(String.format("Exception occurred while retrieving instances for service %s", this.serviceId), error);
}
}