1. 程式人生 > 程式設計 >Dubbo原始碼解析(十九)遠端呼叫——開篇

Dubbo原始碼解析(十九)遠端呼叫——開篇

遠端呼叫——開篇

目標:介紹之後解讀遠端呼叫模組的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的原始碼解析。

前言

最近我面臨著一個選擇,因為dubbo 2.7.0-release出現在了倉庫裡,最近一直在進行2.7.0版本的code review,那我之前說這一系列的文章都是講述2.6.x版本的原始碼,我現在要不要選擇直接開始講解2.7.0的版本的原始碼呢?我最後還是決定繼續講解2.6.x,因為我覺得還是有很多公司在用著2.6.x的版本,並且對於升級2.7.0的計劃應該還沒那麼快,並且在瞭解2.6.x版本的原理後,再去了解2.7.0新增的特性會更加容易,也能夠品位到設計者的意圖。當然在結束2.6.x的重要模組講解後,我也會對2.7.0的新特性以及實現原理做一個全面的分析,2.7.0作為dubbo社群的畢業版,更加強大,敬請期待。

前面講了很多的內容,現在開始將遠端呼叫RPC,好像又回到我第一篇文章 《dubbo原始碼解析(一)Hello,Dubbo》,在這篇文章開頭我講到了什麼叫做RPC,再通俗一點講,就是我把一個專案的兩部分程式碼分開來,分別放到兩臺機器上,當我部署在A伺服器上的應用想要呼叫部署在B伺服器上的應用等方法,由於不存在同一個記憶體空間,不能直接呼叫。而其實整個dubbo都在做遠端呼叫的事情,它涉及到很多內容,比如配置、代理、叢集、監控等等,那麼這次講的內容是隻關心一對一的呼叫,dubbo-rpc遠端呼叫模組抽象各種協議,以及動態代理,Proxy層和Protocol層rpc的核心,我將會在本系列中講到。下面我們來看兩張官方檔案的圖:

  1. 暴露服務的時序圖:

dubbo-export

你會發現其中有我們以前講到的Transporter、Server、Registry,而這次的系列將會講到的就是紅色框框內的部分。

  1. 引用服務時序圖

dubbo-refer

在引用服務時序圖中,對應的也是紅色框框的部分。

當閱讀完該系列後,希望能對這個呼叫鏈有所感悟。接下來看看dubbo-rpc的包結構:

rpc目錄

可以看到有很多包,很規整,其中dubbo-rpc-api是對協議、暴露、引用、代理等的抽象和實現,是rpc整個設計的核心內容。其他的包則是dubbo支援的9種協議,在官方檔案也能檢視介紹,並且包括一種本地呼叫injvm。那麼我們再來看看dubbo-rpc-api中包結構:

dubbo-rpc-api包結構

  1. filter包:在進行服務引用時會進行一系列的過濾。其中包括了很多過濾器。
  2. listener包:看上面兩張服務引用和服務暴露的時序圖,發現有兩個listener,其中的邏輯實現就在這個包內
  3. protocol包:這個包實現了協議的一些公共邏輯
  4. proxy包:實現了代理的邏輯。
  5. service包:其中包含了一個需要呼叫的方法等封裝抽象。
  6. support包:包括了工具類
  7. 最外層的實現。

下面的篇幅設計,本文會講解最外層的原始碼和service下的原始碼,support包下的原始碼我會穿插在其他用到的地方一併講解,filter、listener、protocol、proxy以及各類協議的實現各自用一篇來講。

原始碼分析

(一)Invoker

public interface Invoker<T> extends Node {

    /**
     * get service interface.
     * 獲得服務介面
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     * 呼叫下一個會話域
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}
複製程式碼

該介面是實體域,它是dubbo的核心模型,其他模型都向它靠攏,或者轉化成它,它代表了一個可執行體,可以向它發起invoke呼叫,這個有可能是一個本地的實現,也可能是一個遠端的實現,也可能是一個叢集的實現。它代表了一次呼叫

(二)Invocation

public interface Invocation {

    /**
     * get method name.
     * 獲得方法名稱
     * @return method name.
     * @serial
     */
    String getMethodName();

    /**
     * get parameter types.
     * 獲得引數型別
     * @return parameter types.
     * @serial
     */
    Class<?>[] getParameterTypes();

    /**
     * get arguments.
     * 獲得引數
     * @return arguments.
     * @serial
     */
    Object[] getArguments();

    /**
     * get attachments.
     * 獲得附加值集合
     * @return attachments.
     * @serial
     */
    Map<String,String> getAttachments();

    /**
     * get attachment by key.
     * 獲得附加值
     * @return attachment value.
     * @serial
     */
    String getAttachment(String key);

    /**
     * get attachment by key with default value.
     * 獲得附加值
     * @return attachment value.
     * @serial
     */
    String getAttachment(String key,String defaultValue);

