聊聊dubbo的AllDispatcher
阿新 • • 發佈:2019-12-31
序
本文主要研究一下dubbo的AllDispatcher
Dispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
/**
* dispatch the message to threadpool.
*
* @param handler
* @param url
* @return channel handler
*/
@Adaptive({Constants.DISPATCHER_KEY,"dispather","channel.handler"})
// The last two parameters are reserved for compatibility with the old configuration
ChannelHandler dispatch(ChannelHandler handler,URL url);
}
複製程式碼
- Dispatcher介面定義了dispatch方法,返回ChannelHandler
AllDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllDispatcher.java
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler,URL url) {
return new AllChannelHandler(handler,url);
}
}
複製程式碼
- AllDispatcher實現了Dispatcher介面,其dispatch方法返回的是AllChannelHandler
AllChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler,URL url) {
super(handler,url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event",channel,getClass() + " error when process connected event .",t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel,ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event",getClass() + " error when process disconnected event .",t);
}
}
@Override
public void received(Channel channel,Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel,ChannelState.RECEIVED,message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full,refuses to call,does not return,and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted,detail msg:" + t.getMessage();
Response response = new Response(request.getId(),request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message,getClass() + " error when process received event .",t);
}
}
@Override
public void caught(Channel channel,Throwable exception) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel,ChannelState.CAUGHT,exception));
} catch (Throwable t) {
throw new ExecutionException("caught event",getClass() + " error when process caught event .",t);
}
}
}
複製程式碼
- AllChannelHandler繼承了WrappedChannelHandler,其connected、disconnected、received、caught均是通過父類的getExecutorService獲取執行緒池,然後執行建立的ChannelEventRunnable;received方法在捕獲到異常時RejectedExecutionException且message是Request,而且request是twoWay的時候會返回SERVER_THREADPOOL_EXHAUSTED_ERROR
小結
- Dispatcher介面定義了dispatch方法,返回ChannelHandler
- AllChannelHandler繼承了WrappedChannelHandler,其connected、disconnected、received、caught均是通過父類的getExecutorService獲取執行緒池,然後執行建立的ChannelEventRunnable
- AllChannelHandler的received方法在捕獲到異常時RejectedExecutionException且message是Request,而且request是twoWay的時候會返回SERVER_THREADPOOL_EXHAUSTED_ERROR