Sentinel Source Code Analysis, Part 2: The Flow Control Algorithm (Sliding Time Window)

Before looking at the sliding time window that Sentinel uses, let me first explain what a simple sliding time window looks like.

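Suppose the rate-limiting window does not slide and the limit is 100 QPS. If 80 requests arrive between 0.5 s and 1 s, the 0-1 s window counts 80, so the threshold is not reached. If another 80 requests arrive between 1 s and 1.5 s, the 1-2 s window also counts only 80, again under the threshold.

Yet the interval from 0.5 s to 1.5 s, which is also one second long, received 160 requests, well over the threshold. That is the problem.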
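To make the boundary problem concrete, here is a minimal fixed-window counter in Java. It is an illustrative sketch, not Sentinel code: the class name `FixedWindowCounter` and its methods are made up for this example, and the caller passes the timestamp in explicitly so the scenario above can be replayed deterministically.

```java
/** Illustrative fixed (non-sliding) window counter; not part of Sentinel. */
class FixedWindowCounter {
    private final long windowMs;
    private final int limit;
    private long windowStart = -1; // start of the window being counted, -1 before the first request
    private int count;             // requests admitted in that window

    FixedWindowCounter(long windowMs, int limit) {
        this.windowMs = windowMs;
        this.limit = limit;
    }

    /** Returns true if a request arriving at timeMillis is admitted. */
    boolean tryAcquire(long timeMillis) {
        long start = timeMillis - timeMillis % windowMs;
        if (start != windowStart) { // crossed into a new window: the old count is simply discarded
            windowStart = start;
            count = 0;
        }
        if (count + 1 > limit) {
            return false;
        }
        count++;
        return true;
    }
}
```

Feeding it 80 requests in 0.5-1 s and another 80 in 1-1.5 s admits all 160, because the counter is reset at the 1 s boundary.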

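With a sliding time window, counting works as follows. Split 1 s into four buckets of 250 ms each.

Suppose a request arrives between 750 ms and 1 s. The total over the current bucket and the three buckets before it is 101, which exceeds the threshold, so the request is rejected.

Suppose a request arrives between 1 s and 1.25 s. The total over the current bucket and the three buckets before it is 21, below the threshold, so the request passes. This total no longer includes the 0-250 ms bucket; dropping the oldest bucket is exactly what makes the window slide.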
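The four-bucket scheme can be sketched as a small circular array of buckets. This is an illustration under simplifying assumptions (single-threaded, timestamps passed in by the caller, hypothetical names), not Sentinel's implementation. Each slot remembers its bucket's start time, so a slot left over from an earlier lap is recognized as stale: it is left out of the sum and reset before it is reused.

```java
import java.util.Arrays;

/** Illustrative sliding-window counter (not Sentinel code): intervalMs split into sampleCount buckets. */
class SlidingWindowCounter {
    private final int bucketMs;
    private final int limit;
    private final long[] bucketStart; // start time of the bucket held in each slot, -1 if unused
    private final int[] bucketCount;  // requests counted in each slot

    SlidingWindowCounter(int intervalMs, int sampleCount, int limit) {
        this.bucketMs = intervalMs / sampleCount;
        this.limit = limit;
        this.bucketStart = new long[sampleCount];
        this.bucketCount = new int[sampleCount];
        Arrays.fill(bucketStart, -1);
    }

    /** Sum over the current bucket and the sampleCount - 1 buckets before it. */
    int sum(long timeMillis) {
        long oldestValidStart = timeMillis - timeMillis % bucketMs - (long) bucketMs * (bucketStart.length - 1);
        int total = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (bucketStart[i] >= oldestValidStart) {
                total += bucketCount[i];
            }
        }
        return total;
    }

    /** Adds n requests to the bucket for timeMillis, resetting the slot if it holds an older bucket. */
    void add(long timeMillis, int n) {
        int idx = (int) ((timeMillis / bucketMs) % bucketStart.length);
        long start = timeMillis - timeMillis % bucketMs;
        if (bucketStart[idx] != start) {
            bucketStart[idx] = start;
            bucketCount[idx] = 0;
        }
        bucketCount[idx] += n;
    }

    /** Admits one request if the windowed total, including this request, stays within the limit. */
    boolean tryAcquire(long timeMillis) {
        if (sum(timeMillis) + 1 > limit) {
            return false;
        }
        add(timeMillis, 1);
        return true;
    }
}
```

With 80, 10 and 10 requests in the first three buckets, a request at 800 ms sees 100 + 1 > 100 and is rejected, while a request at 1100 ms only sees the remaining 20 (the 0-250 ms bucket has slid out) and passes.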

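Note, however, that sliding-window counting is not 100% accurate either. In the figure above, when a request arrives at the green arrow, the buckets counted run from 250 ms up to the green arrow, and that interval is shorter than 1 s.

Because the whole 0-250 ms bucket has been dropped, the count can be off. Naturally, the finer the buckets, the more accurate the count.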
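The size of that gap can be worked out. Assuming the sum covers the current bucket plus the previous sampleCount - 1 buckets (as described above), a request arriving at time t is judged against the span from the start of the oldest counted bucket up to t, that is (sampleCount - 1) × bucketLength + (t mod bucketLength). That span is never shorter than intervalMs - bucketLength, so the shortfall is less than one bucket. The helper below is hypothetical and only performs this arithmetic:

```java
/** Hypothetical helper: how much time the summed buckets actually cover when a request arrives. */
class WindowCoverage {
    /**
     * Span from the start of the oldest counted bucket up to t, assuming the sum covers the
     * current bucket plus the (sampleCount - 1) buckets before it.
     */
    static long coveredSpanMs(long t, int intervalMs, int sampleCount) {
        int bucketMs = intervalMs / sampleCount;
        return (long) bucketMs * (sampleCount - 1) + t % bucketMs;
    }
}
```

For a request at 1100 ms, four 250 ms buckets cover only the last 850 ms; ten 100 ms buckets would cover 900 ms at the same instant.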


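Next, let's look at how Sentinel uses the sliding time window for flow control.

Metric: the basic structure for recording the call metrics of a protected resource. It specifies which call metrics of the resource are tracked. Here are the key ones.

Source: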

public interface Metric extends DebugSupport {

    /** Get total success count (counts all successful calls). */
    long success();

    /** Get max success count. */
    long maxSuccess();

    /** Get total exception count (counts all exceptions). */
    long exception();

    /** Get total block count. */
    long block();

    /**
     * Get total pass count. not include {@link #occupiedPass()}
     * Counts passed requests, excluding occupiedPass; this is the count flow control is based on.
     */
    long pass();

    /** Get total response time. */
    long rt();

    /** Get the minimal RT. */
    long minRt();

    /** Get aggregated metric nodes of all resources. */
    List<MetricNode> details();

    /** Generate aggregated metric items that satisfies the time predicate (since 1.7.0). */
    List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate);

    /** Get the raw window array. */
    MetricBucket[] windows();

    /** Add current exception count. */
    void addException(int n);

    /** Add current block count. */
    void addBlock(int n);

    /** Add current completed count. */
    void addSuccess(int n);

    /** Add current pass count (records requests that passed). */
    void addPass(int n);

    /** Add given RT to current total RT. */
    void addRT(long rt);

    /** Get the sliding window length in seconds. */
    double getWindowIntervalInSec();

    /** Get sample count of the sliding window. */
    int getSampleCount();

    /**
     * Note: this operation will not perform refreshing, so will not generate new buckets.
     * Returns the pass count of the bucket exactly associated to the provided timestamp,
     * or 0 if the timestamp is invalid (since 1.5.0).
     */
    long getWindowPass(long timeMillis);

    // Occupy-based (@since 1.5.0)

    /** Add occupied pass, which represents pass requests that borrow the latter windows' token. */
    void addOccupiedPass(int acquireCount);

    /** Add request that occupied; futureTime is the future timestamp that the acquireCount should be added on. */
    void addWaiting(long futureTime, int acquireCount);

    /** Get waiting pass account. */
    long waiting();

    /** Get occupied pass count. */
    long occupiedPass();

    // Tool methods.

    long previousWindowBlock();

    long previousWindowPass();
}

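ArrayMetric is an implementation of the interface above. It holds a field, LeapArray<MetricBucket> data, which does the actual computation of the protected resource's call metrics.

Partial source: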

public class ArrayMetric implements Metric {

    // Implementation of the sliding time window algorithm
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        // The concrete LeapArray used here. sampleCount: number of buckets in the window (2 by default);
        // intervalInMs: total window length (1000 ms by default), so each bucket covers 500 ms
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    public long pass() {
        // Refresh the bucket for the current time first
        data.currentWindow();
        long pass = 0;
        // All buckets still inside the window
        List<MetricBucket> list = data.values();
        // Sum the per-bucket counts
        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

    @Override
    public void addPass(int count) {
        // Add passed requests to the current bucket; StatisticSlot calls this when recording QPS.
        // Each bucket is wrapped in a WindowWrap, and one MetricBucket can hold counts for several
        // dimensions, distinguished by MetricEvent.
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

    public void add(MetricEvent event, long count) {
        data.currentWindow().value().add(event, count);
    }

    @Override
    public double getWindowIntervalInSec() {
        return data.getIntervalInSecond();
    }

    @Override
    public int getSampleCount() {
        return data.getSampleCount();
    }
}

LeapArray: 

public abstract class LeapArray<T> {

    // Length of each bucket in the sliding window, in ms
    protected int windowLengthInMs;
    // Number of buckets in the sliding window
    protected int sampleCount;
    // Total length of the sliding window, in ms
    protected int intervalInMs;
    // Total length of the sliding window, in seconds
    private double intervalInSecond;
    // Storage for the buckets, each wrapped in a WindowWrap; this array is the whole sliding window
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     * (Used when the window slides forward.)
     */
    private final ReentrantLock updateLock = new ReentrantLock();

    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    /**
     * Get the bucket at current timestamp.
     * (Called for every request to find which bucket the current time falls into.)
     *
     * @return the bucket at current timestamp
     */
    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    /**
     * Create a new statistic value for bucket.
     *
     * @param timeMillis current time in milliseconds
     * @return the new empty bucket
     */
    public abstract T newEmptyBucket(long timeMillis);

    /**
     * Reset given bucket to provided start time and reset the value.
     *
     * @param startTime  the start time of the bucket in milliseconds
     * @param windowWrap current bucket
     * @return new clean bucket at given start time
     */
    protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

    // Index in the array of the bucket that the given time maps to
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    // Start time of the bucket that contains the given time
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    /**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // Index of the current time in the array
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            // The WindowWrap currently stored at the computed index
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 *
                 * (The diagram makes this clear: if the current time falls into 800-1000 and that
                 * slot is empty, create a new WindowWrap bucket and CAS it into the array; if the
                 * CAS fails, yield the time slice and retry.)
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 *
                 * (The slot already holds a WindowWrap whose start time equals the computed start,
                 * so return it; StatisticSlot then adds the request count to this bucket.)
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 *
                 * (This is where the window slides forward: the stale bucket is reset, i.e. its
                 * start time is updated and its counters are cleared, under the update lock.)
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        // The concrete logic lives in the subclass OccupiableBucketLeapArray.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

    public List<T> values() {
        return values(TimeUtil.currentTimeMillis());
    }

    // Return the values of all buckets still inside the window; used to sum up the window's total count
    public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

    // ... remaining members omitted
}
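The index arithmetic and the three branches of currentWindow can be condensed into a small, self-contained sketch. It keeps the original's structure (an AtomicReferenceArray, CAS for empty slots, tryLock for stale ones) but stores only a pass counter per bucket; the names are hypothetical, and the deprecated-bucket branch is simplified as noted in the comments. With 5 buckets of 200 ms, as in the diagrams in the source comments, time 888 maps to start 800 and index 888 / 200 % 5 = 4, and time 1676 maps to start 1600 and index 8 % 5 = 3.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Simplified sketch of the LeapArray idea with a single "pass" counter per bucket.
 * Class and member names are illustrative; this is not Sentinel's implementation.
 */
class MiniLeapArray {
    /** One bucket: its start time plus a pass counter (plays the role of WindowWrap<MetricBucket>). */
    static final class Bucket {
        final long start;
        final LongAdder pass = new LongAdder();

        Bucket(long start) {
            this.start = start;
        }
    }

    private final int windowLengthInMs;
    private final int intervalInMs;
    private final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    int calculateTimeIdx(long timeMillis) {
        return (int) ((timeMillis / windowLengthInMs) % array.length());
    }

    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    Bucket currentWindow(long timeMillis) {
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // (1) Empty slot: publish a new bucket with CAS; losers yield and retry.
                Bucket created = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, created)) {
                    return created;
                }
                Thread.yield();
            } else if (windowStart == old.start) {
                // (2) The slot already holds the bucket for this time.
                return old;
            } else if (windowStart > old.start) {
                // (3) The slot holds a bucket from an earlier lap. Sentinel resets the old object in
                // place; this sketch instead swaps in a fresh bucket while holding the lock.
                if (updateLock.tryLock()) {
                    try {
                        Bucket fresh = new Bucket(windowStart);
                        if (array.compareAndSet(idx, old, fresh)) {
                            return fresh;
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else {
                // Time went backwards: return a throwaway bucket, as the original does.
                return new Bucket(windowStart);
            }
        }
    }

    /** Like ArrayMetric.pass(): refresh the current bucket, then sum the buckets still in the window. */
    long pass(long timeMillis) {
        currentWindow(timeMillis);
        long sum = 0;
        for (int i = 0; i < array.length(); i++) {
            Bucket b = array.get(i);
            if (b != null && timeMillis - b.start < intervalInMs) {
                sum += b.pass.sum();
            }
        }
        return sum;
    }
}
```

Swapping in a new object keeps the sketch short; resetting in place, as the original does, avoids allocating a bucket every time the window slides.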

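The sliding window above represents each bucket with a WindowWrap, and WindowWrap in turn wraps a generic type. The generic parameter lets a bucket hold any kind of statistics; here it is MetricBucket.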

WindowWrap:
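As a rough picture of what such a wrapper holds, here is a simplified sketch. The class name WindowWrapSketch is hypothetical; it only mirrors the constructor arguments (windowLengthInMs, windowStart, value) and the windowStart()/value() accessors that the LeapArray code above relies on, plus two assumed helpers for moving and testing the bucket's time range.

```java
/**
 * Simplified sketch of a bucket wrapper: a fixed length, a start time, and a generic payload.
 * Hypothetical class; it mirrors only what the LeapArray code above uses.
 */
class WindowWrapSketch<T> {
    private final long windowLengthInMs; // length of the bucket, in ms
    private long windowStart;            // start time of the bucket, in ms
    private T value;                     // payload, e.g. a MetricBucket

    WindowWrapSketch(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }

    long windowLength() { return windowLengthInMs; }

    long windowStart() { return windowStart; }

    T value() { return value; }

    void setValue(T value) { this.value = value; }

    /** Moves the bucket to a new start time; the payload must be reset separately. */
    WindowWrapSketch<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;
    }

    /** True if timeMillis falls inside [windowStart, windowStart + windowLengthInMs). */
    boolean isTimeInWindow(long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }
}
```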

MetricBucket:

public class MetricBucket {

    // Counters for the dimensions to be tracked; LongAdder performs better than the Atomic* classes under contention
    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        // The enum values define which dimensions are tracked
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            // Each position in the array holds one dimension
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    /**
     * Reset the adders.
     *
     * @return new metric bucket in initial state
     */
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long occupiedPass() {
        return get(MetricEvent.OCCUPIED_PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public long exception() {
        return get(MetricEvent.EXCEPTION);
    }

    public long rt() {
        return get(MetricEvent.RT);
    }

    public long minRt() {
        return minRt;
    }

    public long success() {
        return get(MetricEvent.SUCCESS);
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addOccupiedPass(int n) {
        add(MetricEvent.OCCUPIED_PASS, n);
    }

    public void addException(int n) {
        add(MetricEvent.EXCEPTION, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }

    public void addRT(long rt) {
        add(MetricEvent.RT, rt);

        // Not thread-safe, but it's okay.
        if (rt < minRt) {
            minRt = rt;
        }
    }

    @Override
    public String toString() {
        return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
    }
}

The enum of tracked dimensions, MetricEvent:

public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
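The pattern shared by MetricBucket and MetricEvent (one LongAdder per enum constant, located by ordinal()) can be shown in isolation. DemoEvent and DemoBucket are hypothetical names for this sketch; DemoEvent is just a subset of the MetricEvent values above.

```java
import java.util.concurrent.atomic.LongAdder;

/** Illustrative event kinds; a subset of the MetricEvent values listed above. */
enum DemoEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT }

/** One bucket: one LongAdder per event, located by the enum constant's ordinal(). */
class DemoBucket {
    private final LongAdder[] counters = new LongAdder[DemoEvent.values().length];

    DemoBucket() {
        for (DemoEvent e : DemoEvent.values()) {
            counters[e.ordinal()] = new LongAdder();
        }
    }

    DemoBucket add(DemoEvent e, long n) {
        counters[e.ordinal()].add(n);
        return this;
    }

    long get(DemoEvent e) {
        return counters[e.ordinal()].sum();
    }

    /** Clears every counter so the bucket can be reused for a new time slice. */
    DemoBucket reset() {
        for (LongAdder c : counters) {
            c.reset();
        }
        return this;
    }
}
```

Adding a dimension only means adding an enum constant; the array grows with it, and indexing by ordinal() avoids any map lookup on the hot path.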

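In Sentinel, QPS statistics and flow control are mainly handled in StatisticSlot and FlowSlot.

StatisticSlot's entry runs first. fireEntry invokes the next slot in the chain, so on the way in StatisticSlot does no work of its own. Only after the entry methods further down the chain have returned does it call node.increaseThreadNum(); node.addPassRequest(count);, which record the number of passing threads and passed requests.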
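The ordering described above (run the rest of the chain first, then count) can be sketched with a hypothetical slot chain. DemoSlot, DemoStatisticSlot and DemoBlockException are made-up names with a much smaller signature than Sentinel's slots, and the blocked-counter branch is an assumption added for contrast; the text above only covers the pass path.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical exception standing in for a block exception thrown by a later slot. */
class DemoBlockException extends RuntimeException {
    DemoBlockException(String message) {
        super(message);
    }
}

/** Hypothetical, much simplified slot interface. */
interface DemoSlot {
    void entry(int count);
}

/** Records statistics only after the rest of the chain has returned normally. */
class DemoStatisticSlot implements DemoSlot {
    final LongAdder threadNum = new LongAdder();
    final LongAdder passRequests = new LongAdder();
    final LongAdder blockRequests = new LongAdder();
    private final DemoSlot next;

    DemoStatisticSlot(DemoSlot next) {
        this.next = next;
    }

    @Override
    public void entry(int count) {
        try {
            next.entry(count);          // like fireEntry: later slots such as the flow check run first
            threadNum.increment();      // like node.increaseThreadNum()
            passRequests.add(count);    // like node.addPassRequest(count)
        } catch (DemoBlockException e) {
            blockRequests.add(count);   // assumption: rejected entries are counted separately
            throw e;
        }
    }
}
```

Because counting happens after the check, a request that a later slot rejects never reaches the pass counter.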

Let's trace node.addPassRequest(count), which is where the sliding time window comes in. node is a DefaultNode instance.

DefaultNode#addPassRequest(count)

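@Override
public void addPassRequest(int count) {
    // Call the parent class, StatisticNode
    super.addPassRequest(count);
    // Also update the ClusterNode, which aggregates this resource's statistics across all contexts
    this.clusterNode.addPassRequest(count);
}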

Following the parent-class call, StatisticNode#addPassRequest(count):

@Override
public void addPassRequest(int count) {
    // Second-level sliding window: this is the one to follow here
    rollingCounterInSecond.addPass(count);
    // Minute-level sliding window
    rollingCounterInMinute.addPass(count);
}

The second-level sliding window:

// The two arguments are 2 and 1000: a sliding window 1 s long, split into two buckets of 500 ms each
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);
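The bucket length follows from the same arithmetic the LeapArray constructor performs: intervalInMs / sampleCount, with both values positive and the interval evenly divisible. The hypothetical helper below makes this explicit for the second-level defaults (2 buckets, 1000 ms) and for an assumed minute-level configuration of 60 buckets over 60 000 ms.

```java
/** Hypothetical helper reproducing the bucket-length arithmetic of the LeapArray constructor. */
class WindowConfig {
    /** Returns intervalInMs / sampleCount, rejecting the same invalid inputs LeapArray asserts against. */
    static int windowLengthInMs(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0) {
            throw new IllegalArgumentException("bucket count is invalid: " + sampleCount);
        }
        if (intervalInMs <= 0) {
            throw new IllegalArgumentException("total time interval of the sliding window should be positive");
        }
        if (intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("time span needs to be evenly divided");
        }
        return intervalInMs / sampleCount;
    }
}
```

For example, windowLengthInMs(2, 1000) gives 500 ms buckets, while windowLengthInMs(3, 1000) is rejected because 1000 ms cannot be split evenly into three.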

Part of ArrayMetric's source was shown above.

rollingCounterInSecond.addPass(count); runs the following logic:

@Override
public void addPass(int count) {
    // First get the bucket for the current time, then add the request count to it.
    // StatisticSlot reaches this method when it records QPS. Each bucket is wrapped in a WindowWrap,
    // and one MetricBucket can hold several dimensions, distinguished by MetricEvent.
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

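data.currentWindow() calls LeapArray#currentWindow(long timeMillis), whose source was analyzed above.

Once the bucket is obtained, wrap.value().addPass(count); calls MetricBucket#addPass(int n):

public void addPass(int n) {
    // Each bucket keeps its counts in a LongAdder[] counters array because it can track several
    // dimensions; here the dimension is PASS
    add(MetricEvent.PASS, n);
}

LongAdder is used because it outperforms AtomicLong under heavy contention: instead of every thread retrying a CAS on one shared value, updates are spread across internal cells and summed when the value is read. I won't go into the details here.

public MetricBucket add(MetricEvent event, long n) {
    // event.ordinal() is the constant's position in the enum, used as the array index;
    // LongAdder.add() is lock-free
    counters[event.ordinal()].add(n);
    return this;
}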
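A quick way to see LongAdder's contract is to hammer one instance from several threads and read sum() once they have all finished. This is an illustrative demo with a hypothetical class name; it shows correctness under concurrency, not a performance comparison.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Illustrative demo: many threads add to one LongAdder, and sum() collects the per-cell totals. */
class LongAdderDemo {
    static long countConcurrently(int threads, int addsPerThread) {
        LongAdder counter = new LongAdder();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < addsPerThread; i++) {
                    counter.add(1); // lock-free; contended updates are spread across internal cells
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.sum(); // every worker has finished, so this total is exact
    }
}
```

The trade-off is that sum() is not an atomic snapshot while writers are still running, which is acceptable for statistics like per-bucket pass counts.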

That completes the QPS recording done by StatisticSlot.

Next, let's see how FlowSlot enforces flow control.

In FlowSlot#entry:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // Check against the configured flow rules
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

FlowSlot#checkFlow

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
    // checker is a FlowRuleChecker
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

FlowRuleChecker#checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            // Apply each configured flow rule; the first one that fails throws FlowException
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

// For a local (non-cluster) rule, canPassCheck ends up here
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    // Each flow control effect has its own rater; here we look at fast fail, i.e. DefaultController
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

There are three flow control effects (fast fail, warm up, and queueing), handled by DefaultController, WarmUpController and RateLimiterController respectively.

DefaultController#canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Tokens already used: the current pass QPS of the window for QPS rules (thread count for thread rules)
    int curCount = avgUsedTokens(node);
    // acquireCount is usually 1 and count is the configured single-machine threshold;
    // if this condition holds, the threshold has been reached and false is returned
    if (curCount + acquireCount > count) {
        // prioritized is false by default
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
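Stripped of the prioritized path, the fast-fail decision for a QPS rule is a single comparison. The sketch below is hypothetical (FastFailCheck is not a Sentinel class) and assumes that the current count is the window's pass total divided by the window length in seconds, i.e. ArrayMetric.pass() over getWindowIntervalInSec().

```java
/**
 * Simplified sketch of the fast-fail decision for a QPS rule (prioritized path omitted).
 * Hypothetical class; the real DefaultController reads the current count from a Node.
 */
class FastFailCheck {
    private final double count; // configured single-machine QPS threshold

    FastFailCheck(double count) {
        this.count = count;
    }

    /**
     * @param passInWindow        passed requests summed over all live buckets (like ArrayMetric.pass())
     * @param windowIntervalInSec window length in seconds (like getWindowIntervalInSec())
     * @param acquireCount        tokens this request asks for, usually 1
     */
    boolean canPass(long passInWindow, double windowIntervalInSec, int acquireCount) {
        // Assumed: the current QPS is the window's pass total divided by its length in seconds
        int curCount = (int) (passInWindow / windowIntervalInSec);
        return curCount + acquireCount <= count;
    }
}
```

With a threshold of 100 and a 1 s window, 99 passed requests still admit one more, while 100 reject the next one.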

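If the check passes, the threshold has not been reached, and when execution returns to StatisticSlot the request is recorded with addPass. That ties the whole flow together.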
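The whole flow (check every rule first, record the pass only if all of them allow it) fits into one small, single-threaded sketch. MiniFlowPipeline, DemoFlowException and the list-of-thresholds rule model are hypothetical simplifications; the sliding window here is the same start-time-tagged circular array idea as LeapArray, without the concurrency handling.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical end-to-end sketch: a FlowSlot-like check runs first, and the pass is recorded
 * (StatisticSlot-like) only if every rule allowed it. Single-threaded; time is passed in.
 */
class MiniFlowPipeline {
    static final class DemoFlowException extends RuntimeException {
        DemoFlowException(String message) {
            super(message);
        }
    }

    private final int bucketMs;
    private final double intervalInSec;
    private final long[] bucketStart; // start time of the bucket in each slot, -1 if unused
    private final long[] bucketPass;  // passed requests in each slot
    private final List<Integer> qpsThresholds; // one threshold per configured rule

    MiniFlowPipeline(int sampleCount, int intervalMs, List<Integer> qpsThresholds) {
        this.bucketMs = intervalMs / sampleCount;
        this.intervalInSec = intervalMs / 1000.0;
        this.bucketStart = new long[sampleCount];
        this.bucketPass = new long[sampleCount];
        this.qpsThresholds = qpsThresholds;
        Arrays.fill(bucketStart, -1);
    }

    /** Pass total over the buckets still inside the window ending at now. */
    long passInWindow(long now) {
        long oldestValid = now - now % bucketMs - (long) bucketMs * (bucketStart.length - 1);
        long sum = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (bucketStart[i] >= oldestValid) {
                sum += bucketPass[i];
            }
        }
        return sum;
    }

    private void addPass(long now, int n) {
        int idx = (int) ((now / bucketMs) % bucketStart.length);
        long start = now - now % bucketMs;
        if (bucketStart[idx] != start) { // stale slot: the window has slid past it
            bucketStart[idx] = start;
            bucketPass[idx] = 0;
        }
        bucketPass[idx] += n;
    }

    /** Check every rule first (throwing on the first failure), then record the pass. */
    void entry(long now, int acquireCount) {
        int curQps = (int) (passInWindow(now) / intervalInSec);
        for (int threshold : qpsThresholds) {
            if (curQps + acquireCount > threshold) {
                throw new DemoFlowException("blocked by rule with threshold " + threshold);
            }
        }
        addPass(now, acquireCount);
    }
}
```

With rules of 5 and 3 QPS on a 2 × 500 ms window, the fourth request within the first bucket is blocked by the stricter rule and is not counted, and once the window has slid past that bucket, requests pass again.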