    /**
     * get the invoker in current context.
     * 獲得當前上下文的invoker
     * @return invoker.
     * @transient
     */
    Invoker<?> getInvoker();

}
複製程式碼

Invocation 是會話域,它持有呼叫過程中的變數,比如方法名,引數等。

(三)Exporter

public interface Exporter<T> {

    /**
     * get invoker.
     * 獲得對應的實體域invoker
     * @return invoker
     */
    Invoker<T> getInvoker();

    /**
     * unexport.
     * 取消暴露
     * <p>
     * <code>
     * getInvoker().destroy();
     * </code>
     */
    void unexport();

}
複製程式碼

該介面是暴露服務的介面,定義了兩個方法分別是獲得invoker和取消暴露服務。

(四)ExporterListener

@SPI
public interface ExporterListener {

    /**
     * The exporter exported.
     * 暴露服務
     * @param exporter
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
     */
    void exported(Exporter<?> exporter) throws RpcException;

    /**
     * The exporter unexported.
     * 取消暴露
     * @param exporter
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Exporter#unexport()
     */
    void unexported(Exporter<?> exporter);

}
複製程式碼

該介面是服務暴露的監聽器介面,定義了兩個方法是暴露和取消暴露,引數都是Exporter型別的。

(五)Protocol

@SPI("dubbo")
public interface Protocol {

    /**
     * Get default port when user doesn't config the port.
     * 獲得預設的埠
     * @return default port
     */
    int getDefaultPort();

    /**
     * Export service for remote invocation: <br>
     * 1. Protocol should record request source address after receive a request:
     * RpcContext.getContext().setRemoteAddress();<br>
     * 2. export() must be idempotent,that is,there's no difference between invoking once and invoking twice when
     * export the same URL<br>
     * 3. Invoker instance is passed in by the framework,protocol needs not to care <br>
     * 暴露服務方法,
     * @param <T>     Service type 服務型別
     * @param invoker Service invoker 服務的實體域
     * @return exporter reference for exported service,useful for unexport the service later
     * @throws RpcException thrown when error occurs during export the service,for example: port is occupied
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * Refer a remote service: <br>
     * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call,the protocol
     * needs to correspondingly execute `invoke()` method of `Invoker` object <br>
     * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,* protocol sends remote request in the `Invoker` implementation. <br>
     * 3. When there's check=false set in URL,the implementation must not throw exception but try to recover when
     * connection fails.
     * 引用服務方法
     * @param <T>  Service type 服務型別
     * @param type Service class 服務類名
     * @param url  URL address for the remote service
     * @return invoker service's local proxy
     * @throws RpcException when there's any error while connecting to the service provider
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException;

    /**
     * Destroy protocol: <br>
     * 1. Cancel all services this protocol exports and refers <br>
     * 2. Release all occupied resources,for example: connection,port,etc. <br>
     * 3. Protocol can continue to export and refer new service even after it's destroyed.
     */
    void destroy();

}
複製程式碼

該介面是服務域介面,也是協議介面,它是一個可擴充套件的介面,預設實現的是dubbo協議。定義了四個方法,關鍵的是服務暴露和引用兩個方法。

(六)Filter

@SPI
public interface Filter {

    /**
     * do invoke filter.
     * <p>
     * <code>
     * // before filter
     * Result result = invoker.invoke(invocation);
     * // after filter
     * return result;
     * </code>
     *
     * @param invoker    service
     * @param invocation invocation.
     * @return invoke result.
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
     */
    Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException;

}
複製程式碼

該介面是invoker呼叫時過濾器介面,其中就只有一個invoke方法。在該方法中對呼叫進行過濾

(七)InvokerListener

@SPI
public interface InvokerListener {

    /**
     * The invoker referred
     * 在服務引用的時候進行監聽
     * @param invoker
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Protocol#refer(Class,com.alibaba.dubbo.common.URL)
     */
    void referred(Invoker<?> invoker) throws RpcException;

    /**
     * The invoker destroyed.
     * 銷燬實體域
     * @param invoker
     * @see com.alibaba.dubbo.rpc.Invoker#destroy()
     */
    void destroyed(Invoker<?> invoker);

}
複製程式碼

該介面是實體域的監聽器,定義了兩個方法,分別是服務引用和銷燬的時候執行的方法。

(八)Result

該介面是實體域執行invoke的結果介面,裡面定義了獲得結果異常以及附加值等方法。比較好理解我就不貼程式碼了。

(九)ProxyFactory

