1. 程式人生 > >sentinel 叢集流控原理

sentinel 叢集流控原理

為什麼需要叢集流控呢?假設需要將某個API的總qps限制在100,機器數可能為50,這時很自然的想到使用一個專門的server來統計總的呼叫量,其他例項與該server通訊來判斷是否可以呼叫,這就是基本的叢集流控方式,sentinel的實現就是這樣的。

如果服務呼叫使用輪訓或者隨機路由方式,理論上可以通過在各個單機上設定流控規則即可(單機qps上限=總qps上限 / 機器數)。叢集流控可以解決流量分配不均的問題導致總體流控效果不佳的問題,其可以精確地控制整個叢集的呼叫總量,結合單機限流兜底,可以更好地發揮流量控制的效果,不過由於會與server進行通訊,所以效能上會有一定損耗。

叢集流控中共有兩種身份:

  • Token Client:叢集流控客戶端,用於向所屬 Token Server 通訊請求 token。叢集限流服務端會返回給客戶端結果,決定是否限流。
  • Token Server:即叢集流控服務端,處理來自 Token Client 的請求,根據配置的叢集規則判斷是否應該發放 token(是否允許通過)。

Sentinel 1.4.0 開始引入了叢集流控模組,主要包含以下幾部分:

  • sentinel-cluster-common-default: 公共模組,包含公共介面和實體
  • sentinel-cluster-client-default: 預設叢集流控 client 模組,使用 Netty 進行通訊,提供介面方便序列化協議擴充套件
  • sentinel-cluster-server-default: 預設叢集流控 server 模組,使用 Netty 進行通訊,提供介面方便序列化協議擴充套件;同時提供擴充套件介面對接規則判斷的具體實現(TokenService),預設實現是複用 sentinel-core 的相關邏輯

 

大致瞭解叢集流控概念之後,下面一起分析下叢集流控規則、client端和server端各自處理機制~

叢集流控規則

FlowRule 添加了兩個欄位用於叢集限流相關配置,如下所示。clusterMode在方法FlowRuleChecker.canPassCheck中會用到進行判斷是否是叢集流控,false表示單機流控;true表示叢集流控,會呼叫方法passClusterCheck與叢集流控server端通訊判斷是否觸發了流控,此時異常降級策略為本地流控(fallbackToLocalOrPass方法,fallbackToLocalWhenFail屬性為true時執行本地流控,否則直接返回ture不走流控檢查

)。

 1 private boolean clusterMode; // 標識是否為叢集限流配置
 2 private ClusterFlowConfig clusterConfig; // 叢集限流相關配置項
 3 ​
 4 // ClusterFlowConfig屬性
 5 private Long flowId; // 全域性唯一的規則 ID,由叢集限流管控端分配.
 6 private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; // 閾值模式,預設(0)為單機均攤,1 為全域性閾值.
 7 private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
 8 private boolean fallbackToLocalWhenFail = true; // 在 client 連線失敗或通訊失敗時,是否退化到本地的限流模式
 9 ​
10 public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
11     String limitApp = rule.getLimitApp();
12     if (limitApp == null) {
13         return true;
14     }
15     if (rule.isClusterMode()) {// 叢集模式
16         return passClusterCheck(rule, context, node, acquireCount, prioritized);
17     }
18     // 單機模式流控
19     return passLocalCheck(rule, context, node, acquireCount, prioritized);
20 }
  • flowId 代表全域性唯一的規則 ID,Sentinel 叢集限流服務端通過此 ID 來區分各個規則,因此務必保持全域性唯一。一般 flowId 由統一的管控端進行分配,或寫入至 DB 時生成。
  • thresholdType 代表叢集限流閾值模式。單機均攤模式表示總qps閾值等於機器數*單機qps閾值;全域性閾值等於整個叢集配置的閾值。
  • strategy 叢集策略,預設FLOW_CLUSTER_STRATEGY_NORMAL,針對ClusterFlowConfig配置該屬性為FLOW_CLUSTER_STRATEGY_NORMAL才合法,除此之外,暫無太多業務意義。

client端處理機制

client端的處理機制和單機是一樣的,只不過clusterMode和clusterConfig屬性配置上了而已,具體的client使用可以參考官方文件 叢集流控,這裡不再贅述。如果是叢集流控,在FlowRuleChecker.canPassCheck方法中會呼叫方法passClusterCheck,如下:

 1 private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
 2                                             boolean prioritized) {
 3     try {
 4         TokenService clusterService = pickClusterService();
 5         if (clusterService == null) {
 6             // 為null降級處理
 7             return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
 8         }
 9         long flowId = rule.getClusterConfig().getFlowId();
10         TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
11         return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
12     } catch (Throwable ex) {
13         RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
14     }
15     // 降級處理 本地限流
16     return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
17 }

requestToken負責與token server端通訊,入參包括flowId, acquireCount, prioritized,這裡是沒有Resource資訊的,server端通過flowid來獲取對應規則進行流控判斷。注意,呼叫writeAndFlush傳送請求之後等待響應結果,最大等待時間ClusterClientConfigManager.getRequestTimeout();請求傳送過程中,出現任何異常或者返回錯誤(這裡不包括BLOCKED情況),都會預設走降級本地流控邏輯:fallbackToLocalOrPass

