1. 程式人生 > 其它 >關於sequence問題的緊急處理(r2第26天)

關於sequence問題的緊急處理(r2第26天)

1.gateway簡介

API閘道器功能

  • 協議轉換,路由轉發

  • 流量聚合,對流量進行監控,日誌輸出

  • 作為整個系統的前端工程,對流量進行控制,有限流的作用

  • 作為系統的前端邊界,外部流量只能通過閘道器才能訪問系統

  • 可以在閘道器層做許可權的判斷--安全認證

  • 可以在閘道器層做快取

  • 負載均衡

Gateway的處理流程

客戶端向 Spring Cloud Gateway 發出請求。然後在 Gateway Handler Mapping 中找到與請求相匹配的路由,將其傳送到 Gateway Web Handler。Handler 再通過指定的過濾器鏈來將請求傳送到我們實際的服務執行業務邏輯,然後返回。過濾器之間用虛線分開是因為過濾器可能會在傳送代理請求之前(“pre”)或之後(“post”)執行業務邏輯。

重要概念

  • Route(路由):這是閘道器的基本構建塊。它由一個 ID,一個目標 URl,一組斷言和一組過濾器定義。如果斷言為真,則路由匹配,目標URI會被訪問。

    內建了 10 種 Router,使得我們可以直接配置一下就可以隨心所欲的根據 Header、或者 Path、或者 Host、或者 Query 來做路由。

  • Predicate(斷言):輸入型別是一個 ServerWebExchange。我們可以使用它來匹配來自 HTTP 請求的任何內容,例如 headers 或引數。

  • 過濾器( filter),Gateway中的Fiter 分為兩種型別的Filter,分別是Gateway Filter和Global Filter。過濾器Filter 將會對請求和響應進行修改處理。可以使用它攔截和修改請求,並且對上游的響應進行二次處理。過濾器為org.springframework.cloud.gateway.filter.GatewayFilter類的例項。

    內建了 20 種 Filter 和 9 種全域性 Filter,也都可以直接用。當然自定義 Filter 也非常方便。

依賴

引入了依賴預設即開啟gateway了,如果暫時不想使用這個功能,這可以配置spring.cloud.gateway.enabled=false即可。

注意:gateway搭建可能出現不相容的情況,各種少方法,經調整 springboot 2.2.5 和 springcloud Hoxton.SR4是相容的 和 gateway也是相容的

<spring-boot.version>2.2.5.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR4</spring-cloud.version>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

2.gateway路由配置

spring:
  cloud:
    gateway:
      routes:
      		#自定義的路由 ID,保持唯一
        - id: order-server
        	#目標服務地址 普通url
          #uri: http://localhost:8083/
          #和註冊中心結合方式 lb
          uri: lb://order-server
          #路由條件
          predicates:
            - Path=/order/**
          #過濾器 
          filters:
          	#StripPrefix可以接受一個非負整數,對應實現是StripPrefixGatewayFilterFactory,作用去掉字首,例如本例中通過gateway訪問user/order/version,閘道器服務向後轉發實際路徑為/order/version
            - StripPrefix=1 

3.斷言 predicates

Spring Cloud Gataway 內建了很多 Predicates 功能,這些Predict的原始碼在org.springframework.cloud.gateway.handler.predicate包中

Spring Cloud Gateway 是通過 Spring WebFlux 的 HandlerMapping 做為底層支援來匹配到轉發路由,Spring Cloud Gateway 內建了很多 Predicates 工廠,這些 Predicates 工廠通過不同的 HTTP 請求引數來匹配,多個 Predicates 工廠可以組合使用。

說白了 Predicate 就是為了實現一組匹配規則,方便讓請求過來找到對應的 Route 進行處理.

每一個Predicate的使用可以理解為:當滿足這種條件後才會被轉發,如果是多個,那就是都滿足的情況下被轉發。

常用斷言:

一個請求滿足多個路由的斷言條件時,請求只會被首個成功匹配的路由轉發

spring:
 cloud:
    gateway:
      routes:
        - id: order-server
          uri: http://localhost:8083/
          predicates:
#須含order引數名,例如:curl http://localhost:9066/order/createorder/1?order=xxx
            - Query=order
#須含order引數名,且引數值滿足正則(以xx開頭且長度為3位的字串)       
            - Query=order,xx.
            #須header中含指定屬性名和滿足正則的屬性值
            - Header=X-Request-Id,\d+
            #須Cookie中含指定屬性名和滿足正則的屬性值
            - Cookie=sessionId,test
            #指定請求方式
            - Method=GET,PUT
            #滿足路徑格式
            - Path=/order/{segment}
            #在指定時區時間之後/前/中間才能匹配
            - After=2020-01-01T12:00:00+08:00[Asia/Shanghai]
            - Before=2020-01-01T12:00:00+08:00[Asia/Shanghai]
            - Between=2020-01-01T12:00:00+08:00[Asia/Shanghai],2021-01-01T12:00:00+08:00[Asia/Shanghai]

4.過濾器

4.1 內建過濾器使用

filter除了分為“pre”和“post”兩種方式的filter外,在Spring Cloud Gateway中,filter從作用範圍可分為另外兩種,一種是針對於單個路由的gateway filter,它在配置檔案中的寫法同predict類似;另外一種是針對於所有路由的global gateway filer。

Spring Cloud Gateway 內建的過濾器工廠一覽表如下:

原始碼位置:org.springframework.cloud.gateway.filter.factory

spring:
  cloud:
    gateway:
      routes:
        - id: add_request_header_route
          uri: http://httpbin.org:80/get
          predicates:
            - Path=/get
          filters:
          	#會在請求頭加上一對請求頭,名稱為X-Request-Foo,值為Bar
            - AddRequestHeader=X-Request-Foo,Bar
            #會在響應頭加上一對請求頭,名稱為X-Request-Foo,值為Bar
            - AddResponseHeader=X-Request-Foo,Bar
            #重寫路徑:localhost:9066/user/order/createorder/1->localhost:8083/order/createorder/1
            - RewritePath=/user/order/(?<segment>.*), /order/$\{segment}

4.2 自定義過濾器

實現一個過濾器工廠,在列印日誌的時候,可以設定引數來決定是否列印請求引數.

過濾器工廠的頂級介面是GatewayFilterFactory,我們可以直接繼承它的兩個抽象類來簡化開發AbstractGatewayFilterFactory和AbstractNameValueGatewayFilterFactory,這兩個抽象類的區別就是前者接收一個引數(像StripPrefix和我們建立的這種),後者接收兩個引數(像AddResponseHeader)。

現在需要將請求的日誌打印出來,需要使用一個引數,這時可以參照RedirectToGatewayFilterFactory的寫法:

public class RequestTimeGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestTimeGatewayFilterFactory.Config> {

    private static final Logger log = LoggerFactory.getLogger(RequestTimeGatewayFilterFactory.class);
    private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";
    private static final String KEY = "withParams";

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList(KEY);
    }
		//在類的構造器中一定要呼叫下父類的構造器把Config型別傳過去,否則會報ClassCastException
    public RequestTimeGatewayFilterFactory() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
            return chain.filter(exchange).then(
                    Mono.fromRunnable(() -> {
                        Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);
                        if (startTime != null) {
                            StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
                                    .append(": ")
                                    .append(System.currentTimeMillis() - startTime)
                                    .append("ms");
                            //判斷是否列印引數
                            if (config.isWithParams()) {
                                sb.append(" params:").append(exchange.getRequest().getQueryParams());
                            }
                            log.info(sb.toString());
                        }
                    })
            );
        };
    }


    public static class Config {
        private boolean withParams;
        public boolean isWithParams() {
            return withParams;
        }
        public void setWithParams(boolean withParams) {
            this.withParams = withParams;
        }

    }
}

在上面的程式碼中 apply(Config config)方法內建立了一個GatewayFilter的匿名類,加了是否列印請求引數的邏輯,而這個邏輯的開關是config.isWithParams()。

靜態內部類類Config就是為了接收那個boolean型別的引數服務的,裡邊的變數名可以隨意寫,但是要重寫List shortcutFieldOrder()這個方法。

需要注意的是,在類的構造器中一定要呼叫下父類的構造器把Config型別傳過去,否則會報ClassCastException

注入實現類

最後,需要在工程的啟動檔案Application類中,向Srping Ioc容器註冊RequestTimeGatewayFilterFactory類的Bean。

@Bean
public RequestTimeGatewayFilterFactory elapsedGatewayFilterFactory() {
    return new RequestTimeGatewayFilterFactory();
}

配置檔案配置

spring:
  cloud:
    gateway:
      routes:
        - id: order-server
          uri: http://localhost:8083/
          predicates:
            - Path=/order/**
          filters:
            - RequestTime=false

4.3 全域性過濾器

區別

  • GatewayFilter 需要通過spring.cloud.routes.filters 配置在具體路由下,只作用在當前路由上,或通過spring.cloud.default-filters配置在全域性,作用在所有路由上

  • GlobalFilter 全域性過濾器,不需要在配置檔案中配置,作用在所有的路由上,最終通過GatewayFilterAdapter包裝成GatewayFilterChain可識別的過濾器,它為請求業務以及路由的URI轉換為真實業務服務的請求地址的核心過濾器,不需要配置,系統初始化時載入,並作用在每個路由上。

內建GlobalFilter

上圖中每一個GlobalFilter都作用在每一個router上,能夠滿足大多數的需求。但是如果遇到業務上的定製,可能需要編寫滿足自己需求的GlobalFilter。

自定義TokenFilter

在下面的案例中將講述如何編寫自己GlobalFilter,該GlobalFilter會校驗請求中是否包含了請求引數“token”,如果不包含請求引數“token”則不轉發路由 . 程式碼如下:

複製public class TokenFilter implements GlobalFilter, Ordered {

    Logger logger=LoggerFactory.getLogger( TokenFilter.class );
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      	//從request中獲取token
        String token = exchange.getRequest().getQueryParams().getFirst("token");
        if (token == null || token.isEmpty()) {
            logger.info( "token is empty..." );
          	//如果token不存在,終止轉發,返回錯誤碼未認證
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }
      	//如果token存在,校驗通過
        return chain.filter(exchange);
    }
		//設定優先順序,數字越小優先順序越高
    @Override
    public int getOrder() {
        return -100;
    }
}

加入Spring容器

然後需要將TokenFilter在工程的啟動類中注入到Spring Ioc容器中,程式碼如下:

@Bean
public TokenFilter tokenFilter() {
    return new TokenFilter();
}

5.高階功能

5.1 熔斷降級

依賴:

Spring Cloud Gateway可以利用Hystrix的熔斷特性,在流量過大時進行服務降級,同時專案中必須加上Hystrix的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

配置檔案:

spring:
  cloud:
    gateway:
      routes:
        - id: order-server
          uri: http://localhost:8083/
          predicates:
            - Path=/order/**
          filters:
          #過濾器Hystrix,作用是通過Hystrix進行熔斷降級,當上遊的請求,進入了Hystrix熔斷降級機制時,就會呼叫fallbackUri配置的降級地址。需要注意的是,還需要單獨設定Hystrix的commandKey的超時時間
            - name: Hystrix
              args:
                name: fallbackcmd
                fallbackUri: forward:/fallback


hystrix:
  command:
    fallbackcmd:
      execution:
        isolation:
          thread:
            #超時時間,若不設定超時時間則有可能無法觸發熔斷
            timeoutInMilliseconds: 5000

降級Controller

上述配置中給出了熔斷之後返回路徑,因此,在Gateway服務模組新增/fallback路徑,以作為服務熔斷時的返回路徑。

@RestController
public class GatewayController {
    @GetMapping("fallback")
    public Map fallback() {
        Map<String, String> response = new HashMap<>();
        response.put("code", "500");
        response.put("message", "服務暫時不可用");
        return response;
    }
}

測試

將被呼叫的服務進行休眠5S用來測試熔斷,然後呼叫介面進行測試

5.2 重試路由器

通過簡單的配置,Spring Cloud Gateway就可以支援請求重試功能,但是被呼叫服務需要做好冪等性處理,重試需要慎用。

配置檔案配置

複製spring:
  cloud:
    gateway:
      routes:
        - id: order-server
          uri: http://localhost:8083/
          predicates:
            - Path=/order/**
          filters:
            - name: Retry
              args:
                retries: 3
                status: 500

Retry GatewayFilter通過四個引數來控制重試機制,引數說明如下:

  • retries:重試次數,預設值是 3 次。
  • status:HTTP 的狀態返回碼,取值請參考:org.springframework.http.HttpStatus。
  • methods:指定哪些方法的請求需要進行重試邏輯,預設值是 GET 方法,取值參考:org.springframework.http.HttpMethod。
  • series:一些列的狀態碼配置,取值參考:org.springframework.http.HttpStatus.Series。符合的某段狀態碼才會進行重試邏輯,預設值是 SERVER_ERROR,值是 5,也就是 5XX(5 開頭的狀態碼),共有5個值。

5.3 分散式限流

在高併發的系統中,往往需要在系統中做限流,一方面是為了防止大量的請求使伺服器過載,導致服務不可用,另一方面是為了防止網路攻擊。

常見的限流方式,比如Hystrix是用執行緒池隔離,超過執行緒池的負載,走熔斷的邏輯。在一般應用伺服器中,比如tomcat容器也是通過限制它的執行緒數來控制併發的;也有通過時間視窗的平均速度來控制流量。常見的限流緯度有比如通過Ip來限流、通過uri來限流、通過使用者訪問頻次來限流。

一般限流都是在閘道器這一層做,比如Nginx、Openresty、kong、zuul、Spring Cloud Gateway等;也可以在應用層通過Aop這種方式去做限流。

常見的限流演算法

計數器演算法

計數器演算法採用計數器實現限流有點簡單粗暴,一般我們會限制一秒鐘的能夠通過的請求數,比如限流qps為100,演算法的實現思路就是從第一個請求進來開始計時,在接下去的1s內,每來一個請求,就把計數加1,如果累加的數字達到了100,那麼後續的請求就會被全部拒絕。等到1s結束後,把計數恢復成0,重新開始計數。具體的實現可以是這樣的:對於每次服務呼叫,可以通過AtomicLong#incrementAndGet()方法來給計數器加1並返回最新值,通過這個最新值和閾值進行比較。這種實現方式,相信大家都知道有一個弊端:如果我在單位時間1s內的前10ms,已經通過了100個請求,那後面的990ms,只能眼巴巴的把請求拒絕,我們把這種現象稱為“突刺現象”

漏桶演算法

漏桶演算法為了消除”突刺現象”,可以採用漏桶演算法實現限流,漏桶演算法這個名字就很形象,演算法內部有一個容器,類似生活用到的漏斗,當請求進來時,相當於水倒入漏斗,然後從下端小口慢慢勻速的流出。不管上面流量多大,下面流出的速度始終保持不變。不管服務呼叫方多麼不穩定,通過漏桶演算法進行限流,每10毫秒處理一次請求。因為處理的速度是固定的,請求進來的速度是未知的,可能突然進來很多請求,沒來得及處理的請求就先放在桶裡,既然是個桶,肯定是有容量上限,如果桶滿了,那麼新進來的請求就丟棄。

在演算法實現方面,可以準備一個佇列,用來儲存請求,另外通過一個執行緒池(ScheduledExecutorService)來定期從佇列中獲取請求並執行,可以一次性獲取多個併發執行。

這種演算法,在使用過後也存在弊端:無法應對短時間的突發流量。

令牌桶演算法

從某種意義上講,令牌桶演算法是對漏桶演算法的一種改進,桶演算法能夠限制請求呼叫的速率,而令牌桶演算法能夠在限制呼叫的平均速率的同時還允許一定程度的突發呼叫。在令牌桶演算法中,存在一個桶,用來存放固定數量的令牌。演算法中存在一種機制,以一定的速率往桶中放令牌。每次請求呼叫需要先獲取令牌,只有拿到令牌,才有機會繼續執行,否則選擇選擇等待可用的令牌、或者直接拒絕。放令牌這個動作是持續不斷的進行,如果桶中令牌數達到上限,就丟棄令牌,所以就存在這種情況,桶中一直有大量的可用令牌,這時進來的請求就可以直接拿到令牌執行,比如設定qps為100,那麼限流器初始化完成一秒後,桶中就已經有100個令牌了,這時服務還沒完全啟動好,等啟動完成對外提供服務時,該限流器可以抵擋瞬時的100個請求。所以,只有桶中沒有令牌時,請求才會進行等待,最後相當於以一定的速率執行。

Spring Cloud Gateway限流

在Spring Cloud Gateway中,有Filter過濾器,因此可以在“pre”型別的Filter中自行實現上述三種過濾器。但是限流作為閘道器最基本的功能,Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory這個類,適用在Redis內的通過執行Lua指令碼實現了令牌桶的方式。具體實現邏輯在RequestRateLimiterGatewayFilterFactory類中,lua指令碼在如下圖所示的資料夾中:

引入POM依賴

Spring Cloud Gateway本身集成了限流操作,Gateway限流需要使用Redis,pom檔案中新增Redis依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
配置檔案配置
spring:
  redis:
    host: 192.168.64.128
    port: 6379
    database: 0
  cloud:
    gateway:
      routes:
        - id: order-server
          uri: http://localhost:8083/
          predicates:
            - Path=/order/**
          filters:
          	#限流過濾器,該過濾器需要配置三個引數.name必須是RequestRateLimiter。
            - name: RequestRateLimiter
              args:
                #用於限流的解析器的Bean物件的名字。它使用SpEL表示式#{@beanName}從Spring容器中獲取bean物件。
                key-resolver: "#{@hostKeyResolver}"
                #令牌通每秒填充平均速率
                redis-rate-limiter.replenishRate: 1
                #令牌桶的總容量
                redis-rate-limiter.burstCapacity: 3
key-resolver實現

Key-resolver引數後面的bean需要自己實現,然後注入到Spring容器中。

使用者ID限流

這裡根據使用者ID限流,請求路徑中必須攜帶userId引數

@Bean
public KeyResolver userKeyResolver() {
    return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}

KeyResolver需要實現resolve方法,比如根據userid進行限流,則需要用userid去判斷。實現完KeyResolver之後,需要將這個類的Bean註冊到Ioc容器中。

根據IP限流

如果需要根據IP限流,定義的獲取限流Key的bean為:

@Bean
public KeyResolver hostKeyResolver() {
    return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
根據Path限流

還可以根據請求路徑進行限流

@Bean
public KeyResolver apiKeyResolver() {
    return exchange -> Mono.just(exchange.getRequest().getPath().value());
}