Dubbo/Dubbox的dubbo協議實現(一)-服務端啟動
之前已經分析的dubbo的服務的發現和註冊,這裡先看一下dubbo協議是如何實現的,之前已經知道了,呼叫DubboProtocol類的export來暴露服務的,協議實現比較複雜,這裡只關係主體實現即排除一些特性功能的處理程式碼
本章主要處理服務端對應的暴露流程,繼續回到···com.alibaba.dubbo.config.ServiceConfig···的doExportUrlsFor1Protocol方法(487行附近)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded (Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
proxyFactory 是一個全域性變數(自適應擴充套件點),之前已經反覆複習不多說明了
proxyFactory.getInvoker實現如下,該方法內部會呼叫一個封裝類com.alibaba.dubbo.common.bytecode.Wrapper,該類封裝了例項的方法呼叫。proxyFactory.getInvoker 會返回一個
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper類不能正確處理帶$的類名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url ) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
這裡com.alibaba.dubbo.common.bytecode.Wrapper
類不做過多說明,其會生成有一個名為invokeMethod封裝方法,invokeMethod 方法根據傳入例項和方法名,呼叫例項對應的方法,invokeMethod方法簽名如下:
/**
*
* @param o 介面類例項
* @param n 方法名
* @param p 引數型別
* @param v 引數值
* @return
* @throws java.lang.reflect.InvocationTargetException
*/
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException
所以到這裡可以發現,對於服務的提供者來說通過com.alibaba.dubbo.rpc.Invoker的 protected Object doInvoke(T proxy, String methodName,Class
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
/*url此刻內容如下。
*dubbo://ip:port/服務介面?anyhost=true&application=dsp-ad&default.timeout=10000&dubbo=2.8.4&generic=false&
*interface=服務介面&logger=log4j&methods=介面方法名&organization=company&owner=weiythi&pid=5948&revision=1.6.1&serialization=kryo&side=provider×tamp=1511849953486
*/
URL url = invoker.getUrl();
// export service.
// serviceKey為 服務介面類名:埠號
String key = serviceKey(url);
//DubboExporter 實現介面 com.alibaba.dubbo.rpc.Exporter ,會在抽象實現com.alibaba.dubbo.rpc.protocol.AbstractExporter 的基礎上維護exporterMap ,key
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//省略......export an stub service for dispaching event......//
//開啟服務
openServer(url);
// modified by lishen 序列化優化相關,暫不去了解
optimizeSerialization(url);
return exporter;
}
這裡先找到最關心的點“openServer(url);”,其是dubbo協議服務暴露的入口,其實現如下:
private void openServer(URL url) {
// find server.
// 這裡是ip:port
String key = url.getAddress();
//client 也可以暴露一個只有server可以呼叫的服務。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {//true
//按照ip:port為key 找到資訊交換層的服務端
ExchangeServer server = serverMap.get(key);
if (server == null) {//如果木有建立過server端,則建立並將服務端快取
serverMap.put(key, createServer(url));
} else {
//server支援reset,配合override功能使用
server.reset(url); //這個暫時木有看懂,等回過頭來看
}
}
}
注意這裡serverMap雖說不是一個static域,但基於擴充套件點機制的實現,貌似都是單例的,所以這裡一個服務端程序可以為多個provider提供服務,那麼createServer(url)是如何開啟服務的呢?
createServer(url)的方法實現如下, 注意該方法會通過url匯流排的server引數來決定使用dubbo的協議實現,dubbox這裡預設nio框架是netty
private ExchangeServer createServer(URL url) {
//預設開啟server關閉時傳送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//預設開啟heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//獲得協議的server實現,預設netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//注意這裡的requestHandler 是一個com.alibaba.dubbo.remoting.exchange.ExchangeHandler 的介面實現。
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
在方法中能注意到一個閃閃亮的語句,server = Exchangers.bind(url, requestHandler); 這裡傳入了一個引數requestHandler,requestHandler在此類是一個com.alibaba.dubbo.remoting.exchange.ExchangeHandler的介面實現,看了下原始碼這裡又引入了一系列暫時不知道用處的類,所以此處暫時忽略介面實現,既然其是一個handler所以接到consumer的呼叫之後必然會執行的,先放在一邊。我們接資料的時候一起看。
com.alibaba.dubbo.remoting.exchange.Exchangers 類主要有兩個功能,即伺服器的繫結和客戶端的連線功能
Exchangers.bind(url, requestHandler); 內部又涉及一系列的擴充套件點,不貼程式碼出來了。簡單介紹一下, 這裡肯定實現的功能就是伺服器的綁定了
1.bind方法會根據URL匯流排的exchanger引數獲得一個com.alibaba.dubbo.remoting.exchange.Exchanger 擴充套件點例項,本例例項為(com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger),嗯 目前已知的擴充套件點實現也就這一個。
2.Exchanger?在一個框架中我們通常把負責資料交換和網路通訊的元件叫做Exchanger ,接下來呼叫 ExchangeServer bind(URL url, ExchangeHandler handler) 方法,注意這裡又是一個自適應擴充套件點。
接下來進入到Exchanger僅有的一個實現com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
可見在該方法中,建立了一個com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer
物件,當然根據方法定義該類肯定實現介面com.alibaba.dubbo.remoting.exchange.ExchangeServer
這裡先暫時不關心該介面行為,因為建立物件前,先對handler進行了兩層封裝,然後使用com.alibaba.dubbo.remoting.Transporters
的bind方法,進行繫結。
對handler的封裝可能會絕對handler呼叫前或呼叫後的行為,設定決定了handler的呼叫方式,那麼先去確定com.alibaba.dubbo.remoting.transport.DecodeHandler
的運作情況。
額,點開看了一下貌似也挺複雜,因為這個類一頓繼承啊~,找一下自己關心的兩個東東,分別是,我家的HeaderExchangeHandler在哪裡?有木有能讓我接受到訊息呼叫到的方法。
然後發現這兩個重點是這樣的
public DecodeHandler(ChannelHandler handler) {
super(handler); //父類構造器
}
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request)message).getData());
}
if (message instanceof Response) {
decode( ((Response)message).getResult());
}
//呼叫received方法,既然我們的類已經被封裝了兩層,那麼這裡應該呼叫的是HeaderExchangeHandler
handler.received(channel, message);
}
private void decode(Object message) { //這個方法沒有任何handler相關處理,暫不去關心。
if (message != null && message instanceof Decodeable) {
try {
((Decodeable)message).decode();
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(32).append("Decode decodeable message ")
.append(message.getClass().getName()).toString());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn(
new StringBuilder(32)
.append("Call Decodeable.decode failed: ")
.append(e.getMessage()).toString(),
e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
那麼再去看看 HeaderExchangeHandler 是個啥
還有既然會呼叫received方法,主要去看看received的實現。點開一看略複雜、、、、好多東東,看到這裡還不能理解,所以又去敲Transporters.bind的門,因為已經先行忽略了handler這個,這裡只關心服務是怎麼被open的
程式碼裡又耍流氓了,又是自適應擴充套件點,但是我已不怕,開啟看配置檔案直接鎖定實現com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
然後去看bind方法(等等索性直接看了)
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport.netty;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.Transporter;
/**
* @author ding.lid
*/
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
這個很明顯麼,直接new了一個com.alibaba.dubbo.remoting.transport.netty.NettyServer
出來,那麼問題簡單了,直接去看建構函式
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
呼叫了父類的構造器
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 哭 handler又甩給上層了
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String host = url.getParameter(Constants.ANYHOST_KEY, false)
|| NetUtils.isInvalidLocalHost(getUrl().getHost())
? NetUtils.ANYHOST : getUrl().getHost();
bindAddress = new InetSocketAddress(host, getUrl().getPort());
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
//好吧,偷偷的呼叫了doOpen 害我好找
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
主要關注兩個事,就是我們的handler哪去了,後面可能還要用呢,還有服務在哪開的?
首先看那第一行,呼叫父類的構造器,一直跟著找,發現handler在com.alibaba.dubbo.remoting.transport.AbstractPeer
中維護了,然後AbstractPeer又是一個ChannelHandler ,好吧這個關係很混亂。先記著這個關鍵點,上一次看見類似風格是在RegistryDictory中
既然呼叫了doOpen,這個就不用想了,方法的實現就在NettyServer裡啊,但是netty之前沒玩過,所以只能看個熱鬧
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
// 這裡的NettyHandler 封裝了一個this? 這個this是什麼?
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
因為35太難了,我學不會,而且server開啟之後只要能一直提供服務就OK,我只能關注另一個點就是接到訊息後怎麼處理的。所以這裡跟訊息有關的就一個nettyHandler ,其初始化是通過這樣的語句完成的,這裡傳入了一個this,我們要知道這個this是什麼。
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
@Sharable
public class NettyHandler extends SimpleChannelHandler {
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
private final URL url;
private final ChannelHandler handler;
//.......//
public NettyHandler(URL url, ChannelHandler handler){
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
}
好吧,很巧妙的方式,因為NettyServer他是一個ChannelHandler啊,是一個ChannelHandler啊…啊。
迷只混亂,只能拿出他家家譜,然後默默流眼淚
這個家譜裡貌似沒有我們熟悉的朋友。
看看他的長輩有沒有人認識我們上文忽略的com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer
,因為是這麼個關係new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 所以去看看他的構造器
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeatbeatTimer();
}
引數正好是NettyServer所實現的介面com.alibaba.dubbo.remoting.Server
,到這裡我們就返回一個ExchangeServer了,不過好像目前為止並沒有有什麼實質性的動作,回到createServer
好吧,到這一個服務就建立完了。總結一下開啟服務這一部的呼叫關係(引用大佬的)
openServer netty server開啟偵聽服務,並快取服務。
dubbo -> export() -> openServer() -> createServer() -> Exchangers.bind()(HeaderExchanger) -> NettyTransporter -> NettyServer.open()(編碼解碼採用exchange)
netty ChannelFactory(boss worker) ChannelPipeline(責任鏈模式) ChannelHandler(處理器) — 反應器模式