5.Sentinel原始碼分析—Sentinel如何實現自適應限流?
Sentinel原始碼解析系列:
1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?
2. Sentinel原始碼分析—Sentinel是如何進行流量統計的?
3. Sentinel原始碼分析— QPS流量控制是如何實現的?
4.Sentinel原始碼分析— Sentinel是如何做到降級的?
這篇文章主要學習一下Sentinel如何實現自適應限流的。
為什麼要做自適應限流,官方給了兩個理由:
- 保證系統不被拖垮
- 在系統穩定的前提下,保持系統的吞吐量
我再貼一下官方的原理:
- 能夠保證水管裡的水量,能夠讓水順暢的流動,則不會增加排隊的請求;也就是說,這個時候的系統負載不會進一步惡化。
- 當保持入口的流量是水管出來的流量的最大的值的時候,可以最大利用水管的處理能力。
更加具體的原理解釋可以看官方:系統自適應限流
所以看起來好像很厲害的樣子,所以我們來看看具體實現吧。
例子:
- 設定系統自適應規則
List<SystemRule> rules = new ArrayList<SystemRule>(); SystemRule rule = new SystemRule(); //限制最大負載 rule.setHighestSystemLoad(3.0); // cpu負載60% rule.setHighestCpuUsage(0.6); // 設定平均響應時間 10 ms rule.setAvgRt(10); // 設定qps is 20 rule.setQps(20); // 設定最大執行緒數 10 rule.setMaxThread(10); rules.add(rule); SystemRuleManager.loadRules(Collections.singletonList(rule));
- 設定限流
Entry entry = null;
try {
entry = SphU.entry("methodA", EntryType.IN);
//dosomething
} catch (BlockException e1) {
block.incrementAndGet();
//dosomething
} catch (Exception e2) {
// biz exception
} finally {
if (entry != null) {
entry.exit();
}
}
注意:系統保護規則是應用整體維度的,而不是資源維度的,並且僅對入口流量生效。入口流量指的是進入應用的流量(EntryType.IN),比如 Web 服務或 Dubbo 服務端接收的請求,都屬於入口流量。
我們先講一下SystemRuleManager這個類在初始化的時候做了什麼吧。
SystemRuleManager
private static SystemStatusListener statusListener = null;
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-system-status-record-task", true));
static {
checkSystemStatus.set(false);
statusListener = new SystemStatusListener();
scheduler.scheduleAtFixedRate(statusListener, 5, 1, TimeUnit.SECONDS);
currentProperty.addListener(listener);
}
SystemRuleManager初始化的時候會呼叫靜態程式碼塊,然後用scheduler執行緒池定時呼叫SystemStatusListener類的run方法。我們進入到SystemStatusListener類裡看一下:
SystemStatusListener#run
public void run() {
try {
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
currentLoad = osBean.getSystemLoadAverage();
currentCpuUsage = osBean.getSystemCpuLoad();
StringBuilder sb = new StringBuilder();
if (currentLoad > SystemRuleManager.getHighestSystemLoad()) {
sb.append("load:").append(currentLoad).append(";");
sb.append("cpu:").append(currentCpuUsage).append(";");
sb.append("qps:").append(Constants.ENTRY_NODE.passQps()).append(";");
sb.append("rt:").append(Constants.ENTRY_NODE.avgRt()).append(";");
sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append(";");
sb.append("success:").append(Constants.ENTRY_NODE.successQps()).append(";");
sb.append("minRt:").append(Constants.ENTRY_NODE.minRt()).append(";");
sb.append("maxSuccess:").append(Constants.ENTRY_NODE.maxSuccessQps()).append(";");
RecordLog.info(sb.toString());
}
} catch (Throwable e) {
RecordLog.info("could not get system error ", e);
}
}
這個方法用來做兩件事:
- 定時收集全域性資源情況,並列印日誌
- 給全域性變數currentLoad和currentCpuUsage賦值,用來做限流使用。
然後看一下SystemRuleManager.loadRules方法。SystemRuleManager和其他的規則管理是一樣的,當呼叫loadRules方法的時候會呼叫內部的listener並觸發它的configUpdate方法。
在SystemRuleManager中實現類了一個SystemPropertyListener,最終SystemRuleManager.loadRules方法會呼叫到SystemPropertyListener的configUpdate中。
SystemPropertyListener#configUpdate
public void configUpdate(List<SystemRule> rules) {
restoreSetting();
// systemRules = rules;
if (rules != null && rules.size() >= 1) {
for (SystemRule rule : rules) {
loadSystemConf(rule);
}
} else {
checkSystemStatus.set(false);
}
RecordLog.info(String.format("[SystemRuleManager] Current system check status: %s, "
+ "highestSystemLoad: %e, "
+ "highestCpuUsage: %e, "
+ "maxRt: %d, "
+ "maxThread: %d, "
+ "maxQps: %e",
checkSystemStatus.get(),
highestSystemLoad,
highestCpuUsage,
maxRt,
maxThread,
qps));
}
這個方法很簡單,首先是呼叫restoreSetting,用來重置rule的屬性,然後遍歷rule呼叫loadSystemConf對規則進行設定:
SystemRuleManager#loadSystemConf
public static void loadSystemConf(SystemRule rule) {
boolean checkStatus = false;
// Check if it's valid.
if (rule.getHighestSystemLoad() >= 0) {
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
checkStatus = true;
}
if (rule.getHighestCpuUsage() >= 0) {
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
if (rule.getAvgRt() >= 0) {
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
if (rule.getMaxThread() >= 0) {
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
if (rule.getQps() >= 0) {
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}
這些屬性都是在限流控制中會用到的屬性,無論設定哪個屬性都會設定checkStatus=true表示開啟系統自適應限流。
在設定好限流規則後會進入到SphU.entry方法中,通過建立slot鏈呼叫到SystemSlot,這裡是系統自適應限流的地方。
SystemSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//檢查一下是否符合限流條件,符合則進行限流
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
SystemRuleManager#checkSystem
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
//如果不是入口流量,那麼直接返回
// for inbound traffic only
if (resourceWrapper.getType() != EntryType.IN) {
return;
}
// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
}
這個方法首先會校驗一下checkSystemStatus狀態和EntryType是不是IN,如果不是則直接返回。
然後對Constants.ENTRY_NODE進行操作。這個物件是一個final static 修飾的變數,代表是全域性物件。
public final static ClusterNode ENTRY_NODE = new ClusterNode();
所以這裡的限流操作都是對全域性其作用的,而不是對資源起作用。ClusterNode還是繼承自StatisticNode,所以最後都是呼叫StatisticNode的successQps、curThreadNum、avgRt,這幾個方法我的前幾篇文章都已經講過了,感興趣的可以自己去翻一下,這裡就不過多涉及了。
在下面呼叫getCurrentSystemAvgLoad方法和getCurrentCpuUsage方法呼叫到SystemStatusListener設定的全域性變數currentLoad和currentCpuUsage。這兩個引數是SystemRuleManager的定時任務定時收集的,忘了的同學回到上面講解SystemRuleManager的地方看一下。
在做load判斷和cpu usage判斷的時候會還會呼叫checkBbr方法來判斷:
private static boolean checkBbr(int currentThread) {
if (currentThread > 1 &&
currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
return false;
}
return true;
}
也就是說:當系統 load1 超過閾值,且系統當前的併發執行緒數超過系統容量時才會觸發系統保護。系統容量由系統的 maxQps * minRt 計算得出。
StatisticNode#maxSuccessQps
public double maxSuccessQps() {
return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
}
maxSuccessQps方法是用視窗內的最大成功呼叫數和視窗數量相乘rollingCounterInSecond的視窗1秒的視窗數量是2,最大成功呼叫數如下得出:
ArrayMetric#maxSuccess
public long maxSuccess() {
data.currentWindow();
long success = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
if (window.success() > success) {
success = window.success();
}
}
return Math.max(success, 1);
}
最大成功呼叫數是通過整個遍歷整個視窗,獲取所有窗口裡面最大的呼叫數。所以這樣的最大的併發量是一個預估值,不是真實值。
看到這裡我們再來看一下Constants.ENTRY_NODE的資訊是怎麼被收集的。
我在分析StatisticSlot這個類的時候有一段程式碼我當時也沒看懂有什麼用,現在就迎刃而解了:
StatisticSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
....
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
....
} catch (PriorityWaitException ex) {
....
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
....
} catch (BlockException e) {
....
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
....
throw e;
} catch (Throwable e) {
....
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
在StatisticSlot的entry方法裡有很多對於type的判斷,如果是EntryType.IN,那麼就呼叫Constants.ENTRY_NODE的靜態方法進行資料的收集。
所以看到這裡我們可以知道,在前面有很多看不懂的程式碼其實只要慢慢琢磨,打個標記,那麼在後面的解析的過程中還是能夠慢慢看懂的。
共