瞭解了client端處理流程,接下來看下server端處理流程,client和server端都是用netty作為底層網路通訊服務,關於netty的原理不是本文討論的重點因此會簡單帶過。如果小夥伴們還不太熟悉netty,請參閱對應資料即可。對於netty,每個Java開發者都需要了解甚至是熟悉的,這樣不僅僅幫助我們理解NIO及Reactor模型,還能再閱讀基於netty的框架原始碼(比如dubbo/rocketmq等)時,將重點關注在框架本身實現上,而不是網路通訊流程及細節上。

server端處理機制

Sentinel 叢集限流服務端有兩種啟動方式:

  • 獨立模式(Alone),即作為獨立的 token server 程序啟動,獨立部署,隔離性好,但是需要額外的部署操作。獨立模式適合作為 Global Rate Limiter 給叢集提供流控服務。

  • 嵌入模式(Embedded),即作為內建的 token server 與服務在同一程序中啟動。在此模式下,叢集中各個例項都是對等的,token server 和 client 可以隨時進行轉變,因此無需單獨部署,靈活性比較好。但是隔離性不佳,需要限制 token server 的總 QPS,防止影響應用本身。嵌入模式適合某個應用叢集內部的流控。

 

目前針對token server高可用,sentinel並沒有對應的解決方案,不過沒有並不意味著沒考慮,因為預設可以降級走本地流控。sentinel作為一個限流元件,在大部分應用場景中,如果token server掛了降級為本地流控就可以滿足了。

如果必須考慮token server高可用,可考慮token server叢集部署,每個token server都能訪問(或儲存)全量規則資料,多個client通過特定路由規則分配到不同的token server(相同型別服務路由到同一個token server,不同型別服務可路由到不同token server),token server故障時提供failover機制即可。如果此時考慮到相同型別服務出現網路分割槽,也就是一部分服務可以正常與token server通訊,另一個部分服務無法正常與token server通訊,如果無法正常通訊的這部分服務直接進行failover,會導致叢集限流不準的問題,可通過zookeeper來儲存線上的token server,如果zookeeper中token server列表有變化,再進行failover;此情況下再出現任何形式的網路分割槽,再執行降級邏輯,執行本地限流。

 

server端不管是獨立模式還是嵌入模式,都是通過NettyTransportServer來啟動的:

public void start() {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                p.addLast(new NettyRequestDecoder());
                p.addLast(new LengthFieldPrepender(2));
                p.addLast(new NettyResponseEncoder());
                p.addLast(new TokenServerHandler(connectionPool));
            }
        });
    b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
        //
    });
}

以上邏輯主要是netty啟動邏輯,重點關注initChannel方法,這些是往pipeline新增自定義channelHandler,主要是處理粘包、編解碼器和業務處理Handler,這裡最重要的是TokenServerHandler,因為是請求處理邏輯,所以重點關注其channelRead方法:

 1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 2     // 全域性儲存channel
 3     globalConnectionPool.refreshLastReadTime(ctx.channel());
 4     if (msg instanceof ClusterRequest) {
 5         ClusterRequest request = (ClusterRequest)msg;
 6         if (request.getType() == ClusterConstants.MSG_TYPE_PING) {
 7             // ping請求處理,會記錄namespace資訊
 8             handlePingRequest(ctx, request);
 9             return;
10         }
11         // 根據request type獲取對應處理器
12            // 針對叢集流控,type為MSG_TYPE_FLOW
13         RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType());
14         ClusterResponse<?> response = processor.processRequest(request);
15         writeResponse(ctx, response);
16     }
17 }

 

針對叢集流控,type為MSG_TYPE_FLOW,對應處理器為FlowRequestProcessor。首先會提取請求入參 flowId, acquireCount, prioritized,主要步驟如下:

  • 根據flowId獲取規則,為空返回結果NO_RULE_EXISTS;
  • 獲取請求namespace對應的RequestLimiter,非空時進行tryPass限流檢查,該檢查是針對namespace維度;
  • 針對flowId對應規則進行限流檢查,acquireCount表示該請求需要獲取的token數,資料檢查基於滑動時間視窗統計來判斷的。

根據限流規則檢查之後,會統計相關的PASS/BLOCK/PASS_REQUEST/BLOCK_REQUEST等資訊,該流程和單機流控流程是類似的,具體程式碼不再贅述。處理完成之後,會返回client端處理結果,至此整個叢集流控流程就分析完了。

 

往期精選

  • sentinel 核心概念
  • 你的Redis有類轉換異常麼
  • Redis 基礎資料結構
  • 別再問我ConcurrentHashMap了
  • 分散式鎖設計與實現
  • ConcurrentHashMap竟然也有死迴圈問題?
  • 你的ThreadLocal執行緒安全麼
  • MySQL基礎概念知多少

覺得文章不錯,對你有所啟發和幫助,希望能轉發給更多的小夥伴。如果有問題,請關注下面公眾號,傳送問題給我,多謝。

歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。