1. 程式人生 > 程式設計 >Dubbo原始碼解析(二十六)遠端呼叫——http協議

Dubbo原始碼解析(二十六)遠端呼叫——http協議

遠端呼叫——http協議

目標:介紹遠端呼叫中跟http協議相關的設計和實現,介紹dubbo-rpc-http的原始碼。

前言

基於HTTP表單的遠端呼叫協議,採用 Spring 的HttpInvoker實現,關於http協議就不用多說了吧。

原始碼分析

(一)HttpRemoteInvocation

該類繼承了RemoteInvocation類,是在RemoteInvocation上增加了泛化呼叫的引數設定,以及增加了dubbo本身需要的附加值設定。

public class HttpRemoteInvocation extends RemoteInvocation {

    private
static final long serialVersionUID = 1L; /** * dubbo的附加值名稱 */ private static final String dubboAttachmentsAttrName = "dubbo.attachments"; public HttpRemoteInvocation(MethodInvocation methodInvocation) { super(methodInvocation); // 把附加值加入到會話域的屬性裡面 addAttribute(dubboAttachmentsAttrName,new
HashMap<String,String>(RpcContext.getContext().getAttachments())); } @Override public Object invoke(Object targetObject) throws NoSuchMethodException,IllegalAccessException,InvocationTargetException { // 獲得上下文 RpcContext context = RpcContext.getContext(); // 獲得附加值
context.setAttachments((Map<String,String>) getAttribute(dubboAttachmentsAttrName)); // 泛化標誌 String generic = (String) getAttribute(Constants.GENERIC_KEY); // 如果不為空,則設定泛化標誌 if (StringUtils.isNotEmpty(generic)) { context.setAttachment(Constants.GENERIC_KEY,generic); } try { // 呼叫下一個呼叫鏈 return super.invoke(targetObject); } finally { context.setAttachments(null); } } } 複製程式碼

(二)HttpProtocol

該類是http實現的核心,跟我在《dubbo原始碼解析(二十五)遠端呼叫——hessian協議》中講到的HessianProtocol實現有很多地方相似。

1.屬性

/**
 * 預設的埠號
 */
public static final int DEFAULT_PORT = 80;

/**
 * http伺服器集合
 */
private final Map<String,HttpServer> serverMap = new ConcurrentHashMap<String,HttpServer>();

/**
 * Spring HttpInvokerServiceExporter 集合
 */
private final Map<String,HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String,HttpInvokerServiceExporter>();

/**
 * HttpBinder物件
 */
private HttpBinder httpBinder;
複製程式碼

2.doExport

@Override
protected <T> Runnable doExport(final T impl,Class<T> type,URL url) throws RpcException {
    // 獲得ip地址
    String addr = getAddr(url);
    // 獲得http伺服器
    HttpServer server = serverMap.get(addr);
    // 如果伺服器為空,則重新建立伺服器,並且加入到集合
    if (server == null) {
        server = httpBinder.bind(url,new InternalHandler());
        serverMap.put(addr,server);
    }

    // 獲得服務path
    final String path = url.getAbsolutePath();

    // 加入集合
    skeletonMap.put(path,createExporter(impl,type));

    // 通用path
    final String genericPath = path + "/" + Constants.GENERIC_KEY;

    // 新增泛化的服務呼叫
    skeletonMap.put(genericPath,GenericService.class));
    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}
複製程式碼

該方法是暴露服務等邏輯,因為dubbo實現http協議採用了Spring 的HttpInvoker實現,所以呼叫了createExporter方法來建立建立HttpInvokerServiceExporter。

3.createExporter

private <T> HttpInvokerServiceExporter createExporter(T impl,Class<?> type) {
    // 建立HttpInvokerServiceExporter
    final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
    // 設定要訪問的服務的介面
    httpServiceExporter.setServiceInterface(type);
    // 設定服務實現
    httpServiceExporter.setService(impl);
    try {
        // 在BeanFactory設定了所有提供的bean屬性,初始化bean的時候執行,可以針對某個具體的bean進行配
        httpServiceExporter.afterPropertiesSet();
    } catch (Exception e) {
        throw new RpcException(e.getMessage(),e);
    }
    return httpServiceExporter;
}
複製程式碼

