dubbo的服務釋出過程
dubbo釋出服務的流程
1、具體的服務轉為invoker: ServiceConfig類通過ProxyFactory類的getInvoker方法,將服務提供類ref生成invoker。
2、Invoker轉換成Exporter:開啟通訊埠,監聽來自客戶端的請求。
具體解析
1、當Spring容器例項化bean完成,ServiceBean會執行onApplicationEvent方法,該方法呼叫ServiceConfig的export方法。
2、ServiceBean的父類ServiceConfig在初始化時,會率先完成protocol和proxyFactory的spi擴充套件
// Adaptive Protocol extension, obtained through the ExtensionLoader SPI mechanism.
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// Adaptive ProxyFactory extension, obtained the same way; it is the one used to turn the service ref into an Invoker.
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
其中,protocol 是協議的擴充套件,proxyFactory 是代理擴充套件(用於生成invoker)。
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); //根據URL配置資訊獲取Protocol協議,預設是dubbo String extName = ( url.getProtocol() == null ? 
"dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //根據協議名,獲取Protocol的實現 //獲得Protocol的實現過程中,會對Protocol先進行依賴注入,然後進行Wrapper包裝,最後返回被修改過的Protocol //包裝經過了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } }
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
/**
 * Adaptive implementation of {@link com.alibaba.dubbo.rpc.ProxyFactory}.
 *
 * <p>Each call reads the {@code proxy} parameter from the URL (default {@code "javassist"}),
 * loads the ProxyFactory extension with that name and delegates to it.
 */
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {

    /**
     * Delegates {@code getInvoker} to the ProxyFactory extension selected by the URL.
     *
     * @param arg0 the service implementation instance
     * @param arg1 the service interface class
     * @param arg2 the URL; must not be null
     * @return the invoker created by the selected extension
     * @throws IllegalArgumentException if {@code arg2} is null
     */
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) {
            throw new IllegalArgumentException("url == null");
        }
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }

    /**
     * Delegates {@code getProxy} to the ProxyFactory extension selected by the invoker's URL.
     *
     * @param arg0 the invoker; neither it nor its URL may be null
     * @return the proxy object created by the selected extension
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     */
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
}
關鍵點
// Step 1: turn the service implementation (ref) into an Invoker.
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// Step 2: turn the Invoker into an Exporter.
Exporter<?> exporter = protocol.export(invoker);
ref轉為invoker 過程
/**
 * Adaptive {@code getInvoker}: reads the {@code proxy} parameter from the URL
 * (default {@code "javassist"}), loads the ProxyFactory extension with that name
 * and delegates the call to it.
 *
 * @param arg0 the service implementation instance
 * @param arg1 the service interface class
 * @param arg2 the URL; must not be null
 * @return the invoker created by the selected extension
 * @throws IllegalArgumentException if {@code arg2} is null
 */
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg2 == null) {
        throw new IllegalArgumentException("url == null");
    }
    com.alibaba.dubbo.common.URL url = arg2;
    String extName = url.getParameter("proxy", "javassist");
    if (extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    }
    com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getInvoker(arg0, arg1, arg2);
}
dubbo預設情況下用javassist動態代理方式,將ref轉為invoker。
invoker的定義:
/**
 * An invocable handle for a service of interface type {@code T}.
 *
 * @param <T> the service interface type
 */
