
A Look at Dubbo's ConnectionOrderedDispatcher

This article examines Dubbo's ConnectionOrderedDispatcher.

ConnectionOrderedDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}
  • ConnectionOrderedDispatcher implements the Dispatcher interface; its dispatch method returns a ConnectionOrderedChannelHandler.
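To make the role of NAME concrete, here is a minimal sketch of looking up a dispatcher by name and letting it wrap a handler. Dubbo resolves dispatchers through its extension mechanism; the plain map below is a stand-in for that, and `DispatcherLookupSketch`, `Handler`, and the tagging wrapper are hypothetical names, not Dubbo classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical illustration only; none of these types exist in Dubbo.
public class DispatcherLookupSketch {

    /** Simplified handler; the real ChannelHandler has several callbacks. */
    public interface Handler {
        String received(String message);
    }

    // Maps a dispatcher name to a function that wraps the inner handler.
    private static final Map<String, UnaryOperator<Handler>> DISPATCHERS = new HashMap<>();

    static {
        // "connection" mirrors ConnectionOrderedDispatcher.NAME; the wrapper
        // here only tags the result so the wrapping is visible.
        DISPATCHERS.put("connection",
                inner -> message -> "[connection-ordered] " + inner.received(message));
    }

    /** Returns the inner handler wrapped by the dispatcher registered under the given name. */
    public static Handler dispatch(String name, Handler inner) {
        UnaryOperator<Handler> dispatcher = DISPATCHERS.get(name);
        if (dispatcher == null) {
            throw new IllegalArgumentException("unknown dispatcher: " + name);
        }
        return dispatcher.apply(inner);
    }

    public static void main(String[] args) {
        Handler wrapped = dispatch("connection", message -> "echo:" + message);
        System.out.println(wrapped.received("hi")); // prints "[connection-ordered] echo:hi"
    }
}
```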

ConnectionOrderedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
  • ConnectionOrderedChannelHandler extends WrappedChannelHandler. Its constructor creates connectionExecutor, a ThreadPoolExecutor whose corePoolSize and maximumPoolSize are both 1 and whose queue is a LinkedBlockingQueue (capacity read from CONNECT_QUEUE_CAPACITY, defaulting to Integer.MAX_VALUE).
  • The connected and disconnected methods both submit a newly created ChannelEventRunnable to connectionExecutor. Each first calls checkQueueLength, which logs a warning if the queue size exceeds queuewarninglimit.
  • The received and caught methods obtain a thread pool from the parent class's getExecutorService and submit a newly created ChannelEventRunnable to it. If received catches a RejectedExecutionException, the message is a Request, and the request is two-way, it sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR instead of throwing.
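The ordering guarantee for connection events follows from the pool shape: one worker thread draining a FIFO queue. The sketch below reproduces that with plain java.util.concurrent classes. `OrderedEventsSketch`, its string events, and the `drain` helper are hypothetical simplifications, and the warning goes to stderr rather than a Dubbo logger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the handler above; not a Dubbo class.
public class OrderedEventsSketch {

    // One core thread, one max thread, FIFO queue: events run one at a time, in submit order.
    private final ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    private final int queueWarningLimit;
    private final List<String> processed = Collections.synchronizedList(new ArrayList<String>());

    public OrderedEventsSketch(int queueWarningLimit) {
        this.queueWarningLimit = queueWarningLimit;
    }

    public void connected(String channel) {
        submit("CONNECTED:" + channel);
    }

    public void disconnected(String channel) {
        submit("DISCONNECTED:" + channel);
    }

    private void submit(String event) {
        // Like checkQueueLength: warn only, the event is still queued afterwards.
        if (connectionExecutor.getQueue().size() > queueWarningLimit) {
            System.err.println("queue size exceeds the warning limit " + queueWarningLimit);
        }
        connectionExecutor.execute(() -> processed.add(event));
    }

    /** Stops accepting events, waits for the queue to empty, and returns events in processing order. */
    public List<String> drain() {
        connectionExecutor.shutdown();
        try {
            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        OrderedEventsSketch sketch = new OrderedEventsSketch(1000);
        for (int i = 0; i < 3; i++) {
            sketch.connected("ch" + i);
            sketch.disconnected("ch" + i);
        }
        System.out.println(sketch.drain());
    }
}
```

Unlike this sketch, the real handler never shuts connectionExecutor down, which is what the FIXME comment in the constructor points out.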

ConnectChannelHandlerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java

public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {

    @BeforeEach
    public void setUp() throws Exception {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true),url);
    }

    @Test
    public void test_Connect_Blocked() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false),url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler,"connectionExecutor",1);
        Assertions.assertEquals(1,executor.getMaximumPoolSize());

        int runs = 20;
        int taskCount = runs * 2;
        for (int i = 0; i < runs; i++) {
            handler.connected(new MockedChannel());
            handler.disconnected(new MockedChannel());
            Assertions.assertTrue(executor.getActiveCount() <= 1,executor.getActiveCount() + " must <=1");
        }
        //queue.size 
        Assertions.assertEquals(taskCount - 1,executor.getQueue().size());

        for (int i = 0; i < taskCount; i++) {
            if (executor.getCompletedTaskCount() < taskCount) {
                sleep(100);
            }
        }
        Assertions.assertEquals(taskCount,executor.getCompletedTaskCount());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Connect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true),url);
        handler.connected(new MockedChannel());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Disconnect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true),url);
        handler.disconnected(new MockedChannel());
    }

    @Test
    public void test_Connect_Execute_Error() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class,() -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false),url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler,"connectionExecutor",1);
            executor.shutdown();
            handler.connected(new MockedChannel());
        });
    }

    @Test
    public void test_Disconnect_Execute_Error() throws RemotingException {
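        Assertions.assertThrows(ExecutionException.class,() -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false),url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler,"connectionExecutor",1);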
            executor.shutdown();
            handler.disconnected(new MockedChannel());
        });
    }

    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test//(expected = RemotingException.class)
    public void test_MessageReceived_Biz_Error() throws RemotingException {
        handler.received(new MockedChannel(),"");
    }

    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test
    public void test_Caught_Biz_Error() throws RemotingException {
        handler.caught(new MockedChannel(),new BizException());
    }

    @Test
    public void test_Received_InvokeInExecuter() throws RemotingException {
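        Assertions.assertThrows(ExecutionException.class,() -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false),url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler,"SHARED_EXECUTOR",1);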
            executor.shutdown();
            executor = (ThreadPoolExecutor) getField(handler,"executor",1);
            executor.shutdown();
            handler.received(new MockedChannel(),"");
        });
    }

    /**
     * Events do not pass through the thread pool and execute directly on the IO
     */
    @SuppressWarnings("deprecation")
    @Disabled("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.")
    @Test
    public void test_Received_Event_invoke_direct() throws RemotingException {
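        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false),url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler,"SHARED_EXECUTOR",1);
        executor.shutdown();
        executor = (ThreadPoolExecutor) getField(handler,"executor",1);
        executor.shutdown();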
        Request req = new Request();
        req.setHeartbeat(true);
        final AtomicInteger count = new AtomicInteger(0);
        handler.received(new MockedChannel() {
            @Override
            public void send(Object message) throws RemotingException {
                Assertions.assertTrue(((Response) message).isHeartbeat(),"response.heartbeat");
                count.incrementAndGet();
            }
        },req);
        Assertions.assertEquals(1,count.get(),"channel.send must be invoke");
    }
}
  • ConnectChannelHandlerTest creates a ConnectionOrderedChannelHandler in setUp and then runs test_Connect_Blocked, test_Connect_Biz_Error, test_Disconnect_Biz_Error, test_Connect_Execute_Error, test_Disconnect_Execute_Error, test_MessageReceived_Biz_Error, test_Caught_Biz_Error, test_Received_InvokeInExecuter, and test_Received_Event_invoke_direct (the last one is disabled).
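The `taskCount - 1` assertion in test_Connect_Blocked is consistent with how a pool of size 1 behaves: the first task is handed straight to the only worker and the rest wait in the queue. The sketch below makes the blocking explicit with a CountDownLatch, which the original test does not do; `BlockedQueueSketch` and `queuedWhileBlocked` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of the Dubbo test suite.
public class BlockedQueueSketch {

    /** Submits taskCount blocking tasks to a 1/1 pool and returns the queue size while they are blocked. */
    public static int queuedWhileBlocked(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // stands in for a slow connect/disconnect handler
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first task started the single worker directly; every later task is queued.
            return executor.getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileBlocked(40)); // prints 39
    }
}
```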

Summary

  • ConnectionOrderedDispatcher implements the Dispatcher interface; its dispatch method returns a ConnectionOrderedChannelHandler. ConnectionOrderedChannelHandler extends WrappedChannelHandler, and its constructor creates connectionExecutor, a ThreadPoolExecutor whose corePoolSize and maximumPoolSize are both 1 and whose queue is a LinkedBlockingQueue.
  • Its connected and disconnected methods both submit a newly created ChannelEventRunnable to connectionExecutor. Each first calls checkQueueLength, which logs a warning if the queue size exceeds queuewarninglimit.
  • Its received and caught methods obtain a thread pool from the parent class's getExecutorService and submit a newly created ChannelEventRunnable to it. If received catches a RejectedExecutionException, the message is a Request, and the request is two-way, it sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR.
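The rejection fallback in received can be sketched in isolation: when the pool rejects a task, a two-way request gets an error reply so the caller does not wait for a timeout, while a one-way request surfaces the failure as an exception. Everything here is a simplified assumption: `RejectionFallbackSketch`, `Req`, and the string replies are hypothetical, and a shut-down executor stands in for an exhausted pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration; not a Dubbo class.
public class RejectionFallbackSketch {

    /** Hypothetical request type carrying only what the fallback needs. */
    public static final class Req {
        final long id;
        final boolean twoWay;

        public Req(long id, boolean twoWay) {
            this.id = id;
            this.twoWay = twoWay;
        }
    }

    /**
     * Hands the request to the executor. Returns "DISPATCHED" on success,
     * an error reply for a rejected two-way request, and throws otherwise.
     */
    public static String received(ExecutorService executor, Req request) {
        try {
            executor.execute(() -> { /* business handling would run here */ });
            return "DISPATCHED";
        } catch (RejectedExecutionException e) {
            if (request.twoWay) {
                // The caller expects a response, so answer immediately with an error.
                return "THREADPOOL_EXHAUSTED for request " + request.id;
            }
            throw new IllegalStateException("error when process received event", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // a shut-down pool rejects every new task
        System.out.println(received(executor, new Req(7L, true)));
    }
}
```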
