Dubbo的原理分析
【1】RPC原理
首先看個圖:
一次完整的RPC呼叫流程(同步呼叫,非同步另說)如下:
1)服務消費方(client)呼叫以本地呼叫方式呼叫服務;
2)client stub(可以理解為代理)接收到呼叫後負責將方法、引數等組裝成能夠進行網路傳輸的訊息體;
3)client stub找到服務地址,並將訊息傳送到服務端;
4)server stub收到訊息後進行解碼;
5)server stub根據解碼結果呼叫本地的服務;
6)本地服務執行並將結果返回給server stub;
7)server stub將返回結果打包成訊息併發送至消費方;
8)client stub接收到訊息,並進行解碼;
9)服務消費方得到最終結果。
RPC框架的目標就是要2~8這些步驟都封裝起來,這些細節對使用者來說是透明的,不可見的。
【2】Netty通訊原理
Netty是一個非同步事件驅動的網路應用程式框架, 用於快速開發可維護的高效能協議伺服器和客戶端。它極大地簡化了TCP和UDP套接字伺服器等網路程式設計。
① BIO:(Blocking IO)
② NIO (Non-Blocking IO)
Selector 一般稱 為選擇器 ,也可以翻譯為 多路複用器。
四種狀態:Connect(連線就緒)、Accept(接受就緒)、Read(讀就緒)、Write(寫就緒)。
③ Netty基本原理
如下圖所示:
Netty下載和使用參考官網地址:https://netty.io/。
Netty基礎入門學習參考博文:https://blog.csdn.net/J080624/article/details/84238619。
【3】Dubbo的框架設計
更多詳細內容參考官網地址,如下圖所示:
圖例說明:
- 圖中左邊淡藍背景的為服務消費方使用的介面,右邊淡綠色背景的為服務提供方使用的介面,位於中軸線上的為雙方都用到的介面。
- 圖中從下至上分為十層,各層均為單向依賴,右邊的黑色箭頭代表層之間的依賴關係,每一層都可以剝離上層被複用,其中,Service 和 Config 層為 API,其它各層均為 SPI。
- 圖中綠色小塊的為擴充套件介面,藍色小塊為實現類,圖中只顯示用於關聯各層的實現類。
- 圖中藍色虛線為初始化過程,即啟動時組裝鏈,紅色實線為方法呼叫過程,即執行時調時鏈,紫色三角箭頭為繼承,可以把子類看作父類的同一個節點,線上的文字為呼叫的方法。
各層說明
- config 配置層:對外配置介面,以 ServiceConfig, ReferenceConfig 為中心,可以直接初始化配置類,也可以通過 spring 解析配置生成配置類
- proxy 服務代理層:服務介面透明代理,生成服務的客戶端 Stub 和伺服器端 Skeleton, 以 ServiceProxy 為中心,擴充套件介面為 ProxyFactory
- registry 註冊中心層:封裝服務地址的註冊與發現,以服務 URL 為中心,擴充套件介面為 RegistryFactory, Registry, RegistryService
- cluster 路由層:封裝多個提供者的路由及負載均衡,並橋接註冊中心,以 Invoker 為中心,擴充套件介面為 Cluster, Directory, Router, LoadBalance
- monitor 監控層:RPC 呼叫次數和呼叫時間監控,以 Statistics 為中心,擴充套件介面為 MonitorFactory, Monitor, MonitorService
- protocol 遠端呼叫層:封裝 RPC 呼叫,以 Invocation, Result 為中心,擴充套件介面為 Protocol, Invoker, Exporter
- exchange 資訊交換層:封裝請求響應模式,同步轉非同步,以 Request, Response 為中心,擴充套件介面為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 網路傳輸層:抽象 mina 和 netty 為統一介面,以 Message 為中心,擴充套件介面為 Channel, Transporter, Client, Server, Codec
- serialize 資料序列化層:可複用的一些工具,擴充套件介面為 Serialization, ObjectInput, ObjectOutput, ThreadPool
關係說明
- 在 RPC 中,Protocol 是核心層,也就是隻要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 呼叫,然後在 Invoker 的主過程上 Filter 攔截點。
- 圖中的 Consumer 和 Provider 是抽象概念,只是想讓看圖者更直觀的瞭解哪些類分屬於客戶端與伺服器端,不用 Client 和 Server 的原因是 Dubbo 在很多場景下都使用 Provider, Consumer, Registry, Monitor 劃分邏輯拓普節點,保持統一概念。
- 而 Cluster 是外圍概念,所以 Cluster 的目的是將多個 Invoker 偽裝成一個 Invoker,這樣其它人只要關注 Protocol 層 Invoker 即可,加上 Cluster 或者去掉 Cluster 對其它層都不會造成影響,因為只有一個提供者時,是不需要 Cluster 的。
- Proxy 層封裝了所有介面的透明化代理,而在其它層都以 Invoker 為中心,只有到了暴露給使用者使用時,才用 Proxy 將 Invoker 轉成介面,或將介面實現轉成 Invoker,也就是去掉 Proxy 層 RPC 是可以 Run 的,只是不那麼透明,不那麼看起來像調本地服務一樣調遠端服務。
- 而 Remoting 實現是 Dubbo 協議的實現,如果你選擇 RMI 協議,整個 Remoting 都不會用上,Remoting 內部再劃為 Transport 傳輸層和 Exchange 資訊交換層,Transport 層只負責單向訊息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也可以擴充套件 UDP 傳輸,而 Exchange 層是在傳輸層之上封裝了 Request-Response 語義。
- Registry 和 Monitor 實際上不算一層,而是一個獨立的節點,只是為了全域性概覽,用層的方式畫在一起。
模組說明:
- dubbo-common 公共邏輯模組:包括 Util 類和通用模型。
- dubbo-remoting 遠端通訊模組:相當於 Dubbo 協議的實現,如果 RPC 用 RMI協議則不需要使用此包。
- dubbo-rpc 遠端呼叫模組:抽象各種協議,以及動態代理,只包含一對一的呼叫,不關心叢集的管理。
- dubbo-cluster 叢集模組:將多個服務提供方偽裝為一個提供方,包括:負載均衡, 容錯,路由等,叢集的地址列表可以是靜態配置的,也可以是由註冊中心下發。
- dubbo-registry 註冊中心模組:基於註冊中心下發地址的叢集方式,以及對各種註冊中心的抽象。
- dubbo-monitor 監控模組:統計服務呼叫次數,呼叫時間的,呼叫鏈跟蹤的服務。
- dubbo-config 配置模組:是 Dubbo 對外的 API,使用者通過 Config 使用D ubbo,隱藏 Dubbo 所有細節。
- dubbo-container 容器模組:是一個 Standlone 的容器,以簡單的 Main 載入 Spring 啟動,因為服務通常不需要 Tomcat/JBoss 等 Web 容器的特性,沒必要用 Web 容器去載入服務。
整體上按照分層結構進行分包,與分層的不同點在於:
- container 為服務容器,用於部署執行服務,沒有在層中畫出。
- protocol 層和 proxy 層都放在 rpc 模組中,這兩層是 rpc 的核心,在不需要叢集也就是隻有一個提供者時,可以只使用這兩層完成 rpc 呼叫。
- transport 層和 exchange 層都放在 remoting 模組中,為 rpc 呼叫的通訊基礎。
- serialize 層放在 common 模組中,以便更大程度複用。
依賴關係
- 圖中小方塊 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表層或模組,藍色的表示與業務有互動,綠色的表示只對 Dubbo 內部互動。
- 圖中背景方塊 Consumer, Provider, Registry, Monitor 代表部署邏輯拓撲節點。
- 圖中藍色虛線為初始化時呼叫,紅色虛線為執行時非同步呼叫,紅色實線為執行時同步呼叫。
- 圖中只包含 RPC 的層,不包含 Remoting 的層,Remoting 整體都隱含在 Protocol 中。
【4】解析標籤並載入配置
如下所示,我們在spring的xml中配置了dubbo標籤:
<dubbo:application name="order-service-consumer"></dubbo:application>
<dubbo:registry address="zookeeper://127.0.0.1:2181"></dubbo:registry>
那麼Dubbo是如何拿到配置資訊呢?這就又要提到一個重要介面BeanDefinitionParser!
如下圖所示,該介面有諸多實現類為專案提供支援:
如AnnotationConfigBeanDefinitionParser是解析<context:annotation-config/>
標籤的,org.springframework.web.servlet.config.AnnotationDrivenBeanDefinitionParser是解析<mvc:annotation-driven/>
標籤的,ComponentScanBeanDefinitionParser是解析<context:component-scan/>
標籤的(參考三個標籤使用詳解)。同樣這裡DubboBeanDefinitionParser就是為Dubbo標籤提供支援的。
該類整體結構如下所示,其中parse方法是解析標籤的核心:
在parse方法中,每次標籤解析都對應一個beanClass,如MethodConfig。這個類從哪裡來,需要檢視DubboNamespaceHandler。
其原始碼如下:
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
可以發現在啟init()方法中,註冊了一系列beanClass不同的DubboBeanDefinitionParser。
其實解析標籤過程就是將每個標籤的具體設定資訊儲存到每個標籤對應的beanClass對應的物件中,如MethodConfig。
不過這裡需要注意幾個特殊的地方,ServiceBean、ReferenceBean和AnnotationBeanDefinitionParser。並不再是XXXConfig。尤其是ServiceBean涉及到服務暴露,ReferenceBean涉及到服務引用。
【5】Dubbo中的服務暴露
首先看一下ServiceBean的原始碼:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>,
BeanNameAware {
//...
}
其實現的InitializingBean是在物件屬性設定完之後進行操作,DisposableBean是在物件銷燬前進行操作(這兩個Bean不熟悉的可以參考本篇部落格),ApplicationContextAware用來獲取應用上下文。尤其需要需要的是其實現了ApplicationListener<ContextRefreshedEvent>
,會在容器初始化之後呼叫其onApplicationEvent()方法(參考本篇部落格)。
也就是說,在ServiceBean物件建立並設定屬性後,會呼叫afterPropertiesSet()方法;在Spring容器初始化後會呼叫onApplicationEvent()方法。
① afterPropertiesSet()方法如下:
public void afterPropertiesSet() throws Exception {
if (getProvider() == null) {
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault().booleanValue()) {
providerConfigs.add(config);
}
}
if (!providerConfigs.isEmpty()) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
if ((getRegistries() == null || getRegistries().isEmpty())
&& (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().isEmpty())
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
for (RegistryConfig config : registryConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
registryConfigs.add(config);
}
}
if (registryConfigs != null && !registryConfigs.isEmpty()) {
super.setRegistries(registryConfigs);
}
}
}
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
if ((getProtocols() == null || getProtocols().isEmpty())
&& (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().isEmpty())) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
for (ProtocolConfig config : protocolConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
protocolConfigs.add(config);
}
}
if (protocolConfigs != null && !protocolConfigs.isEmpty()) {
super.setProtocols(protocolConfigs);
}
}
}
if (getPath() == null || getPath().length() == 0) {
if (beanName != null && beanName.length() > 0
&& getInterface() != null && getInterface().length() > 0
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
if (!isDelay()) {
export();
}
}
這個方法做了什麼呢?可以看到裡面有很多諸如setProvider(providerConfig);,setApplication(applicationConfig);, setModule(moduleConfig);等語句,就是將獲取到的配置資訊儲存在ServiceBean中。
方法末尾會判斷是否延遲,否則將會呼叫export()方法暴露服務:
if (!isDelay()) {
export();
}
② onApplicationEvent方法如下:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
判斷如果是延遲的&沒有被暴露&應該被暴露的,就會呼叫 export();方法暴露服務。
③ export()方法原始碼如下:
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
//前面都是獲取、判斷操作,下面才是關鍵
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
④ doExport()方法原始碼如下:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
checkDefault();
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = Boolean.FALSE.toString();
}
if (local != null) {
//本地代理
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
//本地助手
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//這裡是關鍵
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
⑤ doExportUrls方法原始碼如下:
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
//載入註冊中心的資訊
List<URL> registryURLs = loadRegistries(true);
//在某個埠使用某種協議暴露,如dubbo 20882
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
核心方法為doExportUrlsFor1Protocol,服務暴露就是在該方法中完成的。
⑥ doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//這裡省略諸多程式碼
//...
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 這裡,通過proxyFactory拿到Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//對invoker進行了包裝
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//核心方法
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
該方法中核心語句 Exporter<?> exporter = protocol.export(wrapperInvoker);進行遠端服務暴露。其會先執行RegistryProtocol的<T> Exporter<T> export(final Invoker<T> originInvoker)
方法,再執行DubboProtocol的<T> Exporter<T> export(Invoker<T> invoker)
方法。
RegistryProtocol的<T> Exporter<T> export(final Invoker<T> originInvoker)
方法如下所示:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 暴露服務
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
//登錄檔註冊provider--沒錯在另外方法將會註冊消費者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
⑦ DubboProtocol的<T> Exporter<T> export(Invoker<T> invoker)
方法原始碼如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//這裡是關鍵,啟動Netty伺服器,監聽dubbo埠,如20882
openServer(url);
optimizeSerialization(url);
return exporter;
}
⑧ 完整示意圖如下:
⑨ 時序圖
【6】Dubbo中的服務引用
ReferenceBean原始碼如下:
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
//...
}
其同樣實現了ApplicationContextAware, InitializingBean, DisposableBean之外,還實現了FactoryBean(參考本篇博文)。所以說,獲取ReferenceBean例項就需要呼叫其getObject()方法。
① getObject()
@Override
public Object getObject() throws Exception {
return get();
}
而get()方法如下:
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
//這裡是關鍵
init();
}
return ref;
}
② init()
private void init() {
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// get consumer's global configuration
checkDefault();
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if (null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null && resolveFile.length() > 0) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
}
}
}
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
checkApplication();
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prefix = StringUtils.getServiceKey(map);
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prefix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
// 這裡是關鍵! 建立代理!
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
③ createProxy()
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 這裡是關鍵,這裡拿到引用的Invoker 參考 (4)
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 這裡是關鍵!!!使用代理工廠獲取Invoker代理!!
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
④ refer
這裡仍舊呼叫RegistryProtocol的refer方法和DubboProtocol方法。
RegistryProtocol呼叫方法如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private Cluster getMergeableCluster() {
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//這裡訂閱服務
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
//登錄檔註冊消費者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
其中在訂閱服務時會來到DubboProtocol的refer方法:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.--這裡很關鍵 將會建立NettyClient
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
⑤ 完成流程示意圖
⑥ 時序圖
【7】Dubbo中的服務呼叫
如下圖所示:
其實Dubbo服務呼叫的核心就是DubboInvoker的doInvoke方法,底層使用Netty進行通訊。
實現細節參考官方文件:http://dubbo.apache.org/zh-cn/docs/dev/implementation.html。