dubbo的服務呼叫過程
服務消費的過程:referenceConfig類的init方法呼叫Protocol的refer方法,生成invoker例項,然後把Invoker轉換為客戶端需要的介面。
2、原始碼解析
dubbo的消費端初始化在ReferenceConfig的get()方法
public synchronized T get() { if (destroyed){ throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }
init()方法做了一系列獲取url引數的操作後,進行ref = createProxy(map);獲取代理。
在createProxy方法中,根據url是做本地引用,直接根據url的ip和埠號做直接引用,還是通過註冊中心獲取服務地址,進行引用。這裡我們直接看最後一種(這也是最常用的)
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { //預設情況下如果本地有服務暴露,則引用本地服務. isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // 通過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最後一個registry url } } if (registryURL != null) { // 有 註冊中心協議的URL // 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && ! invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // 建立服務代理 return (T) proxyFactory.getProxy(invoker); }
上面程式碼中
List us = loadRegistries(false);獲取到註冊中心的地址
[registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubboTest-test-web&dubbo=2.5.3&pid=10628®istry=zookeeper×tamp=1543316713611]
然後進入 invoker = refprotocol.refer(interfaceClass, urls.get(0));獲取invoker。
這裡refprotocol是
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
需要根據url的名稱來載入對應的spi擴充套件,找到真正的類。根據url,是register,所以會定位到com.alibaba.dubbo.registry.integration.RegistryDirectory類。找到類的refer方法:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//根據url找到註冊中心,這裡用的是zookeeper註冊中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
return doRefer(cluster, registry, type, url);
}
上面方法中,找到註冊中心後,呼叫doRefer(cluster, registry, type, url),返回invoker
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
上面程式碼中,subscribeUrl 的值是:
consumer://192.168.86.1/cn.andy.dubbo.DataService?application=dubboTest-test-web&check=false&connections=5&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&pid=10628&retries=0&revision=0.0.1-SNAPSHOT&side=consumer×tamp=1543316675743
這個url就是向註冊中心註冊的消費端的地址。其中註冊中心的地址分為永久地址和臨時地址。在這個例子中,永久地址就是cn.andy.dubbo.DataService,其下分為provider和consumer以及monitor等。其中,服務提供端的ip地址和埠號作為臨時地址放在provider下,消費端的ip地址和埠號放在consumer下。同時,消費端會對provider下的地址進行訂閱,當其有變化時,會通知消費端,這樣,消費端和服務端就對應起來了。
上面程式碼中directory可以理解為服務端invoker的list集合。
RegistryDirectory directory = new RegistryDirectory(type, url);
裡面的成員
private volatile Map<String, Invoker> urlInvokerMap,儲存了服務端的invoker集合。
direct通過訂閱註冊中心的相關路徑:
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ “,” + Constants.CONFIGURATORS_CATEGORY
+ “,” + Constants.ROUTERS_CATEGORY));
當服務端的invoker集合發生變化,或者路由,配置等發生變化時,都會通知這個director。以invoker發生變化為例(即服務端的提供者數量發生變化)。
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
裡面的this,就是RegistryDirectory,那麼檢視RegistryDirectory的notify方法:
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() >0 ){
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合併override引數
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers,根據url重新整理invoker列表
refreshInvoker(invokerUrls);
/**
* 根據invokerURL列表轉換為invoker列表。轉換規則如下:
* 1.如果url已經被轉換為invoker,則不在重新引用,直接從快取中獲取,注意如果url中任何一個引數變更也會重新引用
* 2.如果傳入的invoker列表不為空,則表示最新的invoker列表
* 3.如果傳入的invokerUrl列表是空,則表示只是下發的override規則或route規則,需要重新交叉對比,決定是否需要重新引用。
* @param invokerUrls 傳入的引數不能為null
*/
private void refreshInvoker(List<URL> invokerUrls){
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止訪問
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 關閉所有Invoker
} else {
this.forbidden = false; // 允許訪問
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//快取invokerUrls列表,便於交叉對比
}
if (invokerUrls.size() ==0 ){
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名對映Invoker列表
// state change
//如果計算錯誤,則不進行處理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try{
destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker
}catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
生成directory之後,通過return cluster.join(directory)返回invoker。這裡的cluster可以是failoverCluster、failfastCluster等。在我們後面消費端進行方法呼叫時,cluster會根據規則將invoker集合返回一個invoker給消費端。
在上面程式碼中,檢視 Map<String, Invoker> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表。在這個方法中,消費端開啟netty客戶端。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if(urls == null || urls.size() == 0){
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//如果reference端配置了protocol,則只選擇匹配的protocol
if (queryProtocols != null && queryProtocols.length() >0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL引數是排序的
if (keys.contains(key)) { // 重複URL
continue;
}
keys.add(key);
// 快取key為沒有合併消費端引數的URL,不管消費端如何合併引數,如果服務端URL發生變化,則重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 快取中沒有,重新refer
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
}
if (invoker != null) { // 將新的引用放入快取
newUrlInvokerMap.put(key, invoker);
}
}else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
上面的 URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL引數是排序的
是將privoderurl和消費端的url合併,相當於有多少個provider,就有多少個key(因為消費端只有一個),其格式如下:
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubboTest-test-web&check=false&connections=5&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=55972&retries=0&revision=0.0.1-SNAPSHOT&service.filter=andyFilter&side=consumer&timeout=60000×tamp=1543369485194&token=1234567
前面的是服務提供端的ip和埠號等,但是後面的side=consumer。相當於url是providerurl合併了消費端的引數。
然後根據這個url,生成invoker
invoker = new InvokerDelegete(protocol.refer(serviceType, url), url, providerUrl);
其中,serviceType=interface cn.andy.dubbo.DataService,url就是上面的key。
在protocol.refer(serviceType, url)中完成netty的客戶端開啟。
而requestHandler 作為一個channel,是netty的處理channel。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本呼叫低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}