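Dubbo Source Code Analysis: Service Export and Registration
Preparation
Dubbo version: 2.5.4
Spring custom extensions
Dubbo exports services through Spring configuration and defines its own set of custom tags on top of Spring's extension mechanism. To implement a custom extension, Spring provides two interfaces, NamespaceHandler and BeanDefinitionParser:
- NamespaceHandler: registers a set of BeanDefinitionParser instances and uses them for parsing
- BeanDefinitionParser: parses the content of each element
By default, Spring loads the NamespaceHandler declared in META-INF/spring.handlers inside each jar (for Dubbo, the file lives in dubbo-config-spring under the dubbo-config module).
Look at the DubboNamespaceHandler class: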
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
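The dispatch idea behind this handler can be shown without Spring. The following is a minimal plain-Java sketch, not Spring's or Dubbo's actual code: `ToyNamespaceHandler` and `ElementParser` are hypothetical names standing in for NamespaceHandlerSupport and BeanDefinitionParser, and the "parsed" result is just a string naming the bean type.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyNamespaceHandler {
    /** Hypothetical stand-in for BeanDefinitionParser: maps an element to a bean description. */
    public interface ElementParser {
        String parse(String elementName);
    }

    // Element name -> parser, filled in by init(), as the real handler does in its init().
    private final Map<String, ElementParser> parsers = new HashMap<>();

    public void registerParser(String elementName, ElementParser parser) {
        parsers.put(elementName, parser);
    }

    public void init() {
        // Only two of the tags are modelled here; the bean names mirror the listing above.
        registerParser("service", name -> "ServiceBean");
        registerParser("reference", name -> "ReferenceBean");
    }

    public String parse(String elementName) {
        ElementParser parser = parsers.get(elementName);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for element: " + elementName);
        }
        return parser.parse(elementName);
    }

    public static void main(String[] args) {
        ToyNamespaceHandler handler = new ToyNamespaceHandler();
        handler.init();
        System.out.println(handler.parse("service"));   // prints "ServiceBean"
        System.out.println(handler.parse("reference")); // prints "ReferenceBean"
    }
}
```

An unregistered element name fails fast with an exception in this sketch, which makes a missing registration easy to spot.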
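DubboBeanDefinitionParser parses each Dubbo custom tag into the corresponding bean object. In addition, so that starting Spring also starts the provider's export-and-register process, and so that a client automatically subscribes to and discovers services on startup, two beans are added: ServiceBean and ReferenceBean, which extend ServiceConfig and ReferenceConfig respectively.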
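ServiceBean implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, and BeanNameAware:
- InitializingBean: the afterPropertiesSet method is called while the bean is being initialized
- DisposableBean: when the bean is destroyed, the Spring container automatically calls the destroy method, for example to release resources
- ApplicationContextAware: the Spring container injects the ApplicationContext while it initializes the bean
- ApplicationListener: listens for ApplicationEvent; the Spring container publishes an event once it has started
- BeanNameAware: gives the bean access to its own id attribute at initialization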
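Service export process
Start with the sequence diagram, which helps in following the flow.
ServiceBean is the entry point for service export. When the bean is initialized, its afterPropertiesSet method runs, and at the end of that method export is called to export the service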
// ServiceConfig
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && ! export.booleanValue()) {
return;
}
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
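The delay branch above can be tried in isolation. This is a minimal sketch, assuming a hypothetical class `DelayedExportSketch` whose `doExport` only flips a latch. Unlike the listing, which swallows every Throwable and still exports, this sketch skips the export if the sleeping thread is interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DelayedExportSketch {
    // Counted down exactly once, when the (simulated) export has run.
    private final CountDownLatch exported = new CountDownLatch(1);

    public void export(Integer delayMillis) {
        if (delayMillis != null && delayMillis > 0) {
            // Delayed path: a daemon thread sleeps, then exports.
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // assumption of this sketch: an interrupted delay cancels the export
                }
                doExport();
            });
            thread.setDaemon(true);
            thread.setName("DelayExportSketchThread");
            thread.start();
        } else {
            // No delay configured: export on the calling thread.
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    public boolean isExported() {
        return exported.getCount() == 0;
    }

    /** Waits up to timeoutMillis for the export; returns false on timeout or interruption. */
    public boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch immediate = new DelayedExportSketch();
        immediate.export(null);
        System.out.println(immediate.isExported()); // prints "true"

        DelayedExportSketch delayed = new DelayedExportSketch();
        delayed.export(200);
        System.out.println(delayed.awaitExported(5000));
    }
}
```

Because the worker is a daemon thread, a JVM that exits before the delay elapses never performs the export, which is why `main` waits on the latch.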
export delegates to doExport, which in turn calls the doExportUrls method
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);// load the registry configuration
for (ProtocolConfig protocolConfig : protocols) {
// export the service for this protocol
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/**
* Only the core code is kept
*/
String scope = url.getParameter(Constants.SCOPE_KEY);
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// registryURLs
// registry://...
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// obtain the Invoker object through proxyFactory
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// register the service
// protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// protocol here is the Protocol$Adpative adapter object
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
This calls the dynamically generated adapter class Protocol$Adpative
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
// get the extension with the given name
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
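The core of the generated adapter is: read the protocol from the URL, fall back to "dubbo", look up the extension by that name, delegate. The sketch below restates that in plain Java under simplifying assumptions: URLs are plain strings, the extensions are lambdas in a map, and `AdaptiveDispatchSketch` and `protocolOf` are hypothetical names, not Dubbo APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveDispatchSketch {
    /** Simplified stand-in for com.alibaba.dubbo.rpc.Protocol. */
    public interface Protocol {
        String export(String url);
    }

    // Stand-in for ExtensionLoader: extension name -> implementation.
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "dubbo-exported:" + url);
        EXTENSIONS.put("registry", url -> "registry-exported:" + url);
    }

    /** Returns the scheme before "://", or "dubbo" when the URL has none. */
    public static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : "dubbo";
    }

    /** Mirrors the adapter's export: pick the extension named by the URL and delegate to it. */
    public static String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = protocolOf(url);
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        // prints "registry-exported:registry://127.0.0.1:2181/demo"
        System.out.println(export("registry://127.0.0.1:2181/demo"));
        // prints "dubbo-exported:127.0.0.1:20880/demo"
        System.out.println(export("127.0.0.1:20880/demo"));
    }
}
```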
getExtension
public T getExtension(String name) {
if (name != null && name.length() != 0) {
if ("true".equals(name)) {
return this.getDefaultExtension();
} else {
Holder<Object> holder = (Holder)this.cachedInstances.get(name);
if (holder == null) {
this.cachedInstances.putIfAbsent(name, new Holder());
holder = (Holder)this.cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized(holder) {
instance = holder.get();
if (instance == null) {
// create the extension with the given name
instance = this.createExtension(name);
holder.set(instance);
}
}
}
return instance;
}
} else {
throw new IllegalArgumentException("Extension name == null");
}
}
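getExtension combines a per-name Holder with double-checked locking, so each extension is created at most once while different names do not block one another. Here is a minimal sketch of that pattern with hypothetical names (`HolderCacheSketch`, a `Function` factory in place of createExtension). The `volatile` field in `Holder` is what makes the unsynchronized first read safe.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class HolderCacheSketch<T> {
    /** Mutable cell that also serves as the per-name lock object. */
    static final class Holder<V> {
        private volatile V value;
        V get() { return value; }
        void set(V value) { this.value = value; }
    }

    private final ConcurrentMap<String, Holder<T>> cachedInstances = new ConcurrentHashMap<>();
    private final Function<String, T> factory; // stands in for createExtension(name)

    public HolderCacheSketch(Function<String, T> factory) {
        this.factory = factory;
    }

    public T getExtension(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Extension name == null");
        }
        Holder<T> holder = cachedInstances.get(name);
        if (holder == null) {
            // putIfAbsent keeps whichever holder won the race; re-read to get that one.
            cachedInstances.putIfAbsent(name, new Holder<>());
            holder = cachedInstances.get(name);
        }
        T instance = holder.get();          // first check, without locking
        if (instance == null) {
            synchronized (holder) {         // lock only this name's holder
                instance = holder.get();    // second check, under the lock
                if (instance == null) {
                    instance = factory.apply(name);
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        HolderCacheSketch<Object> loader = new HolderCacheSketch<>(name -> new Object());
        Object first = loader.getExtension("dubbo");
        Object second = loader.getExtension("dubbo");
        System.out.println(first == second); // prints "true"
    }
}
```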
createExtension
private T createExtension(String name) {
Class<?> clazz = (Class)this.getExtensionClasses().get(name);
if (clazz == null) {
throw this.findException(name);
} else {
try {
T instance = EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = EXTENSION_INSTANCES.get(clazz);
}
// inject dependencies into the instance
this.injectExtension(instance);
// cachedWrapperClasses is populated in loadFile
Set<Class<?>> wrapperClasses = this.cachedWrapperClasses;
Class wrapperClass;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
// create each wrapper through its constructor that takes the extension type, then inject its dependencies
for(Iterator var5 = wrapperClasses.iterator(); var5.hasNext(); instance = this.injectExtension(wrapperClass.getConstructor(this.type).newInstance(instance))) {
wrapperClass = (Class)var5.next();
}
}
return instance;
} catch (Throwable var7) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + this.type + ") could not be instantiated: " + var7.getMessage(), var7);
}
}
}
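The main steps of the method above:
- Look up the class that corresponds to the name
- Create an instance of that class
- Inject dependencies into the instance
- Create the wrapper instances through the constructor that takes the extension type as its parameter, and inject their dependencies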
Take Protocol as an example:
Under the resources path of dubbo-rpc-api, the com.alibaba.dubbo.rpc.Protocol file contains filter and listener entries:
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
Both ProtocolFilterWrapper and ProtocolListenerWrapper have a constructor that takes a Protocol as its parameter
public ProtocolFilterWrapper(Protocol protocol){}
public ProtocolListenerWrapper(Protocol protocol){}
Iterating over cachedWrapperClasses wraps DubboProtocol, so it ends up wrapped by both ProtocolFilterWrapper and ProtocolListenerWrapper.
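The wrapping step is ordinary decoration driven by reflection: any class with a constructor that accepts the extension type is instantiated around the current instance. A minimal sketch follows, with hypothetical classes (`PlainProtocol`, `FilterWrapper`, `ListenerWrapper`) in place of the Dubbo ones. The order here is fixed by the argument list, whereas the source iterates a Set, so this sketch says nothing about the real wrapping order.

```java
public class WrapperChainSketch {
    /** Simplified stand-in for the Protocol extension interface. */
    public interface Protocol {
        String export(String url);
    }

    public static class PlainProtocol implements Protocol {
        public String export(String url) {
            return "export(" + url + ")";
        }
    }

    public static class FilterWrapper implements Protocol {
        private final Protocol protocol;
        public FilterWrapper(Protocol protocol) { this.protocol = protocol; }
        public String export(String url) {
            return "filter>" + protocol.export(url);
        }
    }

    public static class ListenerWrapper implements Protocol {
        private final Protocol protocol;
        public ListenerWrapper(Protocol protocol) { this.protocol = protocol; }
        public String export(String url) {
            return "listener>" + protocol.export(url);
        }
    }

    /** Wraps the instance once per wrapper class, using the constructor that takes a Protocol. */
    public static Protocol wrap(Protocol instance, Class<?>... wrapperClasses) {
        try {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = (Protocol) wrapperClass.getConstructor(Protocol.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate wrapper: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Protocol wrapped = wrap(new PlainProtocol(), FilterWrapper.class, ListenerWrapper.class);
        // prints "listener>filter>export(dubbo://demo)"
        System.out.println(wrapped.export("dubbo://demo"));
    }
}
```

Callers only ever see the outermost wrapper, which is why filters and listeners apply without the core protocol knowing about them.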
Because doExportUrlsFor1Protocol passes in the registryURL, whose protocol is registry, the export method of Protocol$Adpative ends up calling the export method of RegistryProtocol
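What makes this routing work is that the provider URL travels inside the registry URL as an encoded parameter, so the outer protocol stays registry while the real service URL can still be recovered later. The sketch below models this with plain strings and URL encoding. It assumes the parameter key is the literal `export` and uses hypothetical helpers (`withExport`, `exportedUrl`, `protocolOf`); it is not Dubbo's URL class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

public class ExportParameterSketch {
    private static final String EXPORT_KEY = "export"; // assumed value of the export key

    /** Appends the encoded provider URL to the registry URL as the export parameter. */
    public static String withExport(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        try {
            return registryUrl + separator + EXPORT_KEY + "=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Recovers the provider URL from the export parameter, or null when it is absent. */
    public static String exportedUrl(String registryUrl) {
        int query = registryUrl.indexOf('?');
        if (query < 0) {
            return null;
        }
        String prefix = EXPORT_KEY + "=";
        for (String pair : registryUrl.substring(query + 1).split("&")) {
            if (pair.startsWith(prefix)) {
                try {
                    return URLDecoder.decode(pair.substring(prefix.length()), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }

    /** Returns the scheme before "://". */
    public static String protocolOf(String url) {
        return url.substring(0, url.indexOf("://"));
    }

    public static void main(String[] args) {
        String registry = "registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService";
        String provider = "dubbo://192.168.0.1:20880/com.example.DemoService?version=1.0&side=provider";
        String combined = withExport(registry, provider);
        System.out.println(protocolOf(combined));              // prints "registry"
        System.out.println(protocolOf(exportedUrl(combined))); // prints "dubbo"
    }
}
```

The host names and the `com.example.DemoService` interface are made up for the example.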
RegistryProtocol
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
// perform the local export
final RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
final Registry registry = this.getRegistry(originInvoker);
final URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
final URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
final RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable var4) {
RegistryProtocol.logger.warn(var4.getMessage(), var4);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable var3) {
RegistryProtocol.logger.warn(var3.getMessage(