Dubbo原理和原始碼解析之服務引用
github新增倉庫 "dubbo-read"(點此檢視),集合所有《Dubbo原理和原始碼解析》系列文章,後續將繼續補充該系列,同時將針對Dubbo所做的功能擴充套件也進行分享。不定期更新,歡迎Follow。
一、框架設計
在官方《Dubbo 開發指南》框架設計部分,給出了引用服務時序圖:
另外,在官方《Dubbo 使用者指南》叢集容錯部分,給出了服務引用的各功能元件關係圖:
本文將根據以上兩張圖,分析服務引用的實現原理,並進行詳細的程式碼跟蹤與解析。
二、原理和原始碼解析
2.1 建立代理
Dubbo 基於 Spring 的 Schema 擴充套件實現 XML 配置解析,DubboNamespaceHandler 會將 <dubbo:reference> 標籤解析為 ReferenceBean,ReferenceBean 實現了 FactoryBean,因此當它在程式碼中有引用時,會呼叫 ReferenceBean#getObject() 方法進入節點註冊和服務發現流程。
ReferenceBean.java
public Object getObject() throws Exception { return get(); }
ReferenceConfig.java
public synchronized T get() { if (destroyed){ throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }private void init() { //.......忽略 ref = createProxy(map); } private T createProxy(Map<String, String> map) { //.....忽略 invoker = refprotocol.refer(interfaceClass, urls.get(0)); //.....忽略 // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }
2.2 服務發現
因為通過註冊中心,因此在 ReferenceConfig.java#createProxy() 方法中,進入 RegistryProtocol.java#refer() 方法。
RegistryProtocol.java
private Cluster cluster; public void setCluster(Cluster cluster) { this.cluster = cluster; } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory); }
RegistryDirectory 通過 RegistryDirectory#subscribeUrl() 向 Zookeeper 訂閱服務節點資訊並 watch 變更,這樣就實現了服務自動發現。
2.3 Invoker選取
2.3.1 Cluster
上面我之所以把設定 Cluster 的程式碼貼上,是因為此處涉及到一個 Dubbo 服務框架核心的概念——微核心和外掛機制(此處會單獨一篇文章詳細介紹):
有關 Dubbo 的設計原則,請檢視Dubbo《一些設計上的基本常識》。
Cluster 類的定義如下:
Cluster.java
@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
cluster 的型別是 Cluster$Adaptive,實際上是一個通用的代理類,它會根據 URL 中的 cluster 引數值定位到實際的 Cluster 實現類(預設是 FailoverCluster)。 由於 ExtensionLoader 在例項化物件時,會在例項化完成之後自動套上 Wrapper 類,而 MockerClusterWrapper 就是這樣一個 Wrapper。
MockerClusterWrapper.java
public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { this.cluster = cluster; } public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } }
也就是說,例項化出來的 FailoverCluster 會作為引數賦予 MockerClusterWrapper#cluster,而 MockClusterWrapper 會作為引數賦予 RegistryProtocol#cluster。因此 RegistryProtocol#doRefer() 中呼叫 cluster.join(directory) 實際上是呼叫的 MockClusterWrapper#join(directory)。 使用這種機制,可以把一些公共的處理放在 Wrapper 類中,實現程式碼和功能收斂。
MockClusterInvoker.java
public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){ //no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); }catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } //fail:mock result = doMockInvoke(invocation, e); } } } return result; }
這裡還涉及到 Dubbo 另外一個核心機制——Mock。Mock 可以在測試中模擬服務呼叫的各種異常情況,還用來實現服務降級。 從 MockClusterWrapper.join() 方法可知,實際建立的 ClusterInvoker 是封裝了 FailoverClusterInvoker 的 MockerClusterInvoker。
在 MockerClusterInvoker 中,呼叫之前 Dubbo 會先檢查 URL 中是否有 mock 引數(通過服務治理後臺 Consumer 端的遮蔽和容錯進行設定,或者直接動態設定 mock 引數值),如果存在且以 force 開頭,則不發起遠端呼叫直接執行降級邏輯;如果存在且以 fail 開頭,則在遠端呼叫異常時才會執行降級邏輯。
因此,通過 MockerClusterWrapper 成功地在 Invoker 中植入了 Mock 機制。
2.3.2 Directory
在 RegistryProtocol#doRefer() 中可以看到,服務發現過程是通過 RegistryDirectory 向 Zookeeper 訂閱來實現的。 先看看 Directory 類之間的關係:
看下 Directory 介面的定義:
Directory.java
public interface Directory<T> extends Node { Class<T> getInterface(); List<Invoker<T>> list(Invocation invocation) throws RpcException; }
Directory 可以看做是對應 Interface 的 Invoker 列表,而這個列表可能是動態變化的,比如註冊中心推送變更。
通過 ReferenceConfig#createProxy() 方法可知,StaticDirectory 主要用於多註冊中心引用的場景,它的 invoker 列表是通過引數傳入的、固定的。在此不做更詳細的解析了。
RegistryDirectory 用於使用單註冊中心發現服務的場景。RegistryDirectory 沒有重寫 list() 方法,所以使用 AbstractDirectory#list() 方法:
AbstractDirectory.java
public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } List<Invoker<T>> invokers = doList(invocation); List<Router> localRouters = this.routers; // local reference if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; }
RegistryDirectory.java
/** * 獲取 invoker 列表 */ public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " + NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist)."); } List<Invoker<T>> invokers = null; Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; //本地快取 if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); //根據方法名從本地快取中獲取invoker列表,此處略 //…… } return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; } /** * 節點變更通知 */ public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators if (configuratorUrls != null && configuratorUrls.size() >0 ){ this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && routerUrls.size() >0 ){ List<Router> routers = toRouters(routerUrls); if(routers != null){ // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // 合併override引數 this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers refreshInvoker(invokerUrls); } /** * 根據invokerURL列表轉換為invoker列表 */ private void refreshInvoker(List<URL> invokerUrls){ //...... Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表 //...... } /** * 合併url引數 順序為override > -D >Consumer > Provider */ private Map<String, Invoker<T>> toInvokers(List<URL> urls) { //...... URL url = mergeUrl(providerUrl); //...... }
在 dolist() 方法中,如果通過服務治理禁止 Consumer 訪問的話,此處直接丟擲響應的異常。
RegistryDirectory 實現了 NotifyListener,在 ZK 節點變化時能收到通知更新記憶體快取,其中 RegistryDirectory#mergeUrl() 方法中會按照優先順序合併引數(動態配置在此處生效)。
服務引用時從記憶體快取中獲取並返回invoker列表,並根據路由規則再進行一次過濾。
2.3.3 Router
Router 的作用就是從 Directory 的 invoker 列表中刷選出符合路由規則的 invoker 子集。目前 Dubbo 提供了基於IP、應用名和協議等的靜態路由功能,功能和實現比較簡單,在此不做過多解釋。