1. 程式人生 > 程式設計 >聊聊dubbo的WrappedChannelHandler

聊聊dubbo的WrappedChannelHandler

本文主要研究一下dubbo的WrappedChannelHandler

WrappedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler"
,true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler,URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if
(CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey,Integer.toString(url.getPort()),executor); } public void close
() { try { if (executor != null) { executor.shutdown(); } } catch (Throwable t) { logger.warn("fail to destroy thread pool of server: " + t.getMessage(),t); } } @Override public void connected(Channel channel) throws RemotingException { handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException { handler.disconnected(channel); } @Override public void sent(Channel channel,Object message) throws RemotingException { handler.sent(channel,message); } @Override public void received(Channel channel,Object message) throws RemotingException { handler.received(channel,message); } @Override public void caught(Channel channel,Throwable exception) throws RemotingException { handler.caught(channel,exception); } public ExecutorService getExecutor() { return executor; } @Override public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } } public URL getUrl() { return url; } public ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } } 複製程式碼
  • WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然後放到dataStore中

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler,URL url) {
        super(handler,url);
    }

    @Override
    public void received(Channel channel,Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full,SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening,but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(),request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message,channel,getClass() + " error when process received event.",t);
            }
        } else {
            handler.received(channel,message);
        }
    }
}
複製程式碼
  • ExecutionChannelHandler繼承了WrappedChannelHandler,其received會建立ChannelEventRunnable,然後放到executor去執行

ChannelEventRunnable

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java

public class ChannelEventRunnable implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;

    public ChannelEventRunnable(Channel channel,ChannelHandler handler,ChannelState state) {
        this(channel,state,null);
    }

    public ChannelEventRunnable(Channel channel,ChannelState state,Object message) {
        this(channel,message,Throwable t) {
        this(channel,null,t);
    }

    public ChannelEventRunnable(Channel channel,Object message,Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel,message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel
                        + ",message is " + message,e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel,e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error,e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel,message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error,channel is " + channel
                            + ",e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel,exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error,message is: " + message + ",exception is " + exception,e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ",message is " + message);
            }
        }

    }

    /**
     * ChannelState
     *
     *
     */
    public enum ChannelState {

        /**
         * CONNECTED
         */
        CONNECTED,/**
         * DISCONNECTED
         */
        DISCONNECTED,/**
         * SENT
         */
        SENT,/**
         * RECEIVED
         */
        RECEIVED,/**
         * CAUGHT
         */
        CAUGHT
    }

}
複製程式碼
  • ChannelEventRunnable實現了Runnable介面,其run方法根據不同的ChannelState做不同處理

小結

  • WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然後放到dataStore中
  • ExecutionChannelHandler繼承了WrappedChannelHandler,其received會建立ChannelEventRunnable,然後放到executor去執行
  • ChannelEventRunnable實現了Runnable介面,其run方法根據不同的ChannelState做不同處理

doc