Dubbo原始碼解析之客戶端初始化及服務呼叫
阿新 • • 發佈:2019-02-16
準備
dubbo
版本:2.5.4
客戶端初始化過程
初始化過程
先上時序圖,幫助理解客戶端初始化過程。
ReferenceBean
是客戶端初始化入口,其實現 InitializingBean
介面,在 bean
初始化過程中會呼叫其 afterPropertiesSet
方法,進而呼叫 getObject()
-> get()
-> init()
,之後再呼叫 ReferenceConfig
的 createProxy()
方法。
ReferenceConfig
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // 指定URL的情況下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// 預設情況下如果本地有服務暴露,則引用本地服務
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 使用者自己指定URL,可能是點對點直連地址,也可能是註冊中心URL
if (url != null && url.length() > 0) {
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 通過註冊中心配置拼裝URL
List<URL> us = loadRegistries(false); // 從註冊中心上獲得相應的協議url地址
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
// refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// refprotocol -> Protocol$Adaptive
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最後一個registry url
}
}
// 註冊中心地址URL不為空
if (registryURL != null) {
// 對有註冊中心的Cluster只用AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 返回的是MockClusterInvoker(FailoverClusterInvoker)
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 建立服務代理
return (T) proxyFactory.getProxy(invoker);
}
Protocol$Adpative
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
// extName -> registry
// extension -> RegistryProtocol
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter("registry", "dubbo")).removeParameter("registry");
Registry registry = this.registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return this.proxyFactory.getInvoker(registry, type, url);
} else {
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded("refer"));
String group = (String)qs.get("group");
return group == null || group.length() <= 0 || Constants.COMMA_SPLIT_PATTERN.split(group).length <= 1 && !"*".equals(group) ? this.doRefer(this.cluster, registry, type, url) : this.doRefer(this.getMergeableCluster(), registry, type, url);
}
}
RegistryProtocol.doRefer
方法存在一個引數 Cluster
,而 Cluster
是一個擴充套件點,存在加在方法級別上的 @Adaptive
註解,說明會動態生成自適應介面卡( Cluster$Adaptive
)。在 RegistryProtocol
中存在 Cluster
擴充套件點成員變數及 setter
方法,說明是一個自動注入的擴充套件點。
@SPI("failover")
public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> var1) throws RpcException;
}
// RegistryProtocol
private Cluster cluster;
public void setCluster(Cluster cluster) {
this.cluster = cluster;
}
Cluster$Adpative
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
又因為 Cluster
擴充套件點實現中存在以擴充套件點作為引數的構造方法,所以會被 Wrapper
裝飾,而該裝飾器就是 MockClusterWrapper
。
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker(directory, this.cluster.join(directory));
}
}
// RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory(type, url);
directory.setRegistry(registry); // registry -> ZookeeperRegistry
directory.setProtocol(this.protocol); // protocol -> Protocol$Adaptive
URL subscribeUrl = new URL("consumer", NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
// 註冊consumer://協議地址到註冊中心
registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
}
// 註冊zookeeper地址變更
directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
// 返回一個MockClusterInvoker(FailoverClusterInvoker)
return cluster.join(directory);
}
MockClusterWrapper
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {