Dubbo之服務消費原理
前言
上篇文章《Dubbo之服務暴露》分析 Dubbo 服務是如何暴露的,本文接著分析 Dubbo 服務的消費流程。主要從以下幾個方面進行分析:註冊中心的暴露;通過註冊中心進行服務消費通知;直連服務進行消費。
服務消費端啟動時,將自身的資訊註冊到註冊中心的目錄,同時還訂閱服務提供方的目錄,當服務提供方的 URL 發生更改時,實時獲取新的資料。
服務消費端流程
下面是一個服務消費的流程圖:
上圖中可以看到,服務消費的流程與服務暴露的流程有點類似逆向的。同樣,Dubbo 服務也是分為兩個大步驟:第一步就是將遠端服務通過Protocol
轉換成Invoker
(概念在上篇文章中有解釋)。第二步通過動態代理將Invoker
org.apache.dubbo.config.ReferenceConfig
類是ReferenceBean
的父類,與生產端服務的ServiceBean
一樣,存放著解析出來的 XML 和註解資訊。類關係如下:
服務初始化中轉換的入口
當我們消費端呼叫本地介面就能實現遠端服務的呼叫,這是怎麼實現的呢?根據上面的流程圖,來分析消費原理。
在消費端進行初始化時ReferenceConfig#init
,會執行ReferenceConfig#createProxy
來完成這一系列操作。以下為ReferenceConfig#createProxy
主要的程式碼部分:
private T createProxy(Map<String, String> map) { // 判斷是否為 Jvm 本地引用 if (shouldJvmRefer(map)) { // 通過 injvm 協議,獲取本地服務 URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url); } else { urls.clear(); // 判斷是否有自定義的直連地址,或註冊中心地址 if (url != null && url.length() > 0) { String[] us = SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName); } if (UrlUtils.isRegistry(url)) { // 如果是註冊中心Protocol型別,則向地址中新增 refer 服務消費元資料 urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { // 直連服務提供端 urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 組裝註冊中心的配置 if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) { // 檢查配置中心 checkRegistry(); List<URL> us = ConfigValidationUtils.loadRegistries(this, false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u); if (monitorUrl != null) { // 監控上報資訊 map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 註冊中心地址新增 refer 服務消費元資料 urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } } } // 只有一條註冊中心資料,即單註冊中心 if (urls.size() == 1) { // 將遠端服務轉化成 Invoker invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { // 因為多註冊中心就會存在多個 Invoker,這裡用儲存在 List 中 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { // 將每個註冊中心轉換成 Invoker 資料 invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (UrlUtils.isRegistry(url)) { // 會覆蓋前遍歷的註冊中心,使用最後一條註冊中心資料 registryURL = url; } } if (registryURL != null) { // 預設使用 zone-aware 策略來處理多個訂閱 URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME); // 將轉換後的多個 Invoker 合併成一個 invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } // 利用動態代理,將 Invoker 轉換成本地介面代理 return (T) PROXY_FACTORY.getProxy(invoker); }
上面轉換的過程中,主要可概括為:先分為本地引用和遠端引用兩類。本地就是以 inJvm 協議的獲取本地服務,這不做過多說明;遠端引用分為直連服務和通過註冊中心。註冊中心分為單註冊中心和多註冊中心的情況,單註冊中心好解決,直接使用即可,多註冊中心時,將轉換後的 Invoker 合併成一個 Invoker。最後通過動態代理將 Invoker 轉換成本地介面代理。
獲取 Invoker 例項
由於本地服務時直接從快取中獲取,這裡就註冊中心的消費進行分析,上面程式碼片段中使用的是REF_PROTOCOL.refer
進行轉換,該方法程式碼:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 獲取服務的註冊中心url,裡面會設定註冊中心的協議和移除 registry 的引數 url = getRegistryUrl(url); // 獲取註冊中心例項 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 獲取服務消費元資料 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); // 從服務消費元資料中獲取分組資訊 String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 執行 Invoker 轉換工作 return doRefer(getMergeableCluster(), registry, type, url); } } // 執行 Invoker 轉換工作 return doRefer(cluster, registry, type, url); }
上面主要是獲取服務消費的註冊中心例項和進行服務分組,最後呼叫doRefer
方法進行轉換工作,以下為doRefer
的程式碼:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立 RegistryDirectory 物件
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設定註冊中心
directory.setRegistry(registry);
// 設定協議
directory.setProtocol(protocol);
// directory.getUrl().getParameters() 是服務消費元資料
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 消費訊息註冊到註冊中心
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 服務消費者訂閱:服務提供端,動態配置,路由的通知
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 多個Invoker合併為一個
Invoker invoker = cluster.join(directory);
return invoker;
}
上面實現主要是完成建立 RegistryDirectory 物件,將消費服務元資料註冊到註冊中心,通過 RegistryDirectory 物件裡的資訊,實現服務提供端,動態配置及路由的訂閱相關功能。
RegistryDirectory 這個類實現了 NotifyListener 這個通知監聽介面,當訂閱的服務,配置或路由發生變化時,會接收到通知,進行相應改變:
public synchronized void notify(List<URL> urls) {
// 將服務提供方配置,路由配置,服務提供方的服務分別以不同的 key 儲存在 Map 中
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
// 更新服務提供方配置
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 更新路由配置
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 載入服務提供方的服務資訊
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getUrl(),this);
}
}
// 重新載入 Invoker 例項
refreshOverrideAndInvoker(providerURLs);
}
RegistryDirectory#notify
裡面最後會重新整理 Invoker 進行重新載入,下面是核心程式碼的實現:
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// 重新整理 invoker
refreshInvoker(urls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
......
} else {
// 重新整理之前的 Invoker
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
// 載入新的 Invoker Map
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 獲取新的 Invokers
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// 快取新的 Invokers
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 通過新舊 Invokers 對比,銷燬無用的 Invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
獲取重新整理前後的 Invokers,將新的 Invokers 重新快取起來,通過對比,銷燬無用的 Invoker。
上面將 URL 轉換 Invoker 是在RegistryDirectory#toInvokers
中進行。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// 過濾消費端不匹配的協議,及非法協議
......
// 合併服務提供端配置資料
URL url = mergeUrl(providerUrl);
// 過濾重複的服務提供端配置資料
String key = url.toFullString();
if (keys.contains(key)) {
continue;
}
keys.add(key);
// 快取鍵是不與使用者端引數合併的url,無論使用者如何合併引數,如果伺服器url更改,則再次引用
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 快取無對應 invoker,再次呼叫 protocol#refer 是否有資料
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
// 將新的 Invoker 快取起來
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
// 快取裡有資料,則進行重新覆蓋
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
總結
通過《Dubbo之服務暴露》和本文兩篇文章對 Dubbo 服務暴露和服務消費原理的瞭解。我們可以看到,不管是暴露還是消費,Dubbo 都是以 Invoker 為資料交換主體進行,通過對 Invoker 發起呼叫,實現一個遠端或本地的實現。
個人部落格: https://ytao.top
關注公眾號 【ytao】,更多原創好文
相關推薦
Dubbo之服務消費原理
前言 上篇文章《Dubbo之服務暴露》分析 Dubbo 服務是如何暴露的,本文接著分析 Dubbo 服務的消費流程。主要從以下幾個方面進行分析:註冊中心的暴露;通過註冊中心進行服務消費通知;直連服務進行消費。 服務消費端啟動時,將自身的資訊註冊到註冊中心的目錄,同時還訂閱服務提供方的目錄,當服務提供方的
dubbo之服務容器
jetty 命令 classpath spring cto 日誌級別 web nts 靜態 服務容器是一個standalone的啟動程序,因為後臺服務不需要Tomcat或JBoss等Web容器的功能,如果硬要用Web容器去加載服務提供方,增加復雜性,也浪費資源。 服務
SpringCloud微服務雲架構構建B2B2C電子商務平臺分析之-服務消費(Ribbon)
Spring Cloud Ribbon Spring Cloud Ribbon是基於Netflix Ribbon實現的一套客戶端負載均衡的工具。它是一個基於HTTP和TCP的客戶端負載均衡器。它可以通過在客戶端中配置ribbonServerList來設定服務端列表去輪詢訪問以達到均衡負載的作用。
Dubbo原始碼分析(三):Dubbo之服務端(Service)
如上圖所示的Dubbo的暴露服務的過程,不難看出它也和消費者端很像,也需要一個像reference的物件來維護service關聯的所有物件及其屬性,這裡的reference就是provider。由於ServiceBean實現了 Initializ
Dubbo剖析-服務消費端泛化呼叫
一、前言 前面我們講解基於Spring和基於dubbo api方式搭建一個簡單的分散式系統時候服務消費端是引入了一個sdk的,這個SDK是個二方包,裡面存放了服務提供端提供的所有介面類以及介面使用的入參和出參的pojo類,服務消費端則使用JDK代理對介面進行代理。 泛化介面呼叫方式主要用於服務
Dubbo剖析-服務消費端非同步呼叫
一、前言 前面我們講解的無論是正常呼叫還是泛化呼叫也好,都是進行同步呼叫的,也就是服務消費方發起一個遠端呼叫後,呼叫執行緒要被阻塞掛起,直到服務提供方返回。本節來講解下非同步呼叫,非同步呼叫是指服務消費方發起一個遠端呼叫後,不等服務提供方返回結果,呼叫方法就返回了,也就是當前執行緒不會被阻塞,這就允許呼叫方
Dubbo剖析-服務消費方遠端服務到Invoker的轉換
一、前言 前面dubbo整體架構分析裡面我們講解了服務消費者消費一個服務的詳細過程是,首先 呼叫 Protocol 的 refer 方法生成 Invoker 例項,接下來把Invoker 轉換為客戶端需要的介面(如:UserServiceBo),本文來講解第一個環節的實現 imag
第三篇:SpringCloud之服務消費(Feign)
上一篇文章,講述瞭如何通過RestTemplate+Ribbon去消費服務,這篇文章主要講述如何通過Feign去消費服務。 Feign簡介 Feign是一種宣告式、模板化的HTTP客戶端,它使得寫Http客戶端變得更簡單。使用Feign,只需要建立一個介面並註解。它具有可
第二篇:SpringCloud之服務消費(Ribbon)
在微服務架構中,業務都會被拆分成一個獨立的服務,服務與服務的通訊是基於Http RESTful的。SpringCloud有兩種服務呼叫方式,一種是Ribbon+RESTTemplate,另一種是Feign。在這一篇文章首先講解下基於Ribbon+REST。 Ribbon簡介
深入理解dubbo之服務引用
在之前說了一下dubbo的服務釋出過程,其實嚴格意義上只說了一半吧,只把dubbo如何經過ProxyFactory的代理成一個Invoker,等待客戶端呼叫的過程講了一遍,而重要的Protocol.export方法略過去了,今天我將連帶dubbo的comsume
Dubbo之服務分組、分組聚合。
服務分組 當一個介面有多種實現時,可以用group區分。 <dubbo:service group="feedback" interface="com.xxx.IndexService" /> <dubbo:service group="
Dubbo之服務暴露
![](https://img2020.cnblogs.com/other/1850167/202003/1850167-20200321113334029-1603461641.png) # 前言 本文 Dubbo 使用版本`2.7.5` Dubbo 通過使用`dubbo:service`配置或`@ser
Dubbo原理和原始碼解析之服務暴露
github新增倉庫 "dubbo-read"(點此檢視),集合所有《Dubbo原理和原始碼解析》系列文章,後續將繼續補充該系列,同時將針對Dubbo所做的功能擴充套件也進行分享。不定期更新,歡迎Follow。 一、框架設計 在官方《Dubbo 使用者指南》架構部分,給出了服務呼叫的整體架構和
Dubbo原理和原始碼解析之服務引用
github新增倉庫 "dubbo-read"(點此檢視),集合所有《Dubbo原理和原始碼解析》系列文章,後續將繼續補充該系列,同時將針對Dubbo所做的功能擴充套件也進行分享。不定期更新,歡迎Follow。 一、框架設計 在官方《Dubbo 開發指南》框架設計部分,給出了引用服務時序圖:
dubbo組成原理-http服務消費端如何呼叫
dubbo協議已經用的很多了,這裡來稍微介紹一下http協議,官方對http協議的說明簡直少的讓人髮指。哈哈 百度大部分都只是講了http服務端的配置 那就先從服務端的配置說起 dubbo需要的jar包這裡就不說明了,網上找些maven的pom就可以 web.xml配置se
Dubbo之——消費端直連服務提供者(開發除錯)
在生產環境使用情況是,服務消費端只消費指定Provider提供者的服務 開發除錯 我們啟動遠端服務提供者 我啟動web-boss,這裡呼叫是遠端提供者服務 檢視user-service,provider方的配置 配置consumer方呼叫本地dubbo服務,進行
[dubbo 原始碼之 ]2. 服務消費方如何啟動服務
啟動流程 消費者在啟動之後,會通過ReferenceConfig#get()來生成遠端呼叫代理類。在get方法中,會啟動一系列呼叫函式,我們來一個個解析。 配置同樣包含2種: XML <?xml version="1.0" encoding="UTF-8"?&
Dubbo學習筆記10:Dubbo服務消費方啟動流程源碼分析
exec checked 自己 當前 In rpc mod png collect 同理我們看下服務消費端啟動流程時序圖: 在《Dubbo整體架構分析》一文中,我們提到服務消費方需要使用ReferenceConfig API來消費服務,具體是調用代碼(1)get()方法來
dubbo源碼分析 之 服務本地暴露
ice oca IE exec 規則 PE nbsp dpa tzu dubbo 在服務暴露發生了哪些事,今天我們就來分析一下整個服務暴露中的本地暴露。本地暴露需要服務提供方與服務消費方在同一個 JVM。下面我們來寫一個本地暴露使用的例子: 1 DemoService.j
Dubbo原始碼解析之服務端接收訊息
準備 dubbo 版本:2.5.4 服務端接收訊息流程 Handler鏈路 DubboProtocol private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent("c