該方法是建立一個spring 的HttpInvokerServiceExporter。

4.doRefer

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType,final URL url) throws RpcException {
    // 獲得泛化配置
    final String generic = url.getParameter(Constants.GENERIC_KEY);
    // 是否為泛化呼叫
    final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);

    // 建立HttpInvokerProxyFactoryBean
    final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
    // 設定RemoteInvocation的工廠類
    httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
        /**
         * 為給定的AOP方法呼叫建立一個新的RemoteInvocation物件。
         * @param methodInvocation
         * @return
         */
        @Override
        public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
            // 新建一個HttpRemoteInvocation
            RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation);
            // 如果是泛化呼叫
            if (isGeneric) {
                // 設定標誌
                invocation.addAttribute(Constants.GENERIC_KEY,generic);
            }
            return invocation;
        }
    });
    // 獲得identity message
    String key = url.toIdentityString();
    // 如果是泛化呼叫
    if (isGeneric) {
        key = key + "/" + Constants.GENERIC_KEY;
    }
    // 設定服務url
    httpProxyFactoryBean.setServiceUrl(key);
    // 設定服務介面
    httpProxyFactoryBean.setServiceInterface(serviceType);
    // 獲得客戶端引數
    String client = url.getParameter(Constants.CLIENT_KEY);
    if (client == null || client.length() == 0 || "simple".equals(client)) {
        // 建立SimpleHttpInvokerRequestExecutor連線池 使用的是JDK HttpClient
        SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
            @Override
            protected void prepareConnection(HttpURLConnection con,int contentLength) throws IOException {
                super.prepareConnection(con,contentLength);
                // 設定讀取超時時間
                con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
                // 設定連線超時時間
                con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY,Constants.DEFAULT_CONNECT_TIMEOUT));
            }
        };
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else if ("commons".equals(client)) {
        // 建立 HttpComponentsHttpInvokerRequestExecutor連線池 使用的是Apache HttpClient
        HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
        // 設定讀取超時時間
        httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
        // 設定連線超時時間
        httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY,Constants.DEFAULT_CONNECT_TIMEOUT));
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else {
        throw new IllegalStateException("Unsupported http protocol client " + client + ",only supported: simple,commons");
    }
    httpProxyFactoryBean.afterPropertiesSet();
    // 返回HttpInvokerProxyFactoryBean物件
    return (T) httpProxyFactoryBean.getObject();
}
複製程式碼

該方法是服務引用的方法,其中根據url配置simple還是commons來選擇建立連線池的方式。其中的區別就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。

5.getErrorCode

@Override
protected int getErrorCode(Throwable e) {
    if (e instanceof RemoteAccessException) {
        e = e.getCause();
    }
    if (e != null) {
        Class<?> cls = e.getClass();
        if (SocketTimeoutException.class.equals(cls)) {
            // 返回超時異常
            return RpcException.TIMEOUT_EXCEPTION;
        } else if (IOException.class.isAssignableFrom(cls)) {
            // 返回網路異常
            return RpcException.NETWORK_EXCEPTION;
        } else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
            // 返回序列化異常
            return RpcException.SERIALIZATION_EXCEPTION;
        }
    }
    return super.getErrorCode(e);
}
複製程式碼

該方法是處理異常情況,設定錯誤碼。

6.InternalHandler

private class InternalHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request,HttpServletResponse response)
            throws IOException,ServletException {
        // 獲得請求uri
        String uri = request.getRequestURI();
        // 獲得服務暴露者HttpInvokerServiceExporter物件
        HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
        // 如果不是post,則返回碼設定500
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        } else {
            // 遠端地址放到上下文
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(),request.getRemotePort());
            try {
                // 呼叫下一個呼叫
                skeleton.handleRequest(request,response);
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}
複製程式碼

該內部類實現了HttpHandler,做了設定遠端地址的邏輯。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了遠端呼叫中關於http協議的部分,內容比較簡單,可以參考著官方檔案瞭解一下。接下來我將開始對rpc模組關於injvm本地呼叫部分進行講解。