1. 程式人生 > 其它 >Tars | TarsJava SpingBoot啟動與負載均衡原始碼初探

Tars | TarsJava SpingBoot啟動與負載均衡原始碼初探

目錄

前言

通過原始碼分析可以得出這樣一個負載均衡的原始碼結構圖(基於TarsJava SpringBoot):

@EnableTarsServer註解:表明這是一個Tars服務;

  • @Import(TarsServerConfiguration.class):引入Tars服務相關配置檔案;
    • Communcator:通訊器;
      • getServantProxyFactory():獲取代理工廠管理者;
      • getObjectProxyFactory():獲取物件代理工廠;
        • createLoadBalance():建立客戶端負載均衡呼叫器;
          • select():選擇負載均衡呼叫器(有四種模式可以選擇);
            • invoker:呼叫器;
              • invoke():具體的執行方法;
                • doInvokeServant():最底層的執行方法;
          • refresh():更新負載均衡呼叫器;
        • createProtocolInvoker():建立協議呼叫器;

注:在說明註解時,第一點加粗為註解中文含義,第二點為一般加在哪身上,縮排或程式碼塊為示例,如:

@註解

  • 中文含義
  • 加在哪
  • 其他……
    • 語句示例
    //程式碼示例
    

1. Tars客戶端啟動

我們知道Tars應用可以分為客戶端與服務端,而負載均衡邏輯一般在客戶端,因此我們將只關注客戶端的啟動流程。

一個基礎知識,SpringBoot應用入口在主啟動類,Tars SpringBoot的主啟動類是這樣的:

可以發現它與普通SpringBoot應用的區別在於多了個@EnableTarsServer

註解;

@EnableTarsServer

  • Tars服務;
  • 用在主啟動類上;
  • 表名該服務是一個Tars服務,啟用Tars功能;

我們從examples/tars-spring-boot-client的主啟動類App.java(@EnableTarsServer註解)點進去,可以看到SpringBoot在啟動時幫我們做了哪些Tars相關的配置:

@EnableTarsServer註解原始碼:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TarsServerConfiguration.class)
public @interface EnableTarsServer {
}

可以知道他幫我們引入了Tars服務配置類TarsServerConfiguration.class,我們點進去:

@Configuration
public class TarsServerConfiguration {

    private final Server server = Server.getInstance();

    @Bean
    public Server server() {
        return this.server;
    }

    @Bean
    // 從通訊器工廠注入通訊器Communcator
    public Communicator communicator() {
        return CommunicatorFactory.getInstance().getCommunicator();
    }

    @Bean
    //通訊器後置處理器
    public CommunicatorBeanPostProcessor communicatorBeanPostProcessor(Communicator communicator) {
        return new CommunicatorBeanPostProcessor(communicator);
    }

    @Bean
    //注入配置幫助器
    public ConfigHelper configHelper() {
        return ConfigHelper.getInstance();
    }

    @Bean
    //注入Servlet容器定製器
    public ServletContainerCustomizer servletContainerCustomizer() {
        return new ServletContainerCustomizer();
    }

    @Bean
    //Tars伺服器啟動生命週期
    public TarsServerStartLifecycle applicationStartLifecycle(Server server) {
        return new TarsServerStartLifecycle(server);
    }
}

在這些容器中,可以看出最重要的是通訊器Communicator,裡面定義了代理方式、配置檔案、負載均衡選擇器等重要屬性,下面我們來分析這個容器


2. Communicator通訊器

通訊器,最關鍵的容器

