Tars | TarsJava SpingBoot啟動與負載均衡原始碼初探
前言
通過原始碼分析可以得出這樣一個負載均衡的原始碼結構圖(基於TarsJava SpringBoot):
@EnableTarsServer註解:表明這是一個Tars服務;
- @Import(TarsServerConfiguration.class):引入Tars服務相關配置檔案;
- Communcator:通訊器;
- getServantProxyFactory():獲取代理工廠管理者;
- getObjectProxyFactory():獲取物件代理工廠;
- createLoadBalance():建立客戶端負載均衡呼叫器;
- select():選擇負載均衡呼叫器(有四種模式可以選擇);
- invoker:呼叫器;
- invoke():具體的執行方法;
- doInvokeServant():最底層的執行方法;
- invoke():具體的執行方法;
- invoker:呼叫器;
- refresh():更新負載均衡呼叫器;
- select():選擇負載均衡呼叫器(有四種模式可以選擇);
- createProtocolInvoker():建立協議呼叫器;
- createLoadBalance():建立客戶端負載均衡呼叫器;
- Communcator:通訊器;
注:在說明註解時,第一點加粗為註解中文含義,第二點為一般加在哪身上,縮排或程式碼塊為示例,如:
@註解
- 中文含義
- 加在哪
- 其他……
語句示例
//程式碼示例
1. Tars客戶端啟動
我們知道Tars應用可以分為客戶端與服務端,而負載均衡邏輯一般在客戶端,因此我們將只關注客戶端的啟動流程。
一個基礎知識,SpringBoot應用入口在主啟動類,Tars SpringBoot的主啟動類是這樣的:
可以發現它與普通SpringBoot應用的區別在於多了個@EnableTarsServer
@EnableTarsServer
- Tars服務;
- 用在主啟動類上;
- 表名該服務是一個Tars服務,啟用Tars功能;
我們從examples/tars-spring-boot-client的主啟動類App.java(@EnableTarsServer註解)點進去,可以看到SpringBoot在啟動時幫我們做了哪些Tars相關的配置:
@EnableTarsServer
註解原始碼:
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(TarsServerConfiguration.class) public @interface EnableTarsServer { }
可以知道他幫我們引入了Tars服務配置類TarsServerConfiguration.class
,我們點進去:
@Configuration
public class TarsServerConfiguration {
private final Server server = Server.getInstance();
@Bean
public Server server() {
return this.server;
}
@Bean
// 從通訊器工廠注入通訊器Communcator
public Communicator communicator() {
return CommunicatorFactory.getInstance().getCommunicator();
}
@Bean
//通訊器後置處理器
public CommunicatorBeanPostProcessor communicatorBeanPostProcessor(Communicator communicator) {
return new CommunicatorBeanPostProcessor(communicator);
}
@Bean
//注入配置幫助器
public ConfigHelper configHelper() {
return ConfigHelper.getInstance();
}
@Bean
//注入Servlet容器定製器
public ServletContainerCustomizer servletContainerCustomizer() {
return new ServletContainerCustomizer();
}
@Bean
//Tars伺服器啟動生命週期
public TarsServerStartLifecycle applicationStartLifecycle(Server server) {
return new TarsServerStartLifecycle(server);
}
}
在這些容器中,可以看出最重要的是通訊器Communicator
,裡面定義了代理方式、配置檔案、負載均衡選擇器等重要屬性,下面我們來分析這個容器
2. Communicator通訊器
通訊器,最關鍵的容器
通過原始碼分析,我們可以知道這個容器裡有通訊器相關初始化initCommunicator()
、關閉shutdown()
、獲取容器idgetId()
等基礎方法,此外,有幾個比較關鍵的方法:
getCommunicatorConfig
:獲取客戶端協調器的配置檔案。該配置檔案裡做了一些超時、執行緒數等相關配置;
-
getServantProxyFactory
:獲取代理工廠管理者。管理者的主要作用是管理ObjectProxyFactory,如果快取有就從快取中取,沒有就生產;public <T> Object getServantProxy(Class<T> clazz, String objName, String setDivision, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) { //獲取管理者的鍵 String key = setDivision != null ? clazz.getSimpleName() + objName + setDivision : clazz.getSimpleName() + objName; //通過鍵從快取中獲取管理者的值 Object proxy = cache.get(key); if (proxy == null) { lock.lock(); try { proxy = cache.get(key); if (proxy == null) { //建立管理者 ObjectProxy<T> objectProxy = communicator.getObjectProxyFactory().getObjectProxy( clazz, objName, setDivision, servantProxyConfig, loadBalance, protocolInvoker); //將管理者放進快取 cache.put(key, createProxy(clazz, objectProxy)); proxy = cache.get(key); } } finally { lock.unlock(); } } return proxy; }
-
getObjectProxyFactory
:獲取物件代理工廠。該工廠的作用是生產物件代理ObjectProxy,包括建立Servant服務的配置資訊與更新服務端點等://生產物件代理ObjectProxy public <T> ObjectProxy<T> getObjectProxy(Class<T> api, String objName, String setDivision, ServantProxyConfig servantProxyConfig, LoadBalance<T> loadBalance, ProtocolInvoker<T> protocolInvoker) throws ClientException { //如果容器裡沒有服務代理相關配置,則生成預設配置;如果容器裡有服務代理相關配置,說明使用者自定義了使用者配置了服務代理,則讀取使用者配置檔案進行自定義配置(SpringBoot的核心思想之一) if (servantProxyConfig == null) { servantProxyConfig = createServantProxyConfig(objName, setDivision); } else { servantProxyConfig.setCommunicatorId(communicator.getId()); servantProxyConfig.setModuleName(communicator.getCommunicatorConfig().getModuleName(), communicator.getCommunicatorConfig().isEnableSet(), communicator.getCommunicatorConfig().getSetDivision()); servantProxyConfig.setLocator(communicator.getCommunicatorConfig().getLocator()); addSetDivisionInfo(servantProxyConfig, setDivision); servantProxyConfig.setRefreshInterval(communicator.getCommunicatorConfig().getRefreshEndpointInterval()); servantProxyConfig.setReportInterval(communicator.getCommunicatorConfig().getReportInterval()); } //更新服務端點 updateServantEndpoints(servantProxyConfig); //【重要】建立客戶端負載均衡呼叫器 if (loadBalance == null) { loadBalance = createLoadBalance(servantProxyConfig); } //建立協議呼叫器 if (protocolInvoker == null) { protocolInvoker = createProtocolInvoker(api, servantProxyConfig); } return new ObjectProxy<T>(api, servantProxyConfig, loadBalance, protocolInvoker, communicator); } …… //建立Servant服務的配置資訊 private ServantProxyConfig createServantProxyConfig(String objName, String setDivision) throws CommunicatorConfigException { …… } …… //更新服務端點:通過ObjectName判斷是有設定了伺服器節點,如果有(本地只連線),如果沒有那就從tars管理中獲取伺服器節點。放在ServantCacheManager管理起來。 private void updateServantEndpoints(ServantProxyConfig cfg) { CommunicatorConfig communicatorConfig = communicator.getCommunicatorConfig(); …… }
通過上面的客戶端啟動流程原始碼分析,我們找到第一個核心點: 客戶端的負載均衡呼叫器LoadBalance。
*除了建立了一個負載均衡呼叫器LoadBalance
,還建立了一個協議呼叫器protocolInvoker
,該協議呼叫器裡分別對同步與非同步呼叫方法、Tars與Http協議請求處理、以及過濾器等相關配置,但我們的重點不在這,下面將著重分析LoadBalance
。
3. 客戶端的負載均衡呼叫器LoadBalance
我們點進去檢視原有負載均衡邏輯,發現這是一個介面,裡面定義了兩個方法,都是與負載均衡呼叫器相關的:
public interface LoadBalance<T> {
/**
* 選擇負載均衡呼叫器
* @param 呼叫的上下文
* @return
* @throws 無負載均衡呼叫器 - 異常
*/
Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
/**
* 重新整理本地負載均衡呼叫器
* @param 負載均衡呼叫器
*/
void refresh(Collection<Invoker<T>> invokers);
}
我們Ctrl+H一下即可發現該介面有四個實現類:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-KJyHqhl5-1627523692062)(https://lexiangla.com/assets/3963f704ee0411ebbe94aee286d18512 "負載均衡呼叫器實現類")]
分別是:
- ConsistentHashLoadBalance:一致hash選擇器;
- HashLoadBalance:hash選擇器;
- RoundRobinLoadBalance: 輪詢選擇器;
- DefaultLoadBalance:預設的選擇器(由原始碼可知先ConsistentHashLoadBalance,HashLoadBalance,RoundRobinLoadBalance);
需要注意實現類有四個,選擇器有三個。這四個選擇器都是一個構造方法+實現介面的兩個方法,比較相近。下面我們只分析RoundRobinLoadBalance
的select方法:
@Override
public Invoker<T> select(InvokeContext invocation) throws NoInvokerException {
//靜態權重快取器列表
List<Invoker<T>> staticWeightInvokers = staticWeightInvokersCache;
//使用權重輪詢
if (staticWeightInvokers != null && !staticWeightInvokers.isEmpty()) {
//【體現輪詢】根據index獲取一個呼叫器,規則是:獲取“靜態權重順序遞增值”的絕對值後對“靜態權重快取器數”取餘?
Invoker<T> invoker = staticWeightInvokers.get((staticWeightSequence.getAndIncrement() & Integer.MAX_VALUE) % staticWeightInvokers.size());
//如果呼叫器存活則直接返回
if (invoker.isAvailable()) return invoker;
//判斷存活:先根據呼叫器的url獲取“呼叫器活動狀態”,判斷:狀態的“上次重試時間”+“嘗試重啟時間間隔” < “系統當前時間”,存活則將系統當前時間設定為“上次重啟時間”
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
stat.setLastRetryTime(System.currentTimeMillis());
return invoker;
}
}
//無權重輪詢,丟擲異常
List<Invoker<T>> sortedInvokers = sortedInvokersCache;
if (CollectionUtils.isEmpty(sortedInvokers)) {
throw new NoInvokerException("no such active connection invoker");
}
List<Invoker<T>> list = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : sortedInvokers) {
//如果呼叫器掛了
if (!invoker.isAvailable()) {
//嘗試救回呼叫器:先根據呼叫器的url獲取“呼叫器活動狀態”,判斷:狀態的“上次重試時間”+“嘗試重啟時間間隔” < “系統當前時間”,存活則加入到list中,掛了就不加入
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
list.add(invoker);
}
} else {
//呼叫器存活則將呼叫器新增到list裡
list.add(invoker);
}
}
//TODO When all is not available. Whether to randomly extract one
if (list.isEmpty()) {
throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
}
//隨機獲取一個呼叫器?
Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());
//如果呼叫器不存活,則將當前系統時間設定為該呼叫器的上次重啟時間
if (!invoker.isAvailable()) {
//Try to recall after blocking
logger.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
}
return invoker;
}
可以看出select
方法重點還是在於“怎樣”找到一個負載均衡呼叫器,只不過實現的方法不同,有的採用輪詢的方法、有的根據hash值,而我們關注的是給負載均衡方法做擴充套件(增添路由規則),因此這裡也不是重點。但為我們指明瞭一個方向,就是上面原始碼裡反覆提到的invoker
呼叫器(invoker老眼熟了,SpringBoot裡的controller引數處理裡也有它)。
我們來看看Tars裡的invoker
,它也是一個介面,只有一個實現類,
public interface Invoker<T> {
//獲取uil
Url getUrl();
//獲取api
Class<T> getApi();
//判斷是否存活
boolean isAvailable();
//執行方法
Object invoke(InvokeContext context) throws Throwable;
//銷燬方法
void destroy();
}
通過對這幾個實現類的原始碼閱讀,我們發現invoke
方法就是對doInvokeServant
底層方法進行層層封裝。
通過對TarsInvoker
的原始碼閱讀,我們還可以知道TarsInvoker有四個屬性config、api、url、clients,對應前面提到的getXXX對應方法;還可以設定是否存活,對應前文對是否存活的判斷。在doInvokeServant
裡最核心的操作流程是try裡面的語句:
public class TarsInvoker<T> extends ServantInvoker<T> {
final List<Filter> filters;
public TarsInvoker(ServantProxyConfig config, Class<T> api, Url url, ServantClient[] clients) {
super(config, api, url, clients);
filters = AppContextManager.getInstance().getAppContext() == null ? null : AppContextManager.getInstance().getAppContext().getFilters(FilterKind.CLIENT);
}
@Override
public void setAvailable(boolean available) {
super.setAvailable(available);
}
@Override
protected Object doInvokeServant(final ServantInvokeContext inv) throws Throwable {
final long begin = System.currentTimeMillis();
int ret = Constants.INVOKE_STATUS_SUCC;
try {
//根據api獲取將要執行的方法
Method method = getApi().getMethod(inv.getMethodName(), inv.getParameterTypes());
//如果是非同步呼叫
if (inv.isAsync()) {
//執行非同步方法
invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;
//如果是承諾未來???
} else if (inv.isPromiseFuture()) {
return invokeWithPromiseFuture(method, inv.getArguments(), inv.getAttachments());// return Future Result
} else {
//執行同步方法
TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
ret = response.getRet() == TarsHelper.SERVERSUCCESS ? Constants.INVOKE_STATUS_SUCC : Constants.INVOKE_STATUS_EXEC;
if (response.getRet() != TarsHelper.SERVERSUCCESS) {
throw ServerException.makeException(response.getRet(), response.getRemark());
}
return response.getResult();
}
} catch (Throwable e) {
if (e instanceof TimeoutException) {
ret = Constants.INVOKE_STATUS_TIMEOUT;
} else if (e instanceof NotConnectedException) {
ret = Constants.INVOKE_STATUS_NETCONNECTTIMEOUT;
} else {
ret = Constants.INVOKE_STATUS_EXEC;
}
throw e;
} finally {
if (inv.isNormal()) {
setAvailable(ServantInvokerAliveChecker.isAlive(getUrl(), config, ret));
InvokeStatHelper.getInstance().addProxyStat(objName)
.addInvokeTimeByClient(config.getMasterName(), config.getSlaveName(), config.getSlaveSetName(), config.getSlaveSetArea(),
config.getSlaveSetID(), inv.getMethodName(), getUrl().getHost(), getUrl().getPort(), ret, System.currentTimeMillis() - begin);
}
}
}
……
}
而try語句裡主要做的是執行的呼叫方法(非同步、同步、承諾未來),由於我們要擴充套件的路由功能與呼叫方法無關,這裡就不深入分析了。
由此我們可以分析得出負載均衡設計的底層結構圖:
@EnableTarsServer註解:表明這是一個Tars服務;
- @Import(TarsServerConfiguration.class):引入Tars服務相關配置檔案;
- Communcator:通訊器;
- getServantProxyFactory():獲取代理工廠管理者;
- getObjectProxyFactory():獲取物件代理工廠;
- createLoadBalance():建立客戶端負載均衡呼叫器;
- select():選擇負載均衡呼叫器(有四種模式可以選擇);
- invoker:呼叫器;
- invoke():具體的執行方法;
- doInvokeServant():最底層的執行方法;
- invoke():具體的執行方法;
- invoker:呼叫器;
- refresh():更新負載均衡呼叫器;
- select():選擇負載均衡呼叫器(有四種模式可以選擇);
- createProtocolInvoker():建立協議呼叫器;
- createLoadBalance():建立客戶端負載均衡呼叫器;
- Communcator:通訊器;