Dubbo Service Publishing: Service Export, Heartbeat Mechanism, and Service Registration

Dubbo Service Publishing

Dubbo service publishing consists of three main steps, in order:

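  1. Service export
  2. Heartbeat
  3. Service registration

Service export opens a port and makes the service available so that consumers can call it. The heartbeat mechanism keeps the long-lived connection between server and client alive. Service registration registers the exported service with the registry.

Dubbo Service Export

Only the main code paths are recorded here, so that the core code can be located quickly.

Code in ServiceConfig.java: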

if (registryURLs != null && registryURLs.size() > 0
        && url.getParameter("register", true)) {
    // Loop over the registry URL array registryURLs
    for (URL registryURL : registryURLs) {
        // "dynamic": whether the service is registered dynamically. If set to false, the service shows
        // as disabled after registration and must be enabled manually; it is also not unregistered
        // automatically when the provider stops and must be disabled manually.
        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
        // Get the monitor center URL
        URL monitorUrl = loadMonitor(registryURL);
        if (monitorUrl != null) {
            // Add the encoded monitor center URL to the provider URL as the "monitor" parameter,
            // so that the provider URL carries the monitor center configuration.
            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
        }
        // Create the Invoker with ProxyFactory.
        // URL#addParameterAndEncoded(key, value) adds the provider URL to the registry URL as the
        // "export" parameter, so that the registry URL carries the provider configuration.
        // When #invoke(invocation) runs on this Invoker, it calls the matching method on the Service object (ref).
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
        // Export the Invoker with Protocol
        /**
         * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
         * =>
         * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
         */
        Exporter<?> exporter = protocol.export(invoker);
        // Add to `exporters`
        exporters.add(exporter);
    }
}

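The loop runs the block above once for each registry:

1. If the url has no dynamic parameter, take the value from registryURL and set it on the url.
dynamic marks whether the service is registered dynamically and defaults to true. If it is set to false, the service shows as disabled after registration and must be enabled manually.

2. Load the monitor center configuration that belongs to the registry.

3. If the monitor center URL is not null, set the monitor parameter on the url.

4. Invoker proxyFactory.getInvoker: proxyFactory is a JavassistProxyFactory by default. This call creates the Invoker that wraps the ref service object.
proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); wraps ref and adds the export parameter to registryURL. [Figure: attributes of the resulting object]

5. protocol.export(invoker) is the core of service export. The protocol call chain is: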

 /**
 * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
 * =>
 * Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
 */

DubboProtocol implements service export and heartbeat detection.
RegistryProtocol calls DubboProtocol and registers the service.
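
To make the role of the export parameter concrete, here is a minimal sketch of how a provider URL can be carried inside a registry URL as an encoded query parameter and read back later. ExportParamSketch and its two methods are hypothetical helpers that work on plain strings; they only imitate the effect of URL#addParameterAndEncoded and are not Dubbo's implementation.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical helper, not part of Dubbo: imitates "add an encoded parameter" on plain strings.
class ExportParamSketch {

    // Appends key=<url-encoded value> to the query string of baseUrl.
    static String addEncodedParam(String baseUrl, String key, String value) {
        try {
            String separator = baseUrl.contains("?") ? "&" : "?";
            return baseUrl + separator + key + "=" + URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Finds key in the query string of url and returns its decoded value, or null if absent.
    static String readDecodedParam(String url, String key) {
        int q = url.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(key)) {
                try {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }
}
```

Because the nested URL is encoded, its own ? and & characters do not collide with the query string of the outer registry URL, which is why the export value appears as dubbo%3A%2F%2F... in the registry URLs quoted later in this article.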

Next, after passing through the two wrapper extension classes ProtocolFilterWrapper and ProtocolListenerWrapper, execution enters RegistryProtocol. The core code is:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export the service
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    // Get the provider URL
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    // Subscribe to override data
    // FIXME When a provider subscribes, it affects the case where the same JVM both exports and references
    // the same service: subscribed uses the service name as the cache key, so subscription information is overwritten.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Ensure that every export returns a new exporter instance
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }

        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}


 

/**
 * Export the service.
 *
 * "Local" here means starting the service locally, without registering it with the registry.
 * @param originInvoker
 * @param <T>
 * @return
 */
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    // Get the cache key used in `bounds`
    //dubbo://192.168.20.218:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&
    // interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=1760&side=provider&timestamp=1530150456618
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            // Not exported yet, so export the service
            if (exporter == null) {
                // InvokerDelegete extends com.alibaba.dubbo.rpc.protocol.InvokerWrapper and mainly adds a
                // #getInvoker() method that returns the real Invoker, i.e. one that is not an InvokerDelegete,
                // because InvokerDelegete.invoker may itself be an InvokerDelegete.
                // getProviderUrl yields the same URL as the key from getCacheKey above.
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // Export the service and create the Exporter
                Exporter<T> export = (Exporter<T>) protocol.export(invokerDelegete);
                // Create the ExporterChangeableWrapper from the new Exporter and originInvoker
                exporter = new ExporterChangeableWrapper<T>(export, originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
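
The caching pattern in doLocalExport can be looked at in isolation. The following is a minimal sketch of a lock plus double check around a map, assuming a hypothetical ExportCacheSketch class and a Supplier that stands in for the real export call; it is not the Dubbo code itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration of the lock + double-check idiom used around `bounds`.
class ExportCacheSketch<V> {

    private final Map<String, V> bounds = new ConcurrentHashMap<>();

    // Returns the cached value for key, running exportAction at most once per key.
    // exportAction must not return null, since ConcurrentHashMap rejects null values.
    V getOrExport(String key, Supplier<V> exportAction) {
        V value = bounds.get(key);          // first check, without the lock
        if (value == null) {
            synchronized (bounds) {
                value = bounds.get(key);    // second check, under the lock
                if (value == null) {
                    value = exportAction.get();
                    bounds.put(key, value);
                }
            }
        }
        return value;
    }
}
```

The first lookup keeps the common case (already exported) lock-free; the second lookup inside the synchronized block covers the case where another thread exported the same key between the first check and acquiring the lock.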

1. A synchronized lock plus a double check ensures that the same service is not exported twice.

2. new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
InvokerDelegete extends com.alibaba.dubbo.rpc.protocol.InvokerWrapper and mainly adds a #getInvoker() method that returns the real Invoker, i.e. one that is not itself an InvokerDelegete.

3. The protocol.export call passes through ProtocolFilterWrapper's export method, which builds the filter chain and then exports the service:

protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));

/**
 * Build the filter chain
 * @param invoker injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&default.accepts=1000&default.threadpool=fixed&default.threads=100&default.timeout=5000&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=uce&pid=9932&side=provider&timestamp=1527930395583
 * @param key service.filter. This parameter is used to get the custom filters configured on ServiceConfig or ReferenceConfig.
 *            Taking ServiceConfig as an example, in url = injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.3.17&bind.port=20880&default.delay=-1&default.retries=0&default.service.filter=demo&delay=-1&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=81844&qos.port=22222&service.filter=demo&side=provider&timestamp=1520682156043,
 *            service.filter=demo is the custom DemoFilter configured by the author.
 *            <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" filter="demo" />
 * @param group provider. The group attribute.
 *              When exporting a service, group = provider.
 *              When referencing a service, group = consumer.
 * @param <T>
 * @return
 */
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    /*
      EchoFilter
      ClassLoaderFilter
      GenericFilter
      ContextFilter
      TraceFilter
      TimeoutFilter
      MonitorFilter
      ExceptionFilter
      DemoFilter [custom]
    */
    // Loop over the filters in reverse to build an Invoker that carries the filter chain. Each iteration
    // wraps the previous Invoker in a nested anonymous class, so the loop must run backwards; working
    // through it by hand shows that the filters then execute in the forward order listed above.
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
This gets the list of activated filters that belong to the given group.
Reference article: https://my.oschina.net/LucasZhu/blog/1835048
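
The reverse loop in buildInvokerChain is easier to follow in a stripped-down form. The sketch below assumes simplified, hypothetical Invoker and Filter interfaces (strings instead of Invocation and Result, plus a trace list) and shows that wrapping from the last filter to the first yields forward execution order.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for Dubbo's Invoker and Filter.
class FilterChainSketch {

    interface Invoker {
        String invoke(String request, List<String> trace);
    }

    interface Filter {
        String invoke(Invoker next, String request, List<String> trace);
    }

    // A filter that records its name and then delegates to the next invoker.
    static Filter named(String name) {
        return (next, request, trace) -> {
            trace.add(name);
            return next.invoke(request, trace);
        };
    }

    // Wraps target so that filters.get(0) runs first and target runs last.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;   // everything built so far sits behind this filter
            last = (request, trace) -> filter.invoke(next, request, trace);
        }
        return last;
    }
}
```

Each iteration places one more filter in front of the chain built so far, so the filter handled last by the loop (index 0) ends up outermost and therefore runs first.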

Next, DubboProtocol performs the actual service export.

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    // Create the DubboExporter and add it to `exporterMap`.
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // Start the server
    openServer(url);
    return exporter;
}

1. Get the invoker's URL.
2. Build the key, a string assembled from the interface in the URL and the exported port: com.alibaba.dubbo.demo.DemoService:20880
3. Create the DubboExporter, passing exporterMap as an argument.
4. Add the exporter to exporterMap.
 

 /**
  * Start the server
  *
  * @param url URL
  */
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    // A client can also export a service that only the server can call.
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // The server supports reset, used together with the override feature
            server.reset(url);
        }
    }
}

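If no server exists yet for the address, createServer() is called and the result is stored in DubboProtocol's serverMap; otherwise the existing server is reset with the new URL.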
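
The effect of openServer is that all services exported on the same ip:port share one server. A minimal sketch of that "create once per address, otherwise reset" behaviour follows; ServerMapSketch and FakeServer are hypothetical stand-ins and do not open any socket.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the serverMap logic; no real network server is started.
class ServerMapSketch {

    static class FakeServer {
        final String address;
        int resets = 0;   // how many times reset was requested

        FakeServer(String address) {
            this.address = address;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created = 0;      // how many servers were created

    // Mirrors the quoted flow: look up by address, create if absent, otherwise reset.
    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            server = new FakeServer(address);
            serverMap.put(address, server);
            created++;
        } else {
            server.resets++;
        }
        return server;
    }
}
```

Like the quoted method, the sketch does a separate get and put, so it assumes that exports for one address do not race with each other.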
 

private ExchangeServer createServer(URL url) {
    // By default, send a readonly event when the server shuts down
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    // Check that the Dubbo SPI extension for this server type exists
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    // Set the codec to `"Dubbo"`
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

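1. By default, send a readonly event when the server shuts down: channel.readonly.sent : true
2. Enable heartbeat by default.
3. Get the server transport used for the export; the default is netty.
4. Set the codec to dubbo, i.e. DubboCountCodec.
5. Exchangers#bind(url, requestHandler) starts the server. [Figure: structure of requestHandler]

The implementation of requestHandler is as follows: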

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // For a callback, handle the case of a newer version calling an older version
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};

Exchangers.bind(URL url, ExchangeHandler handler)

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
}

public static Exchanger getExchanger(URL url) {
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

This method resolves the exchanger type (header by default), obtains the matching Exchanger extension, HeaderExchanger, and calls its bind method:

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

The ExchangeHandler passed in from DubboProtocol (the ExchangeHandlerAdapter instance) is first wrapped to form a handler chain, which yields a ChannelHandler. The next call is Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))):
Server Transporters.bind(URL url, ChannelHandler... handlers)
Transporter$Adaptive.bind()

The call is passed through to NettyTransporter.java:
Server NettyTransporter.bind(URL url, ChannelHandler listener)

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

It returns a NettyServer instance:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) only builds the thread name and adds it to the URL as the threadname parameter.
The code of ChannelHandlers.wrap(ChannelHandler handler, URL url) is as follows:

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url):

This resolves to the AllDispatcher, which passes the call through:

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

[Figure: structure of the resulting handler]

The WrappedChannelHandler constructor is invoked:

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
    }
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

This code does the following:

1. Wraps the DecodeHandler created earlier once more, as an AllChannelHandler.
2. Creates the thread pool (the executor).
3. Gets the default DataStore and puts the thread pool into it. The key is the string java.util.concurrent.ExecutorService plus the exported port; the value is the thread pool.

return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
Next, the returned AllChannelHandler is wrapped with HeartbeatHandler and MultiMessageHandler, and the result is returned to the caller of ChannelHandlers.wrap().
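
Putting HeaderExchanger.bind and ChannelHandlers.wrap together, the handler ends up nested five layers deep. The sketch below reproduces only the nesting order taken from the code quoted above; the Handler interface and the wrap helper are hypothetical, and the real classes do actual work (decoding, thread dispatch, heartbeat bookkeeping) rather than just recording their names.

```java
import java.util.List;

// Hypothetical decorator chain that records the order in which the wrappers see a message.
class HandlerChainSketch {

    interface Handler {
        void received(String message, List<String> trace);
    }

    // Creates a wrapper that records its name and then delegates to the inner handler.
    static Handler wrap(String name, Handler inner) {
        return (message, trace) -> {
            trace.add(name);
            inner.received(message, trace);
        };
    }

    // Nesting order as read from HeaderExchanger.bind and ChannelHandlers.wrapInternal.
    static Handler buildServerChain(Handler requestHandler) {
        Handler exchange = wrap("HeaderExchangeHandler", requestHandler);
        Handler decode = wrap("DecodeHandler", exchange);
        Handler all = wrap("AllChannelHandler", decode);
        Handler heartbeat = wrap("HeartbeatHandler", all);
        return wrap("MultiMessageHandler", heartbeat);
    }
}
```

A message arriving from the network therefore passes MultiMessageHandler first and reaches DubboProtocol's requestHandler last.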

NettyTransporter.bind(URL url, ChannelHandler listener) -> new NettyServer(URL url, ChannelHandler handler)
->  super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
The last step is constructing the NettyServer object:

NettyServer ==>
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

AbstractServer ==>
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false)
            || NetUtils.isInvalidLocalHost(getUrl().getHost())
            ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

AbstractEndpoint ==>
public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    this.codec = getChannelCodec(url);
    this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

AbstractPeer ==>
public AbstractPeer(URL url, ChannelHandler handler) {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    this.url = url;
    this.handler = handler;
}

The constructor chain is shown above:
Because the codec was set to dubbo earlier, a DubboCountCodec instance is returned.
timeout and the connection timeout connectTimeout are read from the URL.
localAddress is the local IP:PORT, where the port is the exported port.
host is 0.0.0.0 (the sample URL has anyhost=true).
bindAddress is host:port, where the port is the exported port.
this.accepts is the maximum number of connections, read from the URL or defaulted.
idleTimeout is the idle.timeout value from the URL.
Core code: doOpen()

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

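First, Netty logging is configured.
Next, a NettyCodecAdapter is created from the codec obtained earlier, the URL (mainly its buffer setting, which configures the Netty buffer) and this (the handler).
Then the Netty encoder and decoder are installed to encode and decode data; they call this object's handler chain to process the data. Dubbo 2.5.6 uses Netty 3 for communication, so the details are not repeated here.

After doOpen() returns, AbstractServer fetches the thread pool cached earlier in the DataStore and assigns it to NettyServer's executor field.

This completes the walkthrough of the Dubbo service export code. [Figure: class structure of NettyServer]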

Heartbeat Service

The Dubbo provider's heartbeat service is started in the last step of HeaderExchanger's bind code; the argument is the Server object created above (NettyServer).

public HeaderExchangeServer(Server server) {
    if (server == null) {
        throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    startHeatbeatTimer();
}

1. Initialize the server field.
2. Read heartbeat from the server URL, together with the heartbeat timeout, which defaults to three times heartbeat and must be at least twice heartbeat.
3. Start the heartbeat with startHeatbeatTimer().

private void startHeatbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        heatbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels());
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

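1. Stop the timer: cancel any task that is already scheduled and set heatbeatTimer to null.
2. Schedule the task again so that the channels are checked periodically.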
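
The timeout rule in the constructor and the kind of check a periodic heartbeat task performs can be sketched as pure functions. HeartbeatSketch is hypothetical. resolveTimeout follows the constructor quoted above; decide is an assumption about a typical heartbeat check (the source does not quote HeartBeatTask), simplified so that it returns a single action per channel. The value 60000 used in examples is only illustrative.

```java
// Hypothetical sketch; decide() is an assumed, simplified rule, not the quoted Dubbo code.
class HeartbeatSketch {

    enum Action { NONE, SEND_HEARTBEAT, CLOSE }

    // Mirrors the constructor: the timeout defaults to 3x the interval and must be at least 2x.
    static int resolveTimeout(int heartbeat, Integer configuredTimeout) {
        int timeout = configuredTimeout != null ? configuredTimeout : heartbeat * 3;
        if (timeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        return timeout;
    }

    // Assumed rule: close a channel that has been silent longer than the timeout,
    // otherwise send a heartbeat if nothing was read or written within one interval.
    static Action decide(long now, long lastRead, long lastWrite, int heartbeat, int heartbeatTimeout) {
        if (now - lastRead > heartbeatTimeout) {
            return Action.CLOSE;
        }
        if (now - lastRead > heartbeat || now - lastWrite > heartbeat) {
            return Action.SEND_HEARTBEAT;
        }
        return Action.NONE;
    }
}
```

In a running server, a ScheduledExecutorService would call something like decide for every channel at the fixed delay given by heartbeat, which is the role scheduleWithFixedDelay plays in startHeatbeatTimer.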

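Then, in DubboProtocol's openServer(URL) method, the created ExchangeServer is put into DubboProtocol's serverMap.
The key is the service's ip:port, for example 192.168.20.218:20880.
The value is the ExchangeServer created earlier.

DubboProtocol's export method is now complete. It returns a DubboExporter that wraps the invoker passed in, the serviceKey, and the exporterMap of exported services.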

Service Registration

Now let's look at the rest of RegistryProtocol:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export the service
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider (adds the scheduled task: ping request response)
    final Registry registry = getRegistry(originInvoker);
    // Get the provider URL
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    // Subscribe to override data
    // FIXME When a provider subscribes, it affects the case where the same JVM both exports and references
    // the same service: subscribed uses the service name as the cache key, so subscription information is overwritten.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    // Ensure that every export returns a new exporter instance
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }

        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

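1. ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) performs the service export, which was walked through above.
[Figure: format of the returned data]

2. Get the Registry object from the registry information in originInvoker. Because the zookeeper protocol is used here, it is a ZookeeperRegistry.
3. Get the value of the export parameter from the registry URL, i.e. the provider URL.
4. registry.register(registedProviderUrl); registers the service with the registry object created earlier.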
5.
 

// TODO 

 

As mentioned above, Registry getRegistry(final Invoker<?> originInvoker) obtains the registry instance from the invoker's URL. The code is:

private Registry getRegistry(final Invoker<?> originInvoker) {
    // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.20.218%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26default.accepts%3D1000%26default.threadpool%3Dfixed%26default.threads%3D100%26default.timeout%3D5000%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26owner%3Duce%26pid%3D12028%26side%3Dprovider%26timestamp%3D1531912729429&owner=uce&pid=12028&registry=zookeeper&timestamp=1531912729343
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.20.218%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26default.accepts%3D1000%26default.threadpool%3Dfixed%26default.threads%3D100%26default.timeout%3D5000%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26owner%3Duce%26pid%3D12028%26side%3Dprovider%26timestamp%3D1531912729429&owner=uce&pid=12028&timestamp=1531912729343
    return registryFactory.getRegistry(registryUrl);
}

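The code above does the following:
1. Get the URL from originInvoker (the registry configuration).
2. Read the registry parameter from the URL, use it to replace the URL's protocol, and remove the registry parameter. The comments in the code show the URL before and after.
3. Get the RegistryFactory extension for the zookeeper protocol, ZookeeperRegistryFactory, and call its getRegistry method.

[Figure: inheritance structure of ZookeeperRegistryFactory and the fields of its classes]
REGISTRIES = new ConcurrentHashMap<String, Registry>(); caches the registries by key; more than one registry can be configured.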
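
The protocol rewrite in step 2 can be tried on plain strings. The sketch below assumes a hypothetical RegistryUrlSketch helper that does simple string handling; Dubbo itself performs this with URL#setProtocol and URL#removeParameter, as quoted above.

```java
// Hypothetical string-based illustration of "registry://...?registry=zookeeper" -> "zookeeper://...".
class RegistryUrlSketch {

    private static final String GENERIC_PROTOCOL = "registry";

    // If the URL uses the generic registry protocol, replace the protocol with the value of
    // the "registry" query parameter (or defaultProtocol if absent) and drop that parameter.
    static String toConcreteRegistryUrl(String registryUrl, String defaultProtocol) {
        if (!registryUrl.startsWith(GENERIC_PROTOCOL + "://")) {
            return registryUrl;
        }
        int q = registryUrl.indexOf('?');
        String base = q < 0 ? registryUrl : registryUrl.substring(0, q);
        String query = q < 0 ? "" : registryUrl.substring(q + 1);

        String protocol = defaultProtocol;
        StringBuilder kept = new StringBuilder();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            if (pair.startsWith("registry=")) {
                protocol = pair.substring("registry=".length());
                continue;                       // the parameter is removed from the result
            }
            if (kept.length() > 0) {
                kept.append('&');
            }
            kept.append(pair);
        }
        String rewritten = protocol + base.substring(GENERIC_PROTOCOL.length());
        return kept.length() == 0 ? rewritten : rewritten + "?" + kept;
    }
}
```

Keeping the generic registry:// protocol on the outer URL is what routes the first export call to RegistryProtocol; only at this point is it replaced by the concrete registry type so that the matching RegistryFactory extension can be selected.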

The code of AbstractRegistryFactory.getRegistry is as follows:

public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();   // zookeeper://192.168.1.157:2181/com.alibaba.dubbo.registry.RegistryService
    // Lock the registry lookup to guarantee a single instance per registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
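
The locking in getRegistry is a plain "one instance per key" cache guarded by a ReentrantLock. A minimal sketch under that reading follows; RegistryCacheSketch and its factory function are hypothetical and stand in for REGISTRIES and createRegistry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of a lock-guarded, one-instance-per-key cache.
class RegistryCacheSketch<R> {

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new ConcurrentHashMap<>();
    private final Function<String, R> factory;   // stands in for createRegistry(url)

    RegistryCacheSketch(Function<String, R> factory) {
        this.factory = factory;
    }

    R getRegistry(String key) {
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry;
            }
            registry = factory.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock();   // always released, even if the factory throws
        }
    }
}
```

Unlike the double check in doLocalExport, every call takes the lock here, which is simpler and is acceptable when lookups are infrequent compared with the cost of creating a registry client.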

1. Set the path, add the interface parameter, and remove the export and refer parameters. The result is:
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&owner=uce&pid=12028&timestamp=1531912729343
2. Get the url's service string: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService. I am using a local zookeeper, so the IP is 127.0.0.1.
3. Create the registry while holding the lock: Registry ZookeeperRegistryFactory.createRegistry(URL url);

public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

// The constructor chain of ZookeeperRegistry is shown below
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

public FailbackRegistry(URL url) {
    super(url);
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            // Check and connect to the registry
            try {
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

public AbstractRegistry(URL url) {
    setUrl(url);
    // Start the file save timer
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    loadProperties();
    notify(url.getBackupUrls());
}

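[Figure: class inheritance of ZookeeperRegistry]


In ZookeeperRegistry.FailbackRegistry.AbstractRegistry:
1. setUrl sets the url field.
2. Read whether the cache file is saved synchronously (syncSaveFile, false by default, i.e. asynchronous).
3. Determine the local cache file for the registry, for example C:\Users\Administrator/.dubbo/dubbo-registry-127.0.0.1.cache
4. Assign file and load its contents into properties.
5. notify(url.getBackupUrls()): I am not sure what this call is for.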

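In ZookeeperRegistry.FailbackRegistry:
1. Read the interval for the scheduled task.
2. Start a scheduled task that periodically checks for failed registrations and retries them.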
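
The failback idea, remembering registrations that failed and retrying them later, can be sketched without a scheduler. FailbackSketch and its Backend interface are hypothetical; in the constructor quoted above the retry is driven by scheduleWithFixedDelay every retryPeriod milliseconds, whereas here retry() is simply called by hand.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "record failures, retry later"; not Dubbo's FailbackRegistry.
class FailbackSketch {

    // Stands in for the real registration call, which may fail while the registry is unreachable.
    interface Backend {
        void doRegister(String url) throws Exception;
    }

    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Backend backend;

    FailbackSketch(Backend backend) {
        this.backend = backend;
    }

    // Tries to register; on failure the URL is remembered for a later retry.
    void register(String url) {
        try {
            backend.doRegister(url);
            failedRegistered.remove(url);
        } catch (Exception e) {
            failedRegistered.add(url);
        }
    }

    // One retry round: attempts every remembered URL and forgets those that now succeed.
    void retry() {
        for (String url : failedRegistered) {
            try {
                backend.doRegister(url);
                failedRegistered.remove(url);
            } catch (Exception e) {
                // still failing, keep it for the next round
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }
}
```

With this design a provider can start even if the registry is temporarily down; the registration is completed by a later retry round once the registry is reachable again.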

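In ZookeeperRegistry:
1. Read the registry's group parameter, which defaults to /dubbo, and assign it to root.
2. zkClient = zookeeperTransporter.connect(url); connects to zookeeper and adds a state listener. The details are left for a later post. The code is: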

public ZkclientZookeeperClient(URL url) {
    super(url); client = new ZkClient(url.getBackupAddress()); client.subscribeStateChanges(new