通過原始碼分析,我們可以知道這個容器裡有通訊器相關初始化initCommunicator()、關閉shutdown()、獲取容器idgetId()等基礎方法,此外,有幾個比較關鍵的方法:

  1. getCommunicatorConfig:獲取客戶端協調器的配置檔案。該配置檔案裡做了一些超時、執行緒數等相關配置;
  1. getServantProxyFactory:獲取代理工廠管理者。管理者的主要作用是管理ObjectProxyFactory,如果快取有就從快取中取,沒有就生產;

    public <T> Object getServantProxy(Class<T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                      LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
        //獲取管理者的鍵
        String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName;
        //通過鍵從快取中獲取管理者的值
        Object proxy = cache.get(key);
        if (proxy == null) {
            lock.lock();
            try {
                proxy = cache.get(key);
                if (proxy == null) {
                    //建立管理者
                    ObjectProxy<T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy(
                        clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker);
                    //將管理者放進快取
                    cache.put(key, createProxy(clazz, objectProxy));
                    proxy = cache.get(key);
                }
            } finally {
                lock.unlock();
            }
        }
        return proxy;
    }
    
  2. getObjectProxyFactory:獲取物件代理工廠。該工廠的作用是生產物件代理ObjectProxy,包括建立Servant服務的配置資訊與更新服務端點等:

    //生產物件代理ObjectProxy
    public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig,
                                             LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException {
        //如果容器裡沒有服務代理相關配置,則生成預設配置;如果容器裡有服務代理相關配置,說明使用者自定義了使用者配置了服務代理,則讀取使用者配置檔案進行自定義配置(SpringBoot的核心思想之一)
        if (servantProxyConfig == null) {
            servantProxyConfig = createServantProxyConfig(objName, setDivision);
        } else {
            servantProxyConfig.setCommunicatorId(communicator.getId());
            servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision());
            servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator());
            addSetDivisionInfo(servantProxyConfig, setDivision);
            servantProxyConfig.setRefreshInterval(communicator.getCommunicatorConfig().getRefreshEndpointInterval());
            servantProxyConfig.setReportInterval(communicator.getCommunicatorConfig().getReportInterval());
        }
    
        //更新服務端點
        updateServantEndpoints(servantProxyConfig);
    
        //【重要】建立客戶端負載均衡呼叫器
        if (loadBalance == null) {
            loadBalance = createLoadBalance(servantProxyConfig);
        }
    
        //建立協議呼叫器
        if (protocolInvoker == null) {
            protocolInvoker = createProtocolInvoker(api, servantProxyConfig);
        }
        return new ObjectProxy<T>(api, servantProxyConfig, loadBalance, protocolInvoker, communicator);
    }
    
    ……
    
        //建立Servant服務的配置資訊
        private ServantProxyConfig createServantProxyConfig(String objName, String setDivision) throws CommunicatorConfigException {
        ……
    }
    
    ……
    
        //更新服務端點:通過ObjectName判斷是有設定了伺服器節點,如果有(本地只連線),如果沒有那就從tars管理中獲取伺服器節點。放在ServantCacheManager管理起來。
        private void updateServantEndpoints(ServantProxyConfig cfg) {
        CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig();
        ……
    }
    

通過上面的客戶端啟動流程原始碼分析,我們找到第一個核心點: 客戶端的負載均衡呼叫器LoadBalance

*除了建立了一個負載均衡呼叫器LoadBalance,還建立了一個協議呼叫器protocolInvoker,該協議呼叫器裡分別對同步與非同步呼叫方法、Tars與Http協議請求處理、以及過濾器等相關配置,但我們的重點不在這,下面將著重分析LoadBalance


3. 客戶端的負載均衡呼叫器LoadBalance

我們點進去檢視原有負載均衡邏輯,發現這是一個介面,裡面定義了兩個方法,都是與負載均衡呼叫器相關的:

public interface LoadBalance<T> {

    /**
     * 選擇負載均衡呼叫器
     * @param 呼叫的上下文
     * @return
     * @throws 無負載均衡呼叫器 - 異常
     */
    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;

    /**
     * 重新整理本地負載均衡呼叫器
     * @param 負載均衡呼叫器
     */
    void refresh(Collection<Invoker<T>> invokers);
}

我們Ctrl+H一下即可發現該介面有四個實現類:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-KJyHqhl5-1627523692062)(https://lexiangla.com/assets/3963f704ee0411ebbe94aee286d18512 "負載均衡呼叫器實現類")]

分別是:

  • ConsistentHashLoadBalance:一致hash選擇器;
  • HashLoadBalance:hash選擇器;
  • RoundRobinLoadBalance: 輪詢選擇器;
  • DefaultLoadBalance:預設的選擇器(由原始碼可知先ConsistentHashLoadBalance,HashLoadBalance,RoundRobinLoadBalance);

需要注意實現類有四個,選擇器有三個。這四個選擇器都是一個構造方法+實現介面的兩個方法,比較相近。下面我們只分析RoundRobinLoadBalance的select方法:

@Override
public Invoker<T> select(InvokeContext invocation) throws NoInvokerException {

    //靜態權重快取器列表
    List<Invoker<T>> staticWeightInvokers = staticWeightInvokersCache;

    //使用權重輪詢
    if (staticWeightInvokers != null && !staticWeightInvokers.isEmpty()) {
        //【體現輪詢】根據index獲取一個呼叫器,規則是:獲取“靜態權重順序遞增值”的絕對值後對“靜態權重快取器數”取餘?
        Invoker<T> invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() & Integer.MAX_VALUE) % staticWeightInvokers.size());
        //如果呼叫器存活則直接返回
        if (invoker.isAvailable()) return invoker;

        //判斷存活:先根據呼叫器的url獲取“呼叫器活動狀態”,判斷:狀態的“上次重試時間”+“嘗試重啟時間間隔” < “系統當前時間”,存活則將系統當前時間設定為“上次重啟時間”
        ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
        if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
            logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
            stat.setLastRetryTime(System.currentTimeMillis());
            return invoker;
        }
    }

    //無權重輪詢,丟擲異常
    List<Invoker<T>> sortedInvokers = sortedInvokersCache;
    if (CollectionUtils.isEmpty(sortedInvokers)) {
        throw new NoInvokerException("no such active connection invoker");
    }

    List<Invoker<T>> list = new ArrayList<Invoker<T>>();
    for (Invoker<T> invoker : sortedInvokers) {
        //如果呼叫器掛了
        if (!invoker.isAvailable()) {

            //嘗試救回呼叫器:先根據呼叫器的url獲取“呼叫器活動狀態”,判斷:狀態的“上次重試時間”+“嘗試重啟時間間隔” < “系統當前時間”,存活則加入到list中,掛了就不加入
            ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
            if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
                list.add(invoker);
            }
        } else {
            //呼叫器存活則將呼叫器新增到list裡
            list.add(invoker);
        }
    }
    //TODO When all is not available. Whether to randomly extract one
    if (list.isEmpty()) {
        throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
    }

    //隨機獲取一個呼叫器?
    Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());

    //如果呼叫器不存活,則將當前系統時間設定為該呼叫器的上次重啟時間
    if (!invoker.isAvailable()) {
        //Try to recall after blocking
        logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
        ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
    }
    return invoker;
}

