1. 程式人生 > 其它 >spring-cloud-gateway(二)es代理功能需求

spring-cloud-gateway(二)es代理功能需求

實際是該專案的延申 cclient/elasticsearch-multi-cluster-compat-proxy: 閘道器代理相容ES6 es7 proxy and compat elasticsearch version 7 and elasticsearch version 6's _search and _bulk request api (github.com)

專案用spring-boot-starter-webflux 實現了es6和es7的相容層閘道器,但只是做可行性驗證

真正生產使用的閘道器還需要更多功能,最基本的,限流,熔斷,負載均衡,這些都獨立往專案里加,每個點都需要花精力整合

實際這方面應用已經有了很成熟的方案,服務治理相關,k8s,ingress,istio,kong,nginx...

但對es這類bare服務,套用雲服務的方案並不合適,kong/nginx因為是c+lua的技術棧,定製的成本較高

nodejs已經過氣了,go和java,考慮生態,選擇java

java方面的閘道器,很少單獨提及,更多是做為java服務治理的一個元件來使用

java類閘道器,早些年大家都使用netty/mina原生實現

但後期有基於netty的各種封裝好的http框架,再完全用netty開發http類閘道器就比較少見了,當然tcp/udp類,自定義rpc的還是免不了直接和netty打交道

整合netty的 http高效能閘道器類服務早些年個人用過 jersey,後來用vert.x,再後來就直接上spring-boot-starter-webflux了,抽空把歷史程式碼扒出來

現在為了省去在webflux自行新增限流,熔斷,降級等功能,直接使用spring-boot-gateway

實際spring-boot-gateway,本身整合mvn/webflux的兩種方案,webflux的底層就是netty

本地代理其實可以理解為替代nginx,並實現一些和業務產品深度結合的功能
因為nginx c+lua的技術棧和開發成本較高

首先spring-cloud-gateway 原生是 spring-cloud的元件,應用場景和spring-cloud深耦合

例如,loadbanlance,依賴spring-cloud的服務發現元件,consule,nacos等

https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer

Spring Cloud Commons provides the @EnableDiscoveryClient annotation. This looks for implementations of the DiscoveryClient and ReactiveDiscoveryClient interfaces with META-INF/spring.factories. Implementations of the discovery client add a configuration class to spring.factories under the org.springframework.cloud.client.discovery.EnableDiscoveryClient key. Examples of DiscoveryClient implementations include Spring Cloud Netflix Eureka, Spring Cloud Consul Discovery, and Spring Cloud Zookeeper Discovery.

spring-cloud-gateway 基礎支援

Spring Cloud Gateway

spring:
  cloud:
    gateway:
      routes:
      - id: before_route
        uri: https://example.org
        predicates:
        - Before=2017-01-20T17:42:47.789-07:00[America/Denver]

以該項為例,關鍵是uri 這個引數

閘道器單點1:1 uri 可以寫一個http/https的訪問地址

如果需要實現負載均衡gateway: server 1:n 則需要實現 uri("lb://backing-service:8088")

@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("circuitbreaker_route", r -> r.path("/consumingServiceEndpoint")
            .filters(f -> f.circuitBreaker(c -> c.name("myCircuitBreaker").fallbackUri("forward:/inCaseOfFailureUseThis").addStatusCode("INTERNAL_SERVER_ERROR"))
                .rewritePath("/consumingServiceEndpoint", "/backingServiceEndpoint")).uri("lb://backing-service:8088")
        .build();
}
es3

這裡實際依賴了spring-cloud生態的服務發現元件,註冊服務 backing-service 至註冊中心,spring-cloud從註冊中心獲取真實的服務地址host:port,再通過客戶端負載均衡lb 路由至真實服務

這是服務治理的基本原理流程

但是對目前的場景不適用,目前的場景,基本可以理解為把spring-cloud-gateway 當nginx用

路由到後端靜態的幾個地址即可

以es為例 3個client節點

es-client-01 192.168.10.11:9200

es-client-02 192.168.10.12:9200

es-client-03 192.168.10.13:9200

並不存在一個有效的註冊中心,實際多一個註冊中心元件,專案的複雜度就更高了,對es 這類服務元件,並不需要和spring的生態完全結合

