聊聊dubbo的WrappedChannelHandler
阿新 • • 發佈:2019-12-31
序
本文主要研究一下dubbo的WrappedChannelHandler
WrappedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler" ,true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler,URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey,Integer.toString(url.getPort()),executor);
}
public void close () {
try {
if (executor != null) {
executor.shutdown();
}
} catch (Throwable t) {
logger.warn("fail to destroy thread pool of server: " + t.getMessage(),t);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
@Override
public void sent(Channel channel,Object message) throws RemotingException {
handler.sent(channel,message);
}
@Override
public void received(Channel channel,Object message) throws RemotingException {
handler.received(channel,message);
}
@Override
public void caught(Channel channel,Throwable exception) throws RemotingException {
handler.caught(channel,exception);
}
public ExecutorService getExecutor() {
return executor;
}
@Override
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
public URL getUrl() {
return url;
}
public ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
}
複製程式碼
- WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然後放到dataStore中
ExecutionChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler,URL url) {
super(handler,url);
}
@Override
public void received(Channel channel,Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
if (message instanceof Request) {
try {
executor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message));
} catch (Throwable t) {
// FIXME: when the thread pool is full,SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
// this scenario from happening,but a better solution should be considered later.
if (t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") thread pool is exhausted,detail msg:" + t.getMessage();
Response response = new Response(request.getId(),request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message,channel,getClass() + " error when process received event.",t);
}
} else {
handler.received(channel,message);
}
}
}
複製程式碼
- ExecutionChannelHandler繼承了WrappedChannelHandler,其received會建立ChannelEventRunnable,然後放到executor去執行
ChannelEventRunnable
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel,ChannelHandler handler,ChannelState state) {
this(channel,state,null);
}
public ChannelEventRunnable(Channel channel,ChannelState state,Object message) {
this(channel,message,Throwable t) {
this(channel,null,t);
}
public ChannelEventRunnable(Channel channel,Object message,Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel,message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel
+ ",message is " + message,e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel,e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error,e);
}
break;
case SENT:
try {
handler.sent(channel,message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel
+ ",e);
}
break;
case CAUGHT:
try {
handler.caught(channel,exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error,message is: " + message + ",exception is " + exception,e);
}
break;
default:
logger.warn("unknown state: " + state + ",message is " + message);
}
}
}
/**
* ChannelState
*
*
*/
public enum ChannelState {
/**
* CONNECTED
*/
CONNECTED,/**
* DISCONNECTED
*/
DISCONNECTED,/**
* SENT
*/
SENT,/**
* RECEIVED
*/
RECEIVED,/**
* CAUGHT
*/
CAUGHT
}
}
複製程式碼
- ChannelEventRunnable實現了Runnable介面,其run方法根據不同的ChannelState做不同處理
小結
- WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然後放到dataStore中
- ExecutionChannelHandler繼承了WrappedChannelHandler,其received會建立ChannelEventRunnable,然後放到executor去執行
- ChannelEventRunnable實現了Runnable介面,其run方法根據不同的ChannelState做不同處理