@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     * 建立一個代理
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create proxy.
     * 建立一個代理
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker,boolean generic) throws RpcException;

    /**
     * create invoker.
     * 建立一個實體域
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy,Class<T> type,URL url) throws RpcException;

}
複製程式碼

該介面是代理工廠介面,它也是個可擴充套件介面,預設實現javassist,dubbo提供兩種動態代理方法分別是javassist/jdk,該介面定義了三個方法,前兩個方法是通過invoker建立代理,最後一個是通過代理來獲得invoker。

(十)RpcContext

該類就是遠端呼叫的上下文,貫穿著整個呼叫,例如A呼叫B,然後B呼叫C。在服務B上,RpcContext在B之前將呼叫資訊從A儲存到B。開始呼叫C,並在B呼叫C後將呼叫資訊從B儲存到C。RpcContext儲存了呼叫資訊。

public class RpcContext {

    /**
     * use internal thread local to improve performance
     * 本地上下文
     */
    private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    /**
     * 服務上下文
     */
    private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };

    /**
     * 附加值集合
     */
    private final Map<String,String> attachments = new HashMap<String,String>();
    /**
     * 上下文值
     */
    private final Map<String,Object> values = new HashMap<String,Object>();
    /**
     * 執行緒結果
     */
    private Future<?> future;

    /**
     * url集合
     */
    private List<URL> urls;

    /**
     * 當前的url
     */
    private URL url;

    /**
     * 方法名稱
     */
    private String methodName;

    /**
     * 引數型別集合
     */
    private Class<?>[] parameterTypes;

    /**
     * 引數集合
     */
    private Object[] arguments;

    /**
     * 本地地址
     */
    private InetSocketAddress localAddress;

    /**
     * 遠端地址
     */
    private InetSocketAddress remoteAddress;
    /**
     * 實體域集合
     */
    @Deprecated
    private List<Invoker<?>> invokers;
    /**
     * 實體域
     */
    @Deprecated
    private Invoker<?> invoker;
    /**
     * 會話域
     */
    @Deprecated
    private Invocation invocation;

    // now we don't use the 'values' map to hold these objects
    // we want these objects to be as generic as possible
    /**
     * 請求
     */
    private Object request;
    /**
     * 響應
     */
    private Object response;
複製程式碼

該類中最重要的是它的一些屬性,因為該上下文就是用來儲存資訊的。方法我就不介紹了,因為比較簡單。

(十一)RpcException

/**
 * 不知道異常
 */
public static final int UNKNOWN_EXCEPTION = 0;
/**
 * 網路異常
 */
public static final int NETWORK_EXCEPTION = 1;
/**
 * 超時異常
 */
public static final int TIMEOUT_EXCEPTION = 2;
/**
 * 基礎異常
 */
public static final int BIZ_EXCEPTION = 3;
/**
 * 禁止訪問異常
 */
public static final int FORBIDDEN_EXCEPTION = 4;
/**
 * 序列化異常
 */
public static final int SERIALIZATION_EXCEPTION = 5;
複製程式碼

該類是rpc呼叫丟擲的異常類,其中封裝了五種通用的錯誤碼。

(十二)RpcInvocation

/**
 * 方法名稱
 */
private String methodName;

/**
 * 引數型別集合
 */
private Class<?>[] parameterTypes;

/**
 * 引數集合
 */
private Object[] arguments;

/**
 * 附加值
 */
private Map<String,String> attachments;

/**
 * 實體域
 */
private transient Invoker<?> invoker;
複製程式碼

該類實現了Invocation介面,是rpc的會話域,其中的方法比較簡單,主要是封裝了上述的屬性。

(十三)RpcResult

/**
 * 結果
 */
private Object result;

/**
 * 異常
 */
private Throwable exception;

/**
 * 附加值
 */
private Map<String,String>();
複製程式碼

該類實現了Result介面,是rpc的結果實現類,其中關鍵是封裝了以上三個屬性。

(十四)RpcStatus

該類是rpc的一些狀態監控,其中封裝了許多的計數器,用來記錄rpc呼叫的狀態。

1.屬性

/**
 * uri對應的狀態集合,key為uri,value為RpcStatus物件
 */
private static final ConcurrentMap<String,RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String,RpcStatus>();

/**
 * method對應的狀態集合,key是uri,第二個key是方法名methodName
 */
private static final ConcurrentMap<String,ConcurrentMap<String,RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String,RpcStatus>>();
/**
 * 已經沒用了
 */
private final ConcurrentMap<String,Object> values = new ConcurrentHashMap<String,Object>();
/**
 * 活躍狀態
 */
private final AtomicInteger active = new AtomicInteger();
/**
 * 總的數量
 */
private final AtomicLong total = new AtomicLong();
/**
 * 失敗的個數
 */
private final AtomicInteger failed = new AtomicInteger();
/**
 * 總呼叫時長
 */
private final AtomicLong totalElapsed = new AtomicLong();
/**
 * 總呼叫失敗時長
 */
private final AtomicLong failedElapsed = new AtomicLong();
/**
 * 最大呼叫時長
 */
private final AtomicLong maxElapsed = new AtomicLong();
/**
 * 最大呼叫失敗時長
 */
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
 * 最大呼叫成功時長
 */
private final AtomicLong succeededMaxElapsed = new AtomicLong();

/**
 * Semaphore used to control concurrency limit set by `executes`
 * 訊號量用來控制`execution`設定的併發限制
 */
private volatile Semaphore executesLimit;
/**
 * 用來控制`execution`設定的許可證
 */
private volatile int executesPermits;
複製程式碼

以上是該類的屬性,可以看到儲存了很多的計數器,分別用來記錄了失敗呼叫成功呼叫等累計數。

2.beginCount

/**
 * 開始計數
 * @param url
 */
public static void beginCount(URL url,String methodName) {
    // 對該url對應對活躍計數器加一
    beginCount(getStatus(url));
    // 對該方法對活躍計數器加一
    beginCount(getStatus(url,methodName));
}

/**
 * 以原子方式加1
 * @param status
 */
private static void beginCount(RpcStatus status) {
    status.active.incrementAndGet();
}
複製程式碼

該方法是增加計數。

3.endCount

public static void endCount(URL url,String methodName,long elapsed,boolean succeeded) {
    // url對應的狀態中計數器減一
    endCount(getStatus(url),elapsed,succeeded);
    // 方法對應的狀態中計數器減一
    endCount(getStatus(url,methodName),succeeded);
}

private static void endCount(RpcStatus status,boolean succeeded) {
    // 活躍計數器減一
    status.active.decrementAndGet();
    // 總計數器加1
    status.total.incrementAndGet();
    // 總呼叫時長加上呼叫時長
    status.totalElapsed.addAndGet(elapsed);
    // 如果最大呼叫時長小於elapsed,則設定最大呼叫時長
    if (status.maxElapsed.get() < elapsed) {
        status.maxElapsed.set(elapsed);
    }
    // 如果rpc呼叫成功
    if (succeeded) {
        // 如果成最大呼叫成功時長小於elapsed,則設定最大呼叫成功時長
        if (status.succeededMaxElapsed.get() < elapsed) {
            status.succeededMaxElapsed.set(elapsed);
        }
    } else {
        // 失敗計數器加一
        status.failed.incrementAndGet();
        // 失敗的過期數加上elapsed
        status.failedElapsed.addAndGet(elapsed);
        // 總呼叫失敗時長小於elapsed,則設定總呼叫失敗時長
        if (status.failedMaxElapsed.get() < elapsed) {
            status.failedMaxElapsed.set(elapsed);
        }
    }
}
複製程式碼

該方法是計數器減少。

(十五)StaticContext

該類是系統上下文,僅供內部使用。

/**
 * 系統名稱
 */
private static final String SYSTEMNAME = "system";
/**
 * 系統上下文集合,僅供內部使用
 */
private static final ConcurrentMap<String,StaticContext> context_map = new ConcurrentHashMap<String,StaticContext>();
/**
 * 系統上下文名稱
 */
private String name;
複製程式碼

上面是該類的屬性,它還記錄了所有的系統上下文集合。

(十六)EchoService

public interface EchoService {

    /**
     * echo test.
     * 回聲測試
     * @param message message.
     * @return message.
     */
    Object $echo(Object message);

}

複製程式碼

該介面是回聲服務介面,定義了一個一個回聲測試的方法,回聲測試用於檢測服務是否可用,回聲測試按照正常請求流程執行,能夠測試整個呼叫是否通暢,可用於監控,所有服務自動實現該介面,只需將任意服務強制轉化為EchoService,就可以用了。

(十七)GenericException

該方法是通用的異常類。

/**
 * 異常類名
 */
private String exceptionClass;

/**
 * 異常資訊
 */
private String exceptionMessage;

複製程式碼

比較簡單,就封裝了兩個屬性。

(十八)GenericService

public interface GenericService {

    /**
     * Generic invocation
     * 通用的會話域
     * @param method         Method name,e.g. findPerson. If there are overridden methods,parameter info is
     *                       required,e.g. findPerson(java.lang.String)
     * @param parameterTypes Parameter types
     * @param args           Arguments
     * @return invocation return value
     * @throws Throwable potential exception thrown from the invocation
     */
    Object $invoke(String method,String[] parameterTypes,Object[] args) throws GenericException;

}

複製程式碼

該介面是通用的服務介面,同樣定義了一個類似invoke的方法

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了遠端呼叫的開篇,介紹之後解讀遠端呼叫模組的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的原始碼解析,其中的邏輯不負責,要關注的是其中的一些概念和dubbo如何去做暴露服務和引用服務,其中很多的介面定義需要弄清楚。接下來我將開始對rpc模組的過濾器進行講解。