1. 程式人生 > 實用技巧 >Sentinel併發限流不精確-之責任鏈

Sentinel併發限流不精確-之責任鏈

​ 在之前調研Sentinel的過程中,為了準備分享內容,自己就簡單的寫了一些測試程式碼,不過在測試中遇到了一些問題,其中有一個問題就是Sentinel流控在併發情況下限流並不精確,當時我還在想,這個我在做分享的時候該怎麼來自圓其說呢,所以覺得比較有意思,在這裡做一個記錄。同時在排查這個問題的過程中,為了說清楚問題原因,我覺得有必要理一下它的責任鏈,所以副標題就是Sentinel的責任鏈。


一、問題起源

​ 在這裡我直接上我的測試程式碼,我的本意是要起10個執行緒同時去請求這個資源,每個執行緒輪詢10次。同時,對於這個資源我設定了限流規則為QPS=1。這也就意味著我這10個執行緒共100個請求,正確的結果應該是成功1個,阻塞99個。

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 流量控制演示
 */
public class FlowQpsDemo_Bug {

    private static final String SOURCE_KEY = "CESHI_KEY";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        initFlowQpsRule();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(10);
        for (int i = 0;i < 10;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                    }
                    for (int i = 0;i < 10;i++) {
                        fun();
                    }
                    end.countDown();
                }
            }).start();
        }
        start.countDown();
        end.await();
        System.out.println("total=" + total.get() + " pass=" + pass.get() + " block=" + block.get());
    }

    public static void fun() {
        Entry entry = null;
        try {
            entry = SphU.entry(SOURCE_KEY);
            // todo 業務邏輯
            pass.incrementAndGet();
        } catch (BlockException e1) {
            // todo 流控處理
            block.incrementAndGet();
        } finally {
            total.incrementAndGet();
            if (entry != null) {
                entry.exit();
            }
        }
    }

    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(SOURCE_KEY);
        // 採用qps策略,每秒允許通過1個請求
        rule1.setCount(1);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }
}

​ 但是,實際執行結果卻並非如此,執行好多次都是成功10個,阻塞90個。也有其他情況比如成功5、6個,阻塞90多個。這顯然不是我想要的結果。並且考慮到我們在生產環境設定的限流往往是壓測的極限值了,如果這時限流還不準確,那豈不是就失去了系統保護的作用了。

​ 為了解決這個問題,最直接的方式就是去走一遍它的原始碼,這一走就看到了問題。為了說明問題所在,有必要先介紹一下Sentinel內部責任鏈的使用。

二、Sentinel內部責任鏈的使用

​ 責任鏈由一系列節點組成,每個節點擁有一個next屬性來指向下一個處理節點。當一個請求進來時,當前節點或者執行完後將請求交個next節點處理,或者先交個next節點處理完後自己再處理。這樣就形成一條鏈,將請求一個節點一個節點的往下傳遞處理執行。

​ 在Sentinel中,對應的就是Slot,官方文件中翻譯為功能插槽。最頂層的插槽定義就是com.alibaba.csp.sentinel.slotchain.ProcessorSlot,這是一個介面,下面有一個抽象類com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot實現了它,其中最主要的邏輯就是entry和exit兩個方法,分別在請求進來和退出時呼叫。

​ 而AbstractLinkedProcessorSlot有一系列的子類,這一系列的子類分別完成不同的功能,列舉幾個重要的如下:

  • StatisticSlot:處理統計邏輯,它是其它限流、降級等插槽執行的基礎;
  • ClusterBuilderSlot :用於儲存資源的統計資訊以及呼叫者資訊,例如該資源的 RT, QPS, thread count 等等,這些資訊將用作為多維度限流,降級的依據;
  • FlowSlot:用於根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制;
  • DegradeSlot:通過統計資訊以及預設的規則,來做熔斷降級;

​ 整個責任鏈的構建入口為DefaultSlotChainBuilder.build。其中,每個功能插槽都有對應的順序,Sentinel在構建鏈的時候按照優先順序從小到大的順序進行串聯構建鏈路。

三、不精確原因

​ 在這次採坑中,兩個主角就是StatisticSlot和FlowSlot,根據原始碼中的@SpiOrder標註,StatisticSlot為-7000,FlowSlot為-2000,所以FlowSlot的優先順序低於StatisticSlot。那麼當請求進來的時候先執行的是StatisticSlot。

