1. 程式人生 > 程式設計 >聊聊dubbo的FailbackClusterInvoker

聊聊dubbo的FailbackClusterInvoker

本文主要研究一下dubbo的FailbackClusterInvoker

FailbackClusterInvoker

dubbo-2.7.3/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(RETRIES_KEY,DEFAULT_FAILBACK_TIMES);
        if
(retriesConfig <= 0) { retriesConfig = DEFAULT_FAILBACK_TIMES; } int failbackTasksConfig = getUrl().getParameter(FAIL_BACK_TASKS_KEY,DEFAULT_FAILBACK_TASKS); if (failbackTasksConfig <= 0) { failbackTasksConfig = DEFAULT_FAILBACK_TASKS; } retries = retriesConfig; failbackTasks = failbackTasksConfig; } private void addFailed(LoadBalance loadbalance,Invocation invocation,List<Invoker<T>> invokers,Invoker<T> lastInvoker) { if
(failTimer == null) { synchronized (this) { if (failTimer == null) { failTimer = new HashedWheelTimer( new NamedThreadFactory("failback-cluster-timer",true),1,TimeUnit.SECONDS,32,failbackTasks); } } } RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance,invocation,invokers,lastInvoker,retries,RETRY_FAILED_PERIOD); try { failTimer.newTimeout(retryTimerTask,RETRY_FAILED_PERIOD,TimeUnit.SECONDS); } catch (Throwable e) { logger.error("Failback background works error,invocation->"
+ invocation + ",exception: " + e.getMessage()); } } @Override protected Result doInvoke(Invocation invocation,LoadBalance loadbalance) throws RpcException { Invoker<T> invoker = null; try { checkInvokers(invokers,invocation); invoker = select(loadbalance,null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ",wait for retry in background. Ignored exception: " + e.getMessage() + ",",e); addFailed(loadbalance,invoker); return AsyncRpcResult.newDefaultAsyncResult(null,null,invocation); // ignore } } @Override public void destroy() { super.destroy(); if (failTimer != null) { failTimer.stop(); } } /** * RetryTimerTask */ private class RetryTimerTask implements TimerTask { private final Invocation invocation; private final LoadBalance loadbalance; private final List<Invoker<T>> invokers; private final int retries; private final long tick; private Invoker<T> lastInvoker; private int retryTimes = 0; RetryTimerTask(LoadBalance loadbalance,Invoker<T> lastInvoker,int retries,long tick) { this.loadbalance = loadbalance; this.invocation = invocation; this.invokers = invokers; this.retries = retries; this.tick = tick; this.lastInvoker=lastInvoker; } @Override public void run(Timeout timeout) { try { Invoker<T> retryInvoker = select(loadbalance,Collections.singletonList(lastInvoker)); lastInvoker = retryInvoker; retryInvoker.invoke(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ",waiting again.",e); if ((++retryTimes) >= retries) { logger.error("Failed retry times exceed threshold (" + retries + "),We have to abandon,invocation->" + invocation); } else { rePut(timeout); } } } private void rePut(Timeout timeout) { if (timeout == null) { return; } Timer timer = timeout.timer(); if (timer.isStop() || timeout.isCancelled()) { return; } timer.newTimeout(timeout.task(),tick,TimeUnit.SECONDS); } } } 複製程式碼
  • FailbackClusterInvoker的構造器初始化了retriesConfig、failbackTasksConfig;doInvoke方法在catch到Throwable的時候,會執行addFailed方法,該方法會往HashedWheelTimer註冊一個RetryTimerTask,delay為5秒;RetryTimerTask的run方法首先會通過select方法選擇一個retryInvoker,然後進行重試,catch到Throwable時會遞增retryTimes,不超出限制時執行rePut方法,重新註冊RetryTimerTask

FailbackClusterInvokerTest

dubbo-2.7.3/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class FailbackClusterInvokerTest {

    List<Invoker<FailbackClusterInvokerTest>> invokers = new ArrayList<Invoker<FailbackClusterInvokerTest>>();
    URL url = URL.valueOf("test://test:11/test?retries=2&failbacktasks=2");
    Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
    RpcInvocation invocation = new RpcInvocation();
    Directory<FailbackClusterInvokerTest> dic;
    Result result = new AppResponse();

    /**
     * @throws java.lang.Exception
     */

    @BeforeEach
    public void setUp() throws Exception {

        dic = mock(Directory.class);
        given(dic.getUrl()).willReturn(url);
        given(dic.list(invocation)).willReturn(invokers);
        given(dic.getInterface()).willReturn(FailbackClusterInvokerTest.class);

        invocation.setMethodName("method1");

        invokers.add(invoker);
    }

    @AfterEach
    public void tearDown() {

        dic = null;
        invocation = new RpcInvocation();
        invokers.clear();
    }


    private void resetInvokerToException() {
        given(invoker.invoke(invocation)).willThrow(new RuntimeException());
        given(invoker.getUrl()).willReturn(url);
        given(invoker.getInterface()).willReturn(FailbackClusterInvokerTest.class);
    }

    private void resetInvokerToNoException() {
        given(invoker.invoke(invocation)).willReturn(result);
        given(invoker.getUrl()).willReturn(url);
        given(invoker.getInterface()).willReturn(FailbackClusterInvokerTest.class);
    }

    @Test
    @Order(1)
    public void testInvokeException() {
        resetInvokerToException();
        FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
                dic);
        invoker.invoke(invocation);
        Assertions.assertNull(RpcContext.getContext().getInvoker());
        DubboAppender.clear();
    }

    @Test
    @Order(2)
    public void testInvokeNoException() {

        resetInvokerToNoException();

        FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
                dic);
        Result ret = invoker.invoke(invocation);
        Assertions.assertSame(result,ret);
    }

    @Test
    @Order(3)
    public void testNoInvoke() {
        dic = mock(Directory.class);

        given(dic.getUrl()).willReturn(url);
        given(dic.list(invocation)).willReturn(null);
        given(dic.getInterface()).willReturn(FailbackClusterInvokerTest.class);

        invocation.setMethodName("method1");

        invokers.add(invoker);

        resetInvokerToNoException();

        FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
                dic);
        LogUtil.start();
        DubboAppender.clear();
        invoker.invoke(invocation);
        assertEquals(1,LogUtil.findMessage("Failback to invoke"));
        LogUtil.stop();
    }

    @Disabled
    @Test
    @Order(4)
    public void testARetryFailed() throws Exception {
        //Test retries and

        resetInvokerToException();

        FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new FailbackClusterInvoker<FailbackClusterInvokerTest>(
                dic);
        LogUtil.start();
        DubboAppender.clear();
        invoker.invoke(invocation);
        invoker.invoke(invocation);
        invoker.invoke(invocation);
        Assertions.assertNull(RpcContext.getContext().getInvoker());
//        invoker.retryFailed();// when retry the invoker which get from failed map already is not the mocked invoker,so
        //Ensure that the main thread is online
        CountDownLatch countDown = new CountDownLatch(1);
        countDown.await(15000L,TimeUnit.MILLISECONDS);
        LogUtil.stop();
        Assertions.assertEquals(4,LogUtil.findMessage(Level.ERROR,"Failed retry to invoke method"),"must have four error message ");
        Assertions.assertEquals(2,"Failed retry times exceed threshold"),"must have two error message ");
        Assertions.assertEquals(1,"Failback background works error"),"must have one error message ");
        // it can be invoke successfully
    }
}
複製程式碼
  • 這裡使用mockito來mock了resetInvokerToException、resetInvokerToNoException

小結

FailbackClusterInvoker的構造器初始化了retriesConfig、failbackTasksConfig;doInvoke方法在catch到Throwable的時候,會執行addFailed方法,該方法會往HashedWheelTimer註冊一個RetryTimerTask,delay為5秒;RetryTimerTask的run方法首先會通過select方法選擇一個retryInvoker,然後進行重試,catch到Throwable時會遞增retryTimes,不超出限制時執行rePut方法,重新註冊RetryTimerTask

doc