public interface Invoker<T> extends Node {
/**
* Returns the service interface this invoker represents.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* Performs the given invocation and returns its result.
*
* @param invocation the invocation to perform
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
Invocation 包含了ref類的相關方法名,引數等。invoker可以根據這個invocation得到對應的結果值。
/**
 * Turns a service implementation instance into an {@link Invoker}.
 *
 * @param proxy the service implementation instance
 * @param type  the service interface class
 * @param url   the URL passed through to the created invoker
 * @return an invoker whose {@code doInvoke} forwards to the generated wrapper
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // Choose the class to wrap: the instance's own class, unless its name contains '$',
    // in which case the interface type is wrapped instead.
    final Class<?> wrappedClass;
    if (proxy.getClass().getName().indexOf('$') < 0) {
        wrappedClass = proxy.getClass();
    } else {
        wrappedClass = type;
    }
    // Presumably the javassist dynamic-proxy part: the resulting wrapper exposes invokeMethod,
    // which takes the target instance, the method name, etc. and yields that method's result.
    final Wrapper wrapper = Wrapper.getWrapper(wrappedClass);

    // The returned invoker simply hands doInvoke over to wrapper.invokeMethod, which is
    // where the implementation class's actual method ends up being called.
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
這裡,就將服務類例項ref轉為了invoker。
invoker轉為exporter
// Hand the invoker to the adaptive Protocol extension, which returns the Exporter.
Exporter<?> exporter = protocol.export(invoker);
轉為exporter的過程中,主要做了兩個工作:開啟指定的埠號,監聽來自客戶端的請求;向Zookeeper等註冊中心註冊、訂閱服務;
向註冊中心註冊,訂閱服務
// Obtain the registry for originInvoker and register the provider URL with it.
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// Subscribe to override data.
// FIXME When a provider subscribes, it affects the scenario where the same JVM both exports and references the same service: "subscribed" uses the service name as its cache key, so the subscription information gets overwritten.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
// OverrideListener is an inner class of RegistryProtocol.
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// Subscribe to override data with the listener registered above.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
final Registry registry = getRegistry(originInvoker) 是根據originInvoker的url獲取註冊中心的地址,生成註冊中心的客戶端。其中 originInvoker的url是
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756&registry=zookeeper&timestamp=1543287819603
[email protected]
[email protected]1af9
interface cn.andy.dubbo.DataService
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756&registry=zookeeper&timestamp=1543287819603
registry.register(registedProviderUrl)是將自己(url)註冊到註冊中心。registedProviderUrl的地址是:
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543287819642&token=1234567
而 overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl)的地址添加了category=configurators&check=false這兩項,添加了configurators(配置規則)的寫入,為後續的監聽做準備
provider://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&category=configurators&check=false&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543287819642&token=1234567
// Subscribe to override data.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
這裡是指,服務釋出之後,當我們通過監控中心或者治理中心或者直接通過程式碼向註冊中心寫入配置規則時,註冊中心會通知dubbo,重新發布添加了配置規則的這個服務。
開啟監聽服務
// Export the original invoker locally; the result is the exporter wrapper for it.
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker);
上面這句就是完成了invoker轉為exporter。
最終,會轉為呼叫DubboProtocol的export方法:
/**
 * Exports the given invoker: records a {@code DubboExporter} for it in {@code exporterMap},
 * remembers its stub event methods when stub events are enabled, and opens the server
 * for the invoker's URL.
 *
 * @param invoker the invoker to export
 * @return the exporter registered for the invoker
 * @throws RpcException declared by the protocol contract
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final URL exportUrl = invoker.getUrl();

    // Register the exporter under its service key.
    final String exporterKey = serviceKey(exportUrl);
    final DubboExporter<T> dubboExporter = new DubboExporter<T>(invoker, exporterKey, exporterMap);
    exporterMap.put(exporterKey, dubboExporter);

    // Export a stub service for dispatching events.
    final boolean stubEventEnabled = exportUrl.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    final boolean callbackService = exportUrl.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (stubEventEnabled && !callbackService) {
        final String stubMethods = exportUrl.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        final boolean hasStubMethods = stubMethods != null && stubMethods.length() != 0;
        if (hasStubMethods) {
            stubServiceMethodsMap.put(exportUrl.getServiceKey(), stubMethods);
        } else if (logger.isWarnEnabled()) {
            // Stub events were requested but no stub methods were configured.
            logger.warn(new IllegalStateException("consumer [" + exportUrl.getParameter(Constants.INTERFACE_KEY)
                    + "], has set stubproxy support event ,but no stub methods founded."));
        }
    }

    openServer(exportUrl);
    return dubboExporter;
}
openServer(url)就是開啟url所對應的netty的伺服器端,進行監聽。
/**
 * Ensures a server exists for the URL's address: creates and caches one on first use,
 * otherwise resets the cached server with the new URL.
 *
 * @param url the URL of the service being exported
 */
private void openServer(URL url) {
    // Servers are cached by address.
    final String address = url.getAddress();
    // A client can also expose a service that only the server may call,
    // so nothing is opened unless this side acts as a server.
    final boolean actsAsServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (!actsAsServer) {
        return;
    }

    final ExchangeServer existing = serverMap.get(address);
    if (existing != null) {
        // The server supports reset; used together with the override feature.
        existing.reset(url);
        return;
    }
    serverMap.put(address, createServer(url));
}
/**
 * Binds a new exchange server for the given URL after filling in default parameters
 * and validating the configured server and client transporter types.
 *
 * @param url the URL of the service being exported
 * @return the bound server
 * @throws RpcException if the server type or client type is unsupported, or binding fails
 */
private ExchangeServer createServer(URL url) {
    // By default, send a readonly event when the server closes.
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Heartbeat is enabled by default.
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    final String serverType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    final boolean serverTypeGiven = serverType != null && serverType.length() > 0;
    if (serverTypeGiven && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
        throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
    }

    final String codecName = Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME;
    url = url.addParameter(Constants.CODEC_KEY, codecName);

    final ExchangeServer boundServer;
    try {
        boundServer = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // The client transporter type, when configured, must also be a known extension.
    final String clientType = url.getParameter(Constants.CLIENT_KEY);
    if (clientType != null && clientType.length() > 0) {
        final Set<String> knownTransporters = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!knownTransporters.contains(clientType)) {
            throw new RpcException("Unsupported client type: " + clientType);
        }
    }
    return boundServer;
}
其中, server = Exchangers.bind(url, requestHandler)裡的requestHandler就包含了我們的invoker。
/**
 * Handler bound to the exchange server. It resolves the invoker for each incoming
 * {@link Invocation} and invokes it, and turns connect/disconnect events into
 * invocations of the methods configured on the channel URL.
 */
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    /**
     * Handles a request message.
     *
     * @param channel the channel the message arrived on
     * @param message the request; only {@link Invocation} is supported
     * @return the result of invoking the resolved invoker, or {@code null} when a callback
     *         invocation names a method that is not listed in the invoker URL's "methods"
     * @throws RemotingException if the message is not an {@link Invocation}
     */
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // For callbacks, handle the case of a newer version calling an older version:
            // the requested method must be one of those listed in the invoker URL.
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        // The ternary must be parenthesized: '+' binds tighter than '==', so without the
        // parentheses the null check is applied to the concatenated string (never null),
        // the "Unsupported request: " prefix is lost and a null message triggers an NPE.
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    /**
     * Routes received {@link Invocation} messages to {@link #reply}; everything else is
     * left to the superclass.
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    /** Invokes the method configured under {@code Constants.ON_CONNECT_KEY}, if any. */
    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    /** Logs the disconnect and invokes the method configured under {@code Constants.ON_DISCONNECT_KEY}, if any. */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    /**
     * Builds an invocation for the event method named by {@code methodKey} on the channel
     * URL and dispatches it through {@link #received}. Failures are logged, not propagated.
     */
    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Creates a no-argument invocation for the method configured under {@code methodKey}.
     *
     * @return the invocation, or {@code null} when the URL configures no such method
     */
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
在received事件中呼叫reply方法,reply裡有invoker.invoke(inv),裡面有對方法的最終呼叫。