可以看出select方法重點還是在於“怎樣”找到一個負載均衡呼叫器,只不過實現的方法不同,有的採用輪詢的方法、有的根據hash值,而我們關注的是給負載均衡方法做擴充套件(增添路由規則),因此這裡也不是重點。但為我們指明瞭一個方向,就是上面原始碼裡反覆提到的invoker呼叫器(invoker老眼熟了,SpringBoot裡的controller引數處理裡也有它)。

我們來看看Tars裡的invoker,它也是一個介面,只有一個實現類,

public interface Invoker<T> {

    //獲取uil
    Url getUrl();

    //獲取api
    Class<T> getApi();

    //判斷是否存活
    boolean isAvailable();

    //執行方法
    Object invoke(InvokeContext context) throws Throwable;

    //銷燬方法
    void destroy();
}

通過對這幾個實現類的原始碼閱讀,我們發現invoke方法就是對doInvokeServant底層方法進行層層封裝。

通過對TarsInvoker的原始碼閱讀,我們還可以知道TarsInvoker有四個屬性config、api、url、clients,對應前面提到的getXXX對應方法;還可以設定是否存活,對應前文對是否存活的判斷。在doInvokeServant裡最核心的操作流程是try裡面的語句:

public class TarsInvoker<T> extends ServantInvoker<T> {

    final List<Filter> filters;

    public TarsInvoker(ServantProxyConfig config, Class<T> api, Url url, ServantClient[] clients) {
        super(config, api, url, clients);
        filters = AppContextManager.getInstance().getAppContext() == null ? null : AppContextManager.getInstance().getAppContext().getFilters(FilterKind.CLIENT);
    }

    @Override
    public void setAvailable(boolean available) {
        super.setAvailable(available);
    }

@Override
    protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable {
        final long begin = System.currentTimeMillis();
        int ret = Constants.INVOKE_STATUS_SUCC;
        try {
            //根據api獲取將要執行的方法
            Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());

            //如果是非同步呼叫
            if (inv.isAsync()) {
                //執行非同步方法
                invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
                return null;
            //如果是承諾未來???
            } else if (inv.isPromiseFuture()) {
                return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());// return Future Result
            } else {
                //執行同步方法
                TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
                ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
                if (response.getRet() != TarsHelper.SERVERSUCCESS) {
                    throw ServerException.makeException(response.getRet(), response.getRemark());
                }
                return response.getResult();
            }
        } catch (Throwable e) {
            if (e instanceof TimeoutException) {
                ret = Constants.INVOKE_STATUS_TIMEOUT;
            } else if (e instanceof NotConnectedException) {
                ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
            } else {
                ret = Constants.INVOKE_STATUS_EXEC;
            }
            throw e;
        } finally {
            if (inv.isNormal()) {
                setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
                InvokeStatHelper.getInstance().addProxyStat(objName)
                        .addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
                                config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
            }
        }
    }
 
    ……
}

而try語句裡主要做的是執行的呼叫方法(非同步、同步、承諾未來),由於我們要擴充套件的路由功能與呼叫方法無關,這裡就不深入分析了。

由此我們可以分析得出負載均衡設計的底層結構圖:

@EnableTarsServer註解:表明這是一個Tars服務;

  • @Import(TarsServerConfiguration.class):引入Tars服務相關配置檔案;
    • Communcator:通訊器;
      • getServantProxyFactory():獲取代理工廠管理者;
      • getObjectProxyFactory():獲取物件代理工廠;
        • createLoadBalance():建立客戶端負載均衡呼叫器;
          • select():選擇負載均衡呼叫器(有四種模式可以選擇);
            • invoker:呼叫器;
              • invoke():具體的執行方法;
                • doInvokeServant():最底層的執行方法;
          • refresh():更新負載均衡呼叫器;
        • createProtocolInvoker():建立協議呼叫器;

最後

新人制作,如有錯誤,歡迎指出,感激不盡!
歡迎關注公眾號,會分享一些更日常的東西!
如需轉載,請標註出處!