Sentinel限流之快速失敗和漏桶演算法
阿新 • • 發佈:2021-01-12
> 距離上次總結Sentinel的滑動視窗演算法已經有些時間了,原本想著一口氣將它的core模組全部總結完,但是中間一懶就又鬆懈下來了,這幾天在工作之餘又重新整理了一下,在這裡做一個學習總結。
>
> 上篇滑動視窗演算法總結連結:https://www.cnblogs.com/mrxiaobai-wen/p/14212637.html
>
> 今天主要總結了一下Sentinel的快速失敗和勻速排隊的漏桶演算法。因為它的WarmUpController和WarmUpRateLimiterController對應的令牌桶演算法的數學計算原理有一點點複雜,所以我準備在後面單獨用一篇來總結。所以今天涉及到的主要就是DefaultController和RateLimiterController。
***
## 限流策略入口
首先進入到FlowRuleUtil類中,方法generateRater就是對應策略的建立,邏輯比較簡單,程式碼如下:
~~~java
private static TrafficShapingController generateRater(FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
// WarmUp-令牌桶演算法
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
// 排隊等待-漏桶演算法
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
// 預熱和勻速排隊結合
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 快速失敗
return new DefaultController(rule.getCount(), rule.getGrade());
}
~~~
***
## 快速失敗DefaultController
預設流控演算法程式碼如下:
~~~java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
// 當前閾值 + acquireCount 是否大於規則設定的count,小於等於則表示符合閾值設定直接返回true
if (curCount + acquireCount > count) {
// 在大於的情況下,針對QPS的情況會對先進來的請求進行特殊處理
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 如果策略是QPS,那麼對於優先請求嘗試去佔用下一個時間視窗中的令牌
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
~~~
先看一下涉及到的avgUsedTokens方法:
~~~java
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 獲取當前qps或者當前執行緒數
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
~~~
主要是獲取已使用的令牌數,如果設定的閾值型別為執行緒數,那麼返回當前統計節點中儲存的執行緒數,如果設定的閾值型別為QPS,那麼返回已經通過的QPS數。
然後回到上面的canPass方法,其主要邏輯就是在獲取到目前節點的統計資料後,將已佔用的令牌數與請求的令牌數相加,如果小於設定的閾值,那麼直接放行。
如果大於設定的閾值,那麼在閾值型別為QPS且允許優先處理先到的請求的情況下進行特殊處理,否則返回false不放行。
上面特殊處理就是:首先嚐試去佔用後面的時間視窗的令牌,獲取到等待時間,如果等待時間小於設定的最長等待時長,那麼就進行等待,當等待到指定時間後返回。否則直接返回false不放行。
由程式碼可以看出,在等待指定時長後,丟擲PriorityWaitException進行放行,對應實現的地方在StatisticSlot中,對應entry方法程式碼如下:
~~~java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 說明:省略了執行通過的處理邏輯
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback