1. 程式人生 > 程式設計 >3. 原始碼分析---SOFARPC客戶端服務呼叫

3. 原始碼分析---SOFARPC客戶端服務呼叫

SOFARPC原始碼解析系列:

1. 原始碼分析---SOFARPC可擴充套件的機制SPI

2. 原始碼分析---SOFARPC客戶端服務引用

3. 原始碼分析---SOFARPC客戶端服務呼叫

4. 原始碼分析---SOFARPC服務端暴露

5.原始碼分析---SOFARPC呼叫服務

6.原始碼分析---和dubbo相比SOFARPC是如何實現負載均衡的?

7.原始碼分析---SOFARPC是如何實現連線管理與心跳?

8.原始碼分析---從設計模式中看SOFARPC中的EventBus?


我們首先看看BoltClientProxyInvoker的關係圖

所以當我們用BoltClientProxyInvoker#invoke的時候實際上是呼叫了父類的invoke方法 ClientProxyInvoker#invoke

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        Throwable throwable = null;
        try {
            RpcInternalContext.pushContext();
            RpcInternalContext context = RpcInternalContext.getContext();
            context.setProviderSide(false
); // 包裝request請求 decorateRequest(request); try { // 產生開始呼叫事件 if (EventBus.isEnable(ClientStartInvokeEvent.class)) { EventBus.post(new ClientStartInvokeEvent(request)); } // 得到結果 response = cluster.invoke(request); } catch
(SofaRpcException e) { throwable = e; throw e; } finally { // 產生呼叫結束事件 if (!request.isAsync()) { if (EventBus.isEnable(ClientEndInvokeEvent.class)) { EventBus.post(new ClientEndInvokeEvent(request,response,throwable)); } } } // 包裝響應 decorateResponse(response); return response; } finally { RpcInternalContext.removeContext(); RpcInternalContext.popContext(); } } 複製程式碼

這個方法主要做了幾件事:

  1. 包裝request請求,設定必要的引數
  2. 呼叫FailOverCluster的invoke方法,將reques請求傳送出去,並得到response相應
  3. 包裝response響應

我們在呼叫FailOverCluster的時候實際上是呼叫的父類AbstractCluster的invoker方法,FailOverCluster關係圖如下:

所以我們進入到AbstractCluster的invoker方法中:

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            // 做一些初始化檢查,例如未連線可以連線
            checkClusterState();
            // 開始呼叫
            countOfInvoke.incrementAndGet(); // 計數+1         
            response = doInvoke(request);
            return response;
        } catch (SofaRpcException e) {
            // 客戶端收到異常(客戶端自己的異常)
            throw e;
        } finally {
            countOfInvoke.decrementAndGet(); // 計數-1
        }
    }
複製程式碼

checkClusterState方法主要是用來校驗是否已銷燬了,或是呼叫了init方法進行初始化了。 然後會在呼叫之前記一下數。 然後我們進入到doInvoke方法中:

    public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
        String methodName = request.getMethodName();
        int retries = consumerConfig.getMethodRetries(methodName);
        int time = 0;
        SofaRpcException throwable = null;// 異常日誌
        List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
        do {
            //負載均衡
            ProviderInfo providerInfo = select(request,invokedProviderInfos);
            try {
                //呼叫過濾器鏈
                SofaResponse response = filterChain(providerInfo,request);
                if (response != null) {
                    if (throwable != null) {
                        if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
                            LOGGER.warnWithApp(consumerConfig.getAppName(),LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,throwable.getClass() + ":" + throwable.getMessage(),invokedProviderInfos));
                        }
                    }
                    return response;
                } else {
                    throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Failed to call " + request.getInterfaceName() + "." + methodName
                            + " on remote server " + providerInfo + ",return null");
                    time++;
                }
            } catch (SofaRpcException e) { // 服務端異常+ 超時異常 才發起rpc異常重試
                if (e.getErrorType() == RpcErrorType.SERVER_BUSY
                    || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
                    throwable = e;
                    time++;
                } else {
                    throw e;
                }
            } catch (Exception e) { // 其它異常不重試
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
                        + " on remote server: " + providerInfo + ",cause by unknown exception: "
                        + e.getClass().getName() + ",message is: " + e.getMessage(),e);
            } finally {
                if (RpcInternalContext.isAttachmentEnable()) {
                    RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,time + 1); // 重試次數
                }
            }
            invokedProviderInfos.add(providerInfo);
        } while (time <= retries);

        throw throwable;
    }
