Dubbo原始碼解析之服務端接收訊息
阿新 • • 發佈:2018-11-05
準備
dubbo
版本:2.5.4
服務端接收訊息流程
Handler鏈路
DubboProtocol
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString());
url = url.addParameterIfAbsent("heartbeat", String.valueOf(60000));
String str = url.getParameter ("server", "netty");
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
} else {
url = url.addParameter("codec", Version. isCompatibleVersion() ? "dubbo1compatible" : "dubbo");
ExchangeServer server;
try {
// requestHandler -> new ExchangeHandlerAdapter() {}
server = Exchangers.bind(url, this.requestHandler);
} catch (RemotingException var5) {
throw new RpcException ("Fail to start server(url: " + url + ") " + var5.getMessage(), var5);
}
str = url.getParameter("client");
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
HeaderExchanger
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// DecodeHandler(HeaderExchangeHandler(ExchangeHandlerAdapter))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
ChannelHandlers
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// extension = ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()
// extension = AllDispatcher
return new MultiMessageHandler(new HeartbeatHandler(((Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()).dispatch(handler, url)));
}
AllDispatcher
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// AllChannelHandler
return new AllChannelHandler(handler, url);
}
}
所以,服務端完整的 handler
鏈路為:
MultiMessageHandler
-> HeartbeatHandler
-> AllChannelHandler
-> DecodeHandler
-> HeaderExchangeHandler
-> ExchangeHandlerAdapter
接收訊息流程
NettyHandler
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), this.url, this.handler);
try {
// 呼叫MultiMessageHandler處理
this.handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
MultiMessageHandler
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage)message;
for(Object obj : list) {
handler.received(channel, obj);
}
} else {
// 呼叫HeartbeatHandler處理
handler.received(channel, message);
}
}
HeartbeatHandler
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if(logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
// 呼叫AllChannelHandler處理
handler.received(channel, message);
}
AllChannelHandler
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
ChannelEventRunnable
public void run() {
switch (state) {
case CONNECTED:
try{
handler.connected(channel);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try{
handler.disconnected(channel);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try{
handler.sent(channel,message);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is "+ message,e);
}
break;
case RECEIVED:
try{
// 呼叫DecodeHandler處理
handler.received(channel, message);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is "+ message,e);
}
break;
case CAUGHT:
try{
handler.caught(channel, exception);
}catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel
+ ", message is: " + message + ", exception is " + exception,e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
DecodeHandler
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request)message).getData());
}
if (message instanceof Response) {
decode( ((Response)message).getResult());
}
// 呼叫HeaderExchangeHandler處理
handler.received(channel, message);
}
HeaderExchangeHandler
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// 處理請求
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
// HeaderExchangeHandler
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol
成員變數 requestHandler
匿名內部類實現
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {}
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本呼叫低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// invoker -> Filter(Listener(InvokerDelegete(AbstractProxyInvoker (Wrapper.invokeMethod)))
return invoker.