​ 首先看StatisticSlot的entry方法的執行,這裡我刪去了BlockException異常處理的邏輯,它大體上與上面異常的處理邏輯一樣。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        // todo issue_1620 這個時候才會增加QPS,但是剛才前面的DefaultController.pass方法已經返回了true
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);
        throw e;
    }
}

​ 在上面這段程式碼中,StatisticSlot在entry方法中並不是先執行的自己的邏輯,首先呼叫的是fireEntry方法,進入到這個方法,在AbstractLinkedProcessorSlot類中,程式碼如下:

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
}

​ 可以看出,在StatisticSlot的entry方法中,它是先呼叫父類的fireEntry交由後面的節點執行,等後面如限流、降級節點執行完後返回到StatisticSlot的entry方法中後,它才執行對應的統計邏輯。

​ 那麼這時可直接進到FlowSlot的entry方法中,程式碼如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

​ 可以看出,checkFlow就是限流規則的執行了。進到這個方法中,其邏輯就是呼叫FlowRuleChecker檢查各項限流規則是否已經達到閾值,如果沒有達到閾值則放行,如果達到閾值了則阻塞。在規則的檢查中,一路跟到了DefaultController類的canPass方法中,其邏輯如下:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    // 當前閾值 + acquireCount 大於規則設定的count,則返回false,否則返回true
    // 如果併發執行到這裡,並沒有加鎖,所以多個執行緒都會返回true,限流失效
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

​ 問題就出在這個判斷中了,要知道前面的統計插槽一定是要等後面的功能插槽執行完後才有統計的依據,而在一個統計時間段開始的時候,10個請求同時進來,這時前面還沒有統計資料,那麼curCount + acquireCount > count這個條件就可以同時滿足,這也就意味著這10個請求可以同時通過,所以就出現了上面最開始我遇到的情況,明明設定了QPS=1,但是還是有10個請求成功通過了,問題點就在這裡。

​ 對於這個問題,我後來看了一下git上面有問題說明,對應兩個連結如下:

https://github.com/alibaba/Sentinel/issues/1861
https://github.com/alibaba/Sentinel/issues/1620

​ 這個問題在1.6版本就提出了,但是目前1.8版本還是沒有修復。

​ 針對這個問題,我的理解就是:Sentinel可能不需要做的那麼完美,它可以不完美,但是不可以影響正常的業務系統執行,不能拉低正常業務系統的效能。

​ 同時這也是一個邏輯缺陷,採用了責任鏈模式,那麼在前面的統計插槽中進行統計時,必然要以後面的其它功能插槽的執行結果為依據。而後面的其它功能插槽對規則的檢查又依賴前面的統計資料,所以這個問題還真不好【完美】的去解決它。總不能在後面加鎖讓請求一個一個通過吧,當然這只是一個玩笑。

四、責任鏈構建

​ 上面提到了責任鏈構建的地方在DefaultSlotChainBuilder.build裡面,方法很簡單,如下:

@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }

        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}

​ 其中DefaultProcessorSlotChain只是一個預設的頭結點,裡面只有first、end兩個元素。通過它的SpiLoader.loadPrototypeInstanceListSorted方法將ProcessorSlot的各個插槽實現類載入進來,然後進行連結。

​ 其中SpiLoader.loadPrototypeInstanceListSorted對載入的ProcessorSlot進行排序,排序使用到了一個SpiOrderWrapper包裝類,裡面有一個order屬性,載入排序如下:

public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) {
    try {
        // Not use SERVICE_LOADER_MAP, to make sure the instances loaded are different.
        ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz);

        List<SpiOrderWrapper<T>> orderWrappers = new ArrayList<>();
        for (T spi : serviceLoader) {
            int order = SpiOrderResolver.resolveOrder(spi);
            // Since SPI is lazy initialized in ServiceLoader, we use online sort algorithm here.
            SpiOrderResolver.insertSorted(orderWrappers, spi, order);
            RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", clazz.getSimpleName(),
                    spi.getClass().getCanonicalName(), order);
        }
        List<T> list = new ArrayList<>(orderWrappers.size());
        for (int i = 0; i < orderWrappers.size(); i++) {
            list.add(orderWrappers.get(i).spi);
        }
        return list;
    } catch (Throwable t) {
        RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t);
        t.printStackTrace();
        return new ArrayList<>();
    }
}

​ 上面兩段就是Sentinel構建責任鏈的核心程式碼。整個程式碼還是非常漂亮,非常清晰。

五、結束語

​ 問:既然遇到了這個問題,那最後怎麼解決呢?

​ 答:不解決,繼續用。