Dubbo/Dubbox的服務消費(一)- 服務代理的建立
dubbo的consumer的初始過程
一個常見的consumer配置是這樣的
<dubbo:reference id="dubboDemo" interface="com.company.dsp.adcenter.protocol.dubbo.DubboDemo"
protocol="dubbo"/>
這樣spring會建立一個beanName為dubboDemo的bean,使用上是這樣的
@Resource(name = "dubboDemo")
private DubboDemo dubboDemo;
那麼問題來了why why why? dubbo是怎麼建立這樣的bean的?全程都沒有要求我們使用bean標籤,也沒有要求使用Componment和Service註解,即使使用顯示的方式宣告一個Bean,那我們也沒有相關的實現類啊。
既然我們已經擁有上面的配置項,拍腦袋一想,已知dubbo啟動時會利用spring對自定義配置項的spring.handles檔案載入對應的handler解析自定義節點,且上文書已經分析
dubbo:reference節點的配置項儲存在com.alibaba.dubbo.config.spring.ReferenceBean
中,該類的uml圖如下。
可見該類整合自com.alibaba.dubbo.config.ReferenceConfig
類,並實現了下面四個介面
org.springframework.beans.factory.FactoryBean
org.springframework .context.ApplicationContextAware
org.springframework.beans.factory.InitializingBean
org.springframework.beans.factory.DisposableBean
InitializingBean和ApplicationContextAware我們都很熟悉了,還沒詳細瞭解過FactoryBean,不過這裡有一個道友的文章可以看看《FactoryBean的實現原理與作用》
其中提到了以下兩句
OK,那麼這個時候我們getBean(“personFactory”)得到的就是Person物件而不是PersonFactoryBean物件。具體原理參考上面在IOC的應用,我們通過bean = getObjectForBeanInstance(sharedInstance, name, beanName, null)這個方法,具體呼叫到了getObject方法,所以結果很明顯。
通過上面的小案例的程式碼,我們可以看到如果一個類實現了FactoryBean介面,那麼getBean得到的不是他本身了,而是它所產生的物件,如果我們希望得到它本身,只需要加上&符號即可。至於FactoryBean的實際應用,需要大家去發現理解,後面如果有機會會繼續聊聊這個東西。
那把目光轉向:com.alibaba.dubbo.config.spring.ReferenceBean
,開啟找尋getObject方法之旅
/**
* ReferenceFactoryBean
*
* @author william.liangf
* @export
*/
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
//......
public Object getObject() throws Exception {
return get();
}
//......
}
其中get()方法 繼承自com.alibaba.dubbo.config.ReferenceConfig
public synchronized T get() {
if (destroyed){
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
可見get方法內部呼叫了一個init()方法
private void init() {
if (initialized) {
return;
}
initialized = true; //初始化標記,防止重複建立物件
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// 獲取消費者全域性配置,檢查consumer是否有配置,如果該屬性為null,則new一個ConsumerConfig物件
// 方法內部呼叫appendProperties(consumer)方法,該方法內部會拼裝一個 dubbo.tagName.屬性名的key,在配置檔案中查詢值,如果有值則呼叫屬性的setter方法,設定屬性值。
checkDefault();
// 上邊註釋已經說明appendProperties方法用途
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) { //判斷是不是泛化呼叫
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
//繼承自com.alibaba.dubbo.config.AbstractInterfaceConfig
//檢查介面類中是否存在指定的方法,如果dubbo:service->dubbo:method 沒有配置的情況下,methods為null,該方法不會執行方法校驗。
//如果有相關的配置,該方法會檢查name屬性對應的方法是否存在,不存在會拋IllegalStateException異常。
checkInterfaceAndMethods(interfaceClass, methods);
}
//用-Ddubbo.resolve.file指定對映檔案路徑,此配置優先順序高於<dubbo:reference>中的配置,1.0.15及以上版本支援
//......省略-Ddubbo.resolve.file的解析程式碼行......//
//........省略進一步初始化相關配置的程式碼行 .......//
//檢查應用配置,如果沒有配置會建立預設物件。其內部會呼叫appendProperties(AbstractConfig config) 填充屬性。
//檢查失敗會丟擲IllegalStateException異常
checkApplication();
//
checkStubAndMock(interfaceClass);
//看到Map就應當想到dubbo又要初始化一個URL物件了
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! isGeneric()) { //非泛化呼叫
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prifix = StringUtils.getServiceKey(map);
//如果指定了介面暴露哪些方法時才有效
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prifix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
//attributes通過系統context進行儲存.
StaticContext.getSystemContext().putAll(attributes);
//建立當前服務的消費代理(十分重要)
ref = createProxy(map);
}
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//預設情況下如果本地有服務暴露,則引用本地服務.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // 使用者指定URL,指定的URL可能是對點對直連地址,也可能是註冊中心URL
//......省略點對點直連的程式碼行......//
} else { // 通過註冊中心配置拼裝URL
//該方法繼承自com.alibaba.dubbo.config.AbstractInterfaceConfig
//在http://blog.csdn.net/weiythi/article/details/78467094 有相關介紹
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) { //遍歷註冊中心地址
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
//以下的程式碼反映一個主題,即建立一個invoker例項(com.alibaba.dubbo.rpc.Invoker)
if (urls.size() == 1) {
//這裡refprotocol又是一個自適應的擴充套件點
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
//注意這裡refprotocol是一個自適應擴充套件點,根據url的protocol判斷,這裡返回的是一個ProtocolListenerWrapper包裝類例項,幷包裝com.alibaba.dubbo.registry.integration.RegistryProtocol,refer方法涉及服務發現。獨立章節說明
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最後一個registry url
}
}
if (registryURL != null) { // 有 註冊中心協議的URL
// 對有註冊中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 註冊中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 建立服務代理,
return (T) proxyFactory.getProxy(invoker);
}
注意這裡refprotocol是一個自適應擴充套件點,根據url的protocol判斷,這裡返回的是一個ProtocolListenerWrapper包裝類例項,幷包裝com.alibaba.dubbo.registry.integration.RegistryProtocol,refer方法涉及服務發現。有時間獨立章節說明
createProxy方法的最後一行return (T) proxyFactory.getProxy(invoker); 那麼這個proxyFactory又是啥?
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
好吧,他又是一個自適應的擴充套件點,看一下ProxyFactory的介面定義,結合我們對dubbo中擴充套件點機制的掌握,看一看這個proxyFactory建立了一個怎樣的代理 。com.alibaba.dubbo.rpc.ProxyFactory
類的定義如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
/**
* ProxyFactory. (API/SPI, Singleton, ThreadSafe)
*
* @author william.liangf
*/
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
Constants.PROXY_KEY 常量的值為proxy,在\META-INF\dubbo\internal\com.alibaba.dubbo.rpc.ProxyFactory檔案中找到擴充套件點的配置如下:
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
其中@SPI註解的值為javassist 所以使用com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory 類作為擴充套件點的實現,但是這裡還有com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper包裝類
所以這裡會使用StubProxyFactoryWrapper例項包裝JavassistProxyFactory,呼叫getProxy(invoker)的化學反應;方法會呼叫繼承自com.alibaba.dubbo.rpc.proxy.AbstractProxyFactory 的 getProxy(invoker); 方法,其內部實現如下
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null;
//config 一般配置下這裡會返回 null
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i ++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
}
return getProxy(invoker, interfaces);
}
可見dubbo 加進來了一個奇怪的介面 com.alibaba.dubbo.rpc.service.EchoService,這裡先暫時不管這個EchoService
看一下 getProxy(invoker, interfaces);的實現
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
com.alibaba.dubbo.common.bytecode.Proxy 是一個生成代理物件的工具類
1. 遍歷所有入參介面,以;分割連線起來, 以它為key以map為快取查詢如果有,說明代理物件已建立返回
2. 利用AtomicLong物件自增獲取一個long陣列來作為生產類的字尾,防止衝突
3. 遍歷介面獲取所有定義的方法,加入到一個集合Set worked中 ,用來判重,
Proxy.getProxy(interfaces),會返回一個代理物件,該方法內部操作複雜,會返回一個如下所示的代理類
package com.alibaba.dubbo.common.bytecode;
import java.lang.reflect.InvocationHandler;
public class Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy {
@Override
public Object newInstance(InvocationHandler h) {
return new proxy0(h);
}
}
class proxy0 implements 服務介面,com.alibaba.dubbo.rpc.service.EchoService{
/**
* 被代理的方法
*/
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
/**
* 建構函式
*
* @param handler
*/
public proxy0(java.lang.reflect.InvocationHandler handler) {
this.handler = handler;
}
/**
* 被代理方法(有返回值,有引數)
*
* @return
*/
com.company.project.common.dto.Result methodName0(com.company.project.module.protocol.request.AdGroupRequest adGroupRequest) throws Throwable {
//args陣列長度為引數列表長度
Object[] args = new Object[1];
args[0] = adGroupRequest;
Object ret = handler.invoke(this, methods[0], args);
return (com.ksyun.dsp.common.dto.Result) ret;
}
/**
* 被代理方法(無返回值,有引數)
*
* @return
*/
void methodName1(com.company.project.module.protocol.request.AdGroupRequest adGroupRequest) throws Throwable {
Object[] args = new Object[1];
args[0] = adGroupRequest;
Object ret = handler.invoke(this, methods[1], args);
}
/**
* 被代理方法(無返回值,無引數)
*
* @return
*/
void methodName2() throws Throwable {
Object[] args = new Object[0];
Object ret = handler.invoke(this, methods[2], args);
}
}
從以上示例程式碼可見,剩下的操作都是通過
handler.invoke(this, method, args); 這種方式來實現代理方法的呼叫的
dubbo內部對InvocationHandler的實現如下,可見其還是實現java.lang.reflect.InvocationHandler這個介面的,但是以dubbo的代理生成方式來說,
好像沒什麼必要實現InvocationHandler介面,這裡可能是為了維護動態代理的語義吧
/**
* InvokerHandler
*
* @author william.liangf
*/
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler){
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
最後一行 return invoker.invoke(new RpcInvocation(method, args)).recreate();,為便於理解拆分為以下兩行
Result rpcResult = invoker.invoke(new RpcInvocation(method, args));
return rpcResult.recreate();
其中RpcInvocation如圖
總結:
1.dubbo通過實現org.springframework.beans.factory.FactoryBean介面的方式實現對服務的代理
2.Dubbo將所有的服務都實現了一個EchoService(回聲測試用於檢測服務是否可用,回聲測試按照正常請求流程執行,能夠測試整個呼叫是否暢通,可用於監控。所有服務自動實現EchoService介面,只需要將任意服務引用強制轉換為EchoService,即可使用。)
3.服務消費最終會呼叫com.alibaba.dubbo.rpc.Invoker.invoke(new RpcInvocation(method, args)); 方法