複製程式碼

這個方法裡面主要做了這這件事:

  1. 如果失敗的話就迴圈呼叫
  2. 負載均衡,選取provider
  3. 通過過濾器鏈呼叫服務端,並返回結果
  4. 異常處理

接著我們進入到filterChain方法中,根據過濾器鏈最後會跳到ConsumerInvoker中的invoke方法

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
        // 設定下伺服器應用
        ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
        String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
        if (StringUtils.isNotEmpty(appName)) {
            sofaRequest.setTargetAppName(appName);
        }

        // 目前只是通過client傳送給服務端
        return consumerBootstrap.getCluster().sendMsg(providerInfo,sofaRequest);
    }
複製程式碼

consumerBootstrap.getCluster()會返回FailOverCluster例項,然後呼叫父類AbstractCluster的sendMsg方法

    public SofaResponse sendMsg(ProviderInfo providerInfo,SofaRequest request) throws SofaRpcException {
        ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
        if (clientTransport != null && clientTransport.isAvailable()) {
            return doSendMsg(providerInfo,clientTransport,request);
        } else {
            throw unavailableProviderException(request.getTargetServiceUniqueName(),providerInfo.getOriginUrl());
        }
    }
    
    
    protected SofaResponse doSendMsg(ProviderInfo providerInfo,ClientTransport transport,SofaRequest request) throws SofaRpcException {
        RpcInternalContext context = RpcInternalContext.getContext();
        // 新增呼叫的服務端遠端地址
        RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(),providerInfo.getPort());
        try {
            checkProviderVersion(providerInfo,request); // 根據服務端版本特殊處理
            String invokeType = request.getInvokeType();
            int timeout = resolveTimeout(request,consumerConfig,providerInfo);

            SofaResponse response = null;
            // 同步呼叫
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    response = transport.syncSend(request,timeout);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE,elapsed);
                    }
                }
            }
            // 單向呼叫
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    transport.oneWaySend(request,timeout);
                    response = buildEmptyResponse(request);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE,elapsed);
                    }
                }
            }
            // Callback呼叫
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
                // 呼叫級別回撥監聽器
                SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
                if (sofaResponseCallback == null) {
                    SofaResponseCallback methodResponseCallback = consumerConfig
                        .getMethodOnreturn(request.getMethodName());
                    if (methodResponseCallback != null) { // 方法的Callback
                        request.setSofaResponseCallback(methodResponseCallback);
                    }
                }
                // 記錄傳送開始時間
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME,RpcRuntimeContext.now());
                // 開始呼叫
                transport.asyncSend(request,timeout);
                response = buildEmptyResponse(request);
            }
            // Future呼叫
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
                // 記錄傳送開始時間
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME,RpcRuntimeContext.now());
                // 開始呼叫
                ResponseFuture future = transport.asyncSend(request,timeout);
                // 放入執行緒上下文
                RpcInternalContext.getContext().setFuture(future);
                response = buildEmptyResponse(request);
            } else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,"Unknown invoke type:" + invokeType);
            }
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // 客戶端其它異常
            throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,e);
        }
    }
複製程式碼

sendMsg方法最後會呼叫到doSendMsg。 soSendMsg裡面主要做了如下幾件事:

  1. 如果是同步呼叫,則直接返回封裝好的引數
  2. 如果是單向呼叫,則呼叫buildEmptyResponse方法,返回一個空的response
  3. 如果是callback呼叫asyncSend,RPC在獲取到服務端的結果後會自動執行該回撥實現。
  4. 服務端返回響應結果被 RPC 快取,當客戶端需要響應結果的時候需要主動獲取結果,獲取結果的過程阻塞執行緒。