1. 程式人生 > >11.原始碼分析---SOFARPC資料透傳是實現的?

11.原始碼分析---SOFARPC資料透傳是實現的?

先把栗子放上,讓大家方便測試用:
Service端

public static void main(String[] args) {
    ServerConfig serverConfig = new ServerConfig()
        .setProtocol("bolt") // 設定一個協議,預設bolt
        .setPort(12200) // 設定一個埠,預設12200
        .setDaemon(false); // 非守護執行緒

    ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName()) // 指定介面
        .setRef(new HelloServiceImpl()) // 指定實現
        .setServer(serverConfig); // 指定服務端

    providerConfig.export(); // 釋出服務
}

public class HelloServiceImpl implements HelloService {

    private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);

    @Override
    public String sayHello(String string) {
        LOGGER.info("Server receive: " + string);

        // 獲取請求透傳資料並列印
        System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));
        // 設定響應透傳資料到當前執行緒的上下文中
        RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");

        return "hello " + string + " !";
    }
}

client端

public static void main(String[] args) {
    ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName()) // 指定介面
        .setProtocol("bolt") // 指定協議
        .setDirectUrl("bolt://127.0.0.1:12200") // 指定直連地址
        .setConnectTimeout(10 * 1000);

    RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");

    HelloService helloService = consumerConfig.refer();

    while (true) {
        System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));
        try {
            LOGGER.info(helloService.sayHello("world"));
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

通過上面的栗子我們可以看出整個流程應該是:

  1. 客戶端把需要透傳的資料放入到requestBaggage中,然後呼叫服務端
  2. 服務端在HelloServiceImpl中獲取請求透傳資料並列印,並把響應資料放入到responseBaggage中
  3. 客戶端收到透傳資料

所以下面我們從客戶端開始原始碼講解。

客戶端資料透傳給服務端

首先客戶端在引用之前要設定putRequestBaggage,然後在客戶端引用的時候會呼叫ClientProxyInvoker#invoke方法。

如下:
ClientProxyInvoker#invoke

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    ....
        // 包裝請求
        decorateRequest(request);
       ....
}

通過呼叫decorateRequest會呼叫到子類DefaultClientProxyInvoker的decorateRequest方法。

DefaultClientProxyInvoker#decorateRequest

protected void decorateRequest(SofaRequest request) {
    ....
    RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
    RpcInternalContext internalContext = RpcInternalContext.getContext();
    if (invokeCtx != null) {
       ....
        // 如果使用者指定了透傳資料
        if (RpcInvokeContext.isBaggageEnable()) {
            // 需要透傳
            BaggageResolver.carryWithRequest(invokeCtx, request);
            internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
        }
    }
    ....
} 

在decorateRequest方法裡首先會校驗有沒有開啟透傳資料,如果開啟了,那麼就呼叫BaggageResolver#carryWithRequest,把要透傳的資料放入到request裡面

BaggageResolver#carryWithRequest

public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
    if (context != null) {
          //獲取所有的透傳資料
        Map<String, String> requestBaggage = context.getAllRequestBaggage();
        if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透傳
            request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
        }
    }
}

這個方法裡面要做的就是獲取所有的透傳資料,然後放置到RequestProp裡面,這樣在傳送請求的時候就會傳送到服務端。

服務端接受透傳資料

服務端的呼叫流程如下:

BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 

所以從上面的呼叫鏈可以知道,在服務端引用的時候會經過ProviderBaggageFilter過濾器,我們下面看看這個過濾器做了什麼事情:

ProviderBaggageFilter#invoke

public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
    SofaResponse response = null;
    try {
        //從request中獲取透傳資料存入到requestBaggage中
        BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
        response = invoker.invoke(request);
    } finally {
        if (response != null) {
            BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
        }
    }
    return response;
}

ProviderBaggageFilter會呼叫BaggageResolver#pickupFromRequest從request中獲取資料

BaggageResolver#pickupFromRequest

public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
    if (context == null && !init) {
        return;
    }
    // 解析請求 
    Map<String, String> requestBaggage = (Map<String, String>) request
        .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
    if (CommonUtils.isNotEmpty(requestBaggage)) {
        if (context == null) {
            context = RpcInvokeContext.getContext();
        }
        context.putAllRequestBaggage(requestBaggage);
    }
}

最後會在ProviderBaggageFilter invoke方法的finally裡面呼叫BaggageResolver#carryWithResponse把響應透傳資料回寫到response裡面。

public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
    if (context != null) {
        Map<String, String> responseBaggage = context.getAllResponseBaggage();
        if (CommonUtils.isNotEmpty(responseBaggage)) {
            String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
            for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                response.addResponseProp(prefix + entry.getKey(), entry.getValue());
            }
        }
    }
}

客戶端收到響應透傳資料

最後客戶端會在ClientProxyInvoker#invoke方法裡呼叫decorateResponse獲取response回寫的資料。

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        ....
     // 包裝響應
     decorateResponse(response);
        ....
}

decorateResponse是在子類DefaultClientProxyInvoker實現的:

DefaultClientProxyInvoker#decorateResponse

protected void decorateResponse(SofaResponse response) {
   ....
    //如果開啟了透傳
    if (RpcInvokeContext.isBaggageEnable()) {
        BaggageResolver.pickupFromResponse(invokeCtx, response, true);
    }
   ....
}

這個方法裡面會呼叫BaggageResolver#pickupFromResponse

public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
    if (context == null && !init) {
        return;
    }
    Map<String, String> responseBaggage = response.getResponseProps();
    if (CommonUtils.isNotEmpty(responseBaggage)) {
        String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
        for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                if (context == null) {
                    context = RpcInvokeContext.getContext();
                }
                //因為entry的key裡面會包含rpc_resp_baggage,所以需要擷取掉
                context.putResponseBaggage(entry.getKey().substring(prefix.length()),
                    entry.getValue());
            }
        }
    }
}

這個方法裡面response獲取所有的透傳資料,然後放入到ResponseBaggage中。

到這裡SOFARPC資料透傳就分析完畢