Dubbo 源碼分析 - 集群容錯之 Directory
1. 簡介
前面文章分析了服務的導出與引用過程,從本篇文章開始,我將開始分析 Dubbo 集群容錯方面的源碼。這部分源碼包含四個部分,分別是服務目錄 Directory、服務路由 Router、集群 Cluster 和負載均衡 LoadBalance。這幾個部分的源碼邏輯比較獨立,我會分四篇文章進行分析。本篇文章作為集群容錯的開篇文章,將和大家一起分析服務目錄相關的源碼。在進行深入分析之前,我們先來了解一下服務目錄是什麽。服務目錄中存儲了一些和服務提供者有關的信息,通過服務目錄,服務消費者可獲取到服務提供者的信息,比如 ip、端口、服務協議等。通過這些信息,服務消費者就可通過 Netty 等客戶端進行遠程調用。在一個服務集群中,服務提供者數量並不是一成不變的,如果集群中新增了一臺機器,相應地在服務目錄中就要新增一條服務提供者記錄。或者,如果服務提供者的配置修改了,服務目錄中的記錄也要做相應的更新。如果這樣說,服務目錄和註冊中心的功能不就雷同了嗎。確實如此,這裏這麽說是為了方便大家理解。實際上服務目錄在獲取註冊中心的服務配置信息後,會為每條配置信息生成一個 Invoker 對象,並把這個 Invoker 對象存儲起來,這個 Invoker 才是服務目錄最終持有的對象。Invoker 有什麽用呢?看名字就知道了,這是一個具有遠程調用功能的對象。講到這大家應該知道了什麽是服務目錄了,它可以看做是 Invoker 集合,且這個集合中的元素會隨註冊中心的變化而進行動態調整。
好了,關於服務目錄這裏就先介紹這些,大家先有個大致印象即可。接下來我們通過繼承體系圖來了解一下服務目錄的家族成員都有哪些。
2. 繼承體系
服務目錄目前內置的實現有兩個,分別為 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。AbstractDirectory 實現了 Directory 接口,這個接口包含了一個重要的方法定義,即 list(Invocation),用於列舉 Invoker。下面我們來看一下他們的繼承體系圖。
如上,Directory 繼承自 Node 接口,Node 這個接口繼承者比較多,像 Registry、Monitor、Invoker 等繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl,實現該接口的類可以向外提供配置信息。另外,大家註意看 RegistryDirectory 實現了 NotifyListener 接口,當註冊中心節點信息發生變化後,RegistryDirectory 可以通過此接口方法得到變更信息,並根據變更信息動態調整內部 Invoker 列表。
現在大家對服務目錄的繼承體系應該比較清楚了,下面我們深入到源碼中,探索服務目錄是如何實現的。
3. 源碼分析
本章我將分析 AbstractDirectory 和它兩個子類的源碼。這裏之所以要分析 AbstractDirectory,而不是直接分析子類是有一定原因的。AbstractDirectory 封裝了 Invoker 列舉流程,具體的列舉邏輯則由子類實現,這是典型的模板模式。所以,接下來我們先來看一下 AbstractDirectory 的源碼。
public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed..."); } // 調用 doList 方法列舉 Invoker,這裏的 doList 是模板方法,由子類實現 List<Invoker<T>> invokers = doList(invocation); // 獲取路由器 List<Router> localRouters = this.routers; if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { // 獲取 runtime 參數,並根據參數決定是否進行路由 if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { // 進行服務路由 invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: ..."); } } } return invokers; } // 模板方法,由子類實現 protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
上面就是 AbstractDirectory 的 list 方法源碼,這個方法封裝了 Invoker 的列舉過程。如下:
- 調用 doList 獲取 Invoker 列表
- 根據 Router 的 getUrl 返回值為空與否,以及 runtime 參數決定是否進行服務路由
以上步驟中,doList 是模板方法,需由子類實現。Router 的 runtime 參數這裏簡單說明一下,這個參數決定了是否在每次調用服務時都執行路由規則。如果 runtime 為 true,那麽每次調用服務前,都需要進行服務路由。這個對性能造成影響,慎重配置。關於該參數更詳細的說明,請參考官方文檔。
介紹完 AbstractDirectory,接下來我們開始分析子類的源碼。
3.1 StaticDirectory
StaticDirectory 即靜態服務目錄,顧名思義,它內部存放的 Invoker 是不會變動的。所以,理論上它和不可變 List 的功能很相似。下面我們來看一下這個類的實現。
public class StaticDirectory<T> extends AbstractDirectory<T> {
// Invoker 列表
private final List<Invoker<T>> invokers;
// 省略構造方法
@Override
public Class<T> getInterface() {
// 獲取接口類
return invokers.get(0).getInterface();
}
// 檢測服務目錄是否可用
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
// 只要有一個 Invoker 是可用的,就任務當前目錄是可用的
return true;
}
}
return false;
}
@Override
public void destroy() {
if (isDestroyed()) {
return;
}
// 調用父類銷毀邏輯
super.destroy();
// 遍歷 Invoker 列表,並執行相應的銷毀邏輯
for (Invoker<T> invoker : invokers) {
invoker.destroy();
}
invokers.clear();
}
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
// 列舉 Inovker,也就是直接返回 invokers 成員變量
return invokers;
}
}
以上就是 StaticDirectory 的代碼邏輯,很簡單,大家都能看懂,我就不多說了。下面來看看 RegistryDirectory,這個類的邏輯比較復雜。
3.2 RegistryDirectory
RegistryDirectory 是一種動態服務目錄,它實現了 NotifyListener 接口。當註冊中心服務配置發生變化後,RegistryDirectory 可收到與當前服務相關的變化。收到變更通知後,RegistryDirectory 可根據配置變更信息刷新 Invoker 列表。RegistryDirectory 中有幾個比較重要的邏輯,第一是 Invoker 的列舉邏輯,第二是接受服務配置變更的邏輯,第三是 Invoker 的刷新邏輯。接下來,我將按順序對這三塊邏輯。
3.2.1 列舉 Invoker
Invoker 列舉邏輯封裝在 doList 方法中,這是個模板方法,前面已經介紹過了。那這裏就不過多啰嗦了,我們直入主題吧。
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 服務提供者關閉或禁用了服務,此時拋出 No provider 異常
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry ...");
}
List<Invoker<T>> invokers = null;
// 獲取 Invoker 本地緩存
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
// 獲取方法名和參數列表
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
// 檢測參數列表的第一個參數是否為 String 或 enum 類型
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
// 通過 方法名 + 第一個參數名稱 查詢 Invoker 列表,具體的使用場景暫時沒想到
invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
}
if (invokers == null) {
// 通過方法名獲取 Invoker 列表
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
// 通過星號 * 獲取 Invoker 列表
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
// 通過叠代器獲取 Invoker 列表
invokers = iterator.next();
}
}
}
// 返回 Invoker 列表
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
以上代碼進行多次嘗試,以期從 localMethodInvokerMap 中獲取到 Invoker 列表。一般情況下,普通的調用可通過方法名獲取到對應的 Invoker 列表,泛化調用可通過 ***** 獲取到 Invoker 列表。按現有的邏輯,不管什麽情況下,***** 到 Invoker 列表的映射關系 <*****, invokers> 總是存在的,也就意味著 localMethodInvokerMap.get(Constants.ANY_VALUE) 總是有值返回。除非這個值是 null,才會通過通過叠代器獲取 Invoker 列表。至於什麽情況下為空,我暫時未完全搞清楚,我猜測是被路由規則(用戶可基於 Router 接口實現自定義路由器)處理後,可能會得到一個 null。目前僅是猜測,未做驗證。
本節的邏輯主要是從 localMethodInvokerMap 中獲取 Invoker,localMethodInvokerMap 源自 RegistryDirectory 類的成員變量 methodInvokerMap。doList 方法可以看做是對 methodInvokerMap 變量的讀操作,至於對 methodInvokerMap 變量的寫操作,這個將在後續進行分析。
3.2.2 接收服務變更通知
RegistryDirectory 是一個動態服務目錄,它需要接受註冊中心配置進行動態調整。因此 RegistryDirectory 實現了 NotifyListener 接口,通過這個接口獲取註冊中心變更通知。下面我們來看一下具體的邏輯。
public synchronized void notify(List<URL> urls) {
// 定義三個集合,分別用於存放服務提供者 url,路由 url,配置器 url
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
// 獲取 category 參數
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
// 根據 category 參數將 url 分別放到不同的列表中
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
// 添加路由器 url
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
// 添加配置器 url
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
// 添加服務提供者 url
invokerUrls.add(url);
} else {
// 忽略不支持的 category
logger.warn("Unsupported category ...");
}
}
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
// 將 url 轉成 Configurator
this.configurators = toConfigurators(configuratorUrls);
}
if (routerUrls != null && !routerUrls.isEmpty()) {
// 將 url 轉成 Router
List<Router> routers = toRouters(routerUrls);
if (routers != null) {
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators;
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
// 配置 overrideDirectoryUrl
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// 刷新 Invoker 列表
refreshInvoker(invokerUrls);
}
如上,notify 方法首先是根據 url 的 category 參數對 url 進行分門別類存儲,然後通過 toRouters 和 toConfigurators 將 url 列表轉成 Router 和 Configurator 列表。最後調用 refreshInvoker 方法刷新 Invoker 列表。這裏的 toRouters 和 toConfigurators 方法邏輯不復雜,大家自行分析。接下來,我們把重點放在 refreshInvoker 方法上。
3.2.3 刷新 Invoker 列表
接著上一節繼續分析,refreshInvoker 方法是保證 RegistryDirectory 隨註冊中心變化而變化的關鍵所在。這一塊邏輯比較多,接下來一一進行分析。
private void refreshInvoker(List<URL> invokerUrls) {
// invokerUrls 僅有一個元素,且 url 協議頭為 empty,此時表示禁用所有服務
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 設置 forbidden 為 true
this.forbidden = true;
this.methodInvokerMap = null;
// 銷毀所有 Invoker
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
// 添加緩存 url 到 invokerUrls 中
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
// 緩存 invokerUrls
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
// 將 url 轉成 Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// 將 newUrlInvokerMap 轉成方法名到 Invoker 列表的映射
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// 轉換出錯,直接打印異常,並返回
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error ..."));
return;
}
// 合並多個組的 Invoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
// 保存為本地緩存
this.urlInvokerMap = newUrlInvokerMap;
try {
// 銷毀無用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
上面方法的代碼不是很多,但是邏輯卻不少。首先時根據入參 invokerUrls 的數量和協議頭判斷是否禁用所有的服務,如果禁用,則將 forbidden 設為 true,並銷毀所有的 Invoker。若不禁用,則將 url 轉成 Invoker,得到 <url, Invoker> 的映射關系。然後進一步進行轉換,得到 <methodName, Invoker 列表>。之後進行多組 Invoker 合並操作,並將合並結果賦值給 methodInvokerMap。methodInvokerMap 變量在 doList 方法中會被用到,doList 會對該變量進行讀操作,在這裏是寫操作。當新的 Invoker 列表生成後,還要一個重要的工作要做,就是銷毀無用的 Invoker,避免服務消費者調用已下線的服務的服務。
接下裏,我將對上面涉及到的調用進行分析。按照順序,這裏先來分析 url 到 Invoker 的轉換過程。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
// 獲取服務消費端配置的協議
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
// 檢測服務提供者協議是否被服務消費者所支持
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
// 若服務消費者協議頭不被消費者所支持,則忽略當前 providerUrl
continue;
}
}
// 忽略 empty 協議
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 通過 SPI 檢測服務端協議是否被消費端支持
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol..."));
continue;
}
// 合並 url
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
if (keys.contains(key)) {
// 忽略重復 url
continue;
}
keys.add(key);
// 本地 Invoker 緩存列表
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 緩存未命中
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
// 獲取 disable 配置,並修改 enable 變量
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 調用 refer 獲取 Invoker
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface...");
}
if (invoker != null) {
// 緩存 Invoker 實例
newUrlInvokerMap.put(key, invoker);
}
} else {
// 緩存命中,將 invoker 存儲到 newUrlInvokerMap 中
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
toInvokers 方法一開始會對服務提供者 url 進行檢測,若服務消費端的配置不支持服務端的協議,或服務端 url 協議頭為 empty 時,toInvokers 均會忽略服務提供方 url。必要的檢測做完後,緊接著是合並 url,然後訪問緩存,嘗試獲取與 url 對應的 invoker。如果緩存命中,直接將 Invoker 存入 newUrlInvokerMap 中即可。如果未命中,則需要新建 Invoker。Invoker 是通過 Protocol 的 refer 方法創建的,這個我在上一篇文章中已經分析過了,這裏就不贅述了。
toInvokers 方法返回的是 <url, Invoker> 映射關系表,接下來還要對這個結果進行進一步處理,得到方法名到 Invoker 列表的映射關系。這個過程由 toMethodInvokers 方法完成,如下:
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
// 方法名 -> Invoker 列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
// 獲取 methods 參數
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if (parameter != null && parameter.length() > 0) {
// 切分 methods 參數值,得到方法名數組
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
for (String method : methods) {
// 方法名不為 *
if (method != null && method.length() > 0
&& !Constants.ANY_VALUE.equals(method)) {
// 根據方法名獲取 Invoker 列表
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
// 存儲 Invoker 到列表中
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
}
}
// 進行服務級別路由,參考:https://github.com/apache/incubator-dubbo/pull/749
List<Invoker<T>> newInvokersList = route(invokersList, null);
// 存儲 <*, newInvokersList> 映射關系
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
// 進行方法級別路由
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
// 排序,轉成不可變列表
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
上面方法主要做了三件事情, 第一是對入參進行遍歷,然後獲取 methods 參數,並切分成數組。隨後以方法名為鍵,Invoker 列表為值,將映射關系存儲到 newMethodInvokerMap 中。第二是分別基於類和方法對 Invoker 列表進行路由操作。第三是對 Invoker 列表進行排序,並轉成不可變列表。關於 toMethodInvokers 方法就先分析到這,我們繼續向下分析,這次要分析的多組服務的合並邏輯。
private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
// 遍歷入參
for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
String method = entry.getKey();
List<Invoker<T>> invokers = entry.getValue();
// group -> Invoker 列表
Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
// 遍歷 Invoker 列表
for (Invoker<T> invoker : invokers) {
// 獲取分組配置
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
List<Invoker<T>> groupInvokers = groupMap.get(group);
if (groupInvokers == null) {
groupInvokers = new ArrayList<Invoker<T>>();
// 緩存 <group, List<Invoker>> 到 groupMap 中
groupMap.put(group, groupInvokers);
}
// 存儲 invoker 到 groupInvokers
groupInvokers.add(invoker);
}
if (groupMap.size() == 1) {
// 如果 groupMap 中僅包含一組鍵值對,此時直接取出該鍵值對的值即可
result.put(method, groupMap.values().iterator().next());
// groupMap 中包含多組鍵值對,比如:
// {
// "dubbo": [invoker1, invoker2, invoker3, ...],
// "hello": [invoker4, invoker5, invoker6, ...]
// }
} else if (groupMap.size() > 1) {
List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
for (List<Invoker<T>> groupList : groupMap.values()) {
// 通過集群類合並每個分組對應的 Invoker 列表
groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
}
// 緩存結果
result.put(method, groupInvokers);
} else {
result.put(method, invokers);
}
}
return result;
}
上面方法首先是生成 group 到 Invoker 類比的映射關系表,若關系表中的映射關系數量大於1,表示有多組服務。此時通過集群類合並每組 Invoker,並將合並結果存儲到 groupInvokers 中。之後將方法名與 groupInvokers 存到到 result 中,並返回,整個邏輯結束。
接下來我們再來看一下 Invoker 列表刷新邏輯的最後一個動作 -- 刪除無用 Invoker。如下:
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
// 獲取新生成的 Invoker 列表
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
// 遍歷老的 <url, Invoker> 映射表
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
// 檢測 newInvokers 中是否包含老的 Invoker
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<String>();
}
// 若不包含,則將老的 Invoker 對應的 url 存入 deleted 列表中
deleted.add(entry.getKey());
}
}
}
if (deleted != null) {
// 遍歷 deleted 集合,並到老的 <url, Invoker> 映射關系表查出 Invoker,銷毀之
for (String url : deleted) {
if (url != null) {
// 從 oldUrlInvokerMap 中移除 url 對應的 Invoker
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
try {
// 銷毀 Invoker
invoker.destroy();
} catch (Exception e) {
logger.warn("destroy invoker...");
}
}
}
}
}
}
destroyUnusedInvokers 方法的主要邏輯是通過 newUrlInvokerMap 找出待刪除 Invoker 對應的 url,並將 url 存入到 deleted 列表中。然後再遍歷 deleted 列表,並從 oldUrlInvokerMap 中移除相應的 Invoker,銷毀之。整個邏輯大致如此,不是很難理解。
到此關於 Invoker 列表的刷新邏輯就分析了,這裏對整個過程進行簡單總結。如下:
- 檢測入參是否僅包含一個 url,且 url 協議頭為 empty
- 若第一步檢測結果為 true,表示禁用所有服務,此時銷毀所有的 Invoker
- 若第一步檢測結果為 false,此時將入參轉為 Invoker 列表
- 對將上一步邏輯刪除的結果進行進一步處理,得到方法名到 Invoker 的映射關系表
- 合並多組 Invoker
- 銷毀無用 Invoker
Invoker 的刷新邏輯還是比較復雜的,大家在看的過程中多寫點 demo 進行調試。好了,本節就到這。
4. 總結
本篇文章對 Dubbo 服務目錄進行了較為詳細的分析,篇幅主要集中在 RegistryDirectory 的源碼分析上。分析下來,不由得感嘆,想讓本地服務目錄和註冊中心保持一致還是需要做很多事情的,並不簡單。服務目錄是 Dubbo 集群容錯的一部分,也是比較基礎的部分,所以大家務必搞懂。
好了,本篇文章就先到這了。感謝大家閱讀。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
作者:田小波
本文同步發布在我的個人博客:http://www.tianxiaobo.com
本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。
Dubbo 源碼分析 - 集群容錯之 Directory