我們需要定製lb://的解析,即,使lb:// 不通過註冊中心,而是完全靜態配置在本地(先搞靜態吧,以後再考慮搞動態,此動態非彼動態)

需要注意withHealthChecks() 顧名思義 添加了withHealthChecks() 則會對 後端server進行健康檢查,檢查方式為驗證對應的後端server /actuator/health 是否可達。這個路徑是spring-boot/spring-cloud 元件的預設路徑,但後端的es服務,這個路徑並不可達。.withHealthChecks() 會返回Service Unavailable

{"timestamp":"2021-06-02T09:04:00.596+00:00","path":"/","status":503,"error":"Service Unavailable","requestId":"d3d01e2e-1"}
	@Bean
	public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
			ConfigurableApplicationContext context) {
		return ServiceInstanceListSupplier.builder()
				.withBase(new CustomServiceInstanceListSupplier())
//				.withHealthChecks()
				.build(context);
	}

健康檢查部分程式碼,關鍵注意/actuator/health

package org.springframework.cloud.loadbalancer.core;

public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier implements InitializingBean, DisposableBean {
    private static final Log LOG = LogFactory.getLog(HealthCheckServiceInstanceListSupplier.class);
    private final HealthCheck healthCheck;
    private final String defaultHealthCheckPath;
    private final Flux<List<ServiceInstance>> aliveInstancesReplay;
    private Disposable healthCheckDisposable;
    private final BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction;

    public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, HealthCheck healthCheck, BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
        super(delegate);
        this.defaultHealthCheckPath = (String)healthCheck.getPath().getOrDefault("default", "/actuator/health");
        this.aliveFunction = aliveFunction;
        this.healthCheck = healthCheck;
        Repeat<Object> aliveInstancesReplayRepeat = Repeat.onlyIf((repeatContext) -> {
            return this.healthCheck.getRefetchInstances();
        }).fixedBackoff(healthCheck.getRefetchInstancesInterval());
        Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate).repeatWhen(aliveInstancesReplayRepeat).switchMap((serviceInstances) -> {
            return this.healthCheckFlux(serviceInstances).map((alive) -> {
                return Collections.unmodifiableList(new ArrayList(alive));
            });
        });
        this.aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1).refCount(1);
    }
}    

我們先不採用withHealthChecks,等後期有時間再自定義或配置withHealthChecks實現

package org.springframework.cloud.loadbalancer.core;
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
    public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
    private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
    private Duration timeout = Duration.ofSeconds(30L);
    private final String serviceId;
    private final Flux<List<ServiceInstance>> serviceInstances;

    public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
        this.serviceId = environment.getProperty("loadbalancer.client.name");
        this.resolveTimeout(environment);
        this.serviceInstances = Flux.defer(() -> {
            return Flux.just(delegate.getInstances(this.serviceId));
        }).subscribeOn(Schedulers.boundedElastic()).timeout(this.timeout, Flux.defer(() -> {
            this.logTimeout();
            return Flux.just(new ArrayList());
        })).onErrorResume((error) -> {
            this.logException(error);
            return Flux.just(new ArrayList());
        });
    }

    public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
        this.serviceId = environment.getProperty("loadbalancer.client.name");
        this.resolveTimeout(environment);
        this.serviceInstances = Flux.defer(() -> {
            return delegate.getInstances(this.serviceId).collectList().flux().timeout(this.timeout, Flux.defer(() -> {
                this.logTimeout();
                return Flux.just(new ArrayList());
            })).onErrorResume((error) -> {
                this.logException(error);
                return Flux.just(new ArrayList());
            });
        });
    }

    public String getServiceId() {
        return this.serviceId;
    }

    public Flux<List<ServiceInstance>> get() {
        return this.serviceInstances;
    }

    private void resolveTimeout(Environment environment) {
        String providedTimeout = environment.getProperty("spring.cloud.loadbalancer.service-discovery.timeout");
        if (providedTimeout != null) {
            this.timeout = DurationStyle.detectAndParse(providedTimeout);
        }

    }

    private void logTimeout() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Timeout occurred while retrieving instances for service %s.The instances could not be retrieved during %s", this.serviceId, this.timeout));
        }

    }

    private void logException(Throwable error) {
        LOG.error(String.format("Exception occurred while retrieving instances for service %s", this.serviceId), error);
    }
}