Dubbo原始碼解析(十九)遠端呼叫——開篇
遠端呼叫——開篇
目標:介紹之後解讀遠端呼叫模組的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的原始碼解析。
前言
最近我面臨著一個選擇,因為dubbo 2.7.0-release出現在了倉庫裡,最近一直在進行2.7.0版本的code review,那我之前說這一系列的文章都是講述2.6.x版本的原始碼,我現在要不要選擇直接開始講解2.7.0的版本的原始碼呢?我最後還是決定繼續講解2.6.x,因為我覺得還是有很多公司在用著2.6.x的版本,並且對於升級2.7.0的計劃應該還沒那麼快,並且在瞭解2.6.x版本的原理後,再去了解2.7.0新增的特性會更加容易,也能夠品位到設計者的意圖。當然在結束2.6.x的重要模組講解後,我也會對2.7.0的新特性以及實現原理做一個全面的分析,2.7.0作為dubbo社群的畢業版,更加強大,敬請期待。
前面講了很多的內容,現在開始將遠端呼叫RPC,好像又回到我第一篇文章 《dubbo原始碼解析(一)Hello,Dubbo》,在這篇文章開頭我講到了什麼叫做RPC,再通俗一點講,就是我把一個專案的兩部分程式碼分開來,分別放到兩臺機器上,當我部署在A伺服器上的應用想要呼叫部署在B伺服器上的應用等方法,由於不存在同一個記憶體空間,不能直接呼叫。而其實整個dubbo都在做遠端呼叫的事情,它涉及到很多內容,比如配置、代理、叢集、監控等等,那麼這次講的內容是隻關心一對一的呼叫,dubbo-rpc遠端呼叫模組抽象各種協議,以及動態代理,Proxy層和Protocol層rpc的核心,我將會在本系列中講到。下面我們來看兩張官方檔案的圖:
- 暴露服務的時序圖:
你會發現其中有我們以前講到的Transporter、Server、Registry,而這次的系列將會講到的就是紅色框框內的部分。
- 引用服務時序圖
在引用服務時序圖中,對應的也是紅色框框的部分。
當閱讀完該系列後,希望能對這個呼叫鏈有所感悟。接下來看看dubbo-rpc的包結構:
可以看到有很多包,很規整,其中dubbo-rpc-api是對協議、暴露、引用、代理等的抽象和實現,是rpc整個設計的核心內容。其他的包則是dubbo支援的9種協議,在官方檔案也能檢視介紹,並且包括一種本地呼叫injvm。那麼我們再來看看dubbo-rpc-api中包結構:
- filter包:在進行服務引用時會進行一系列的過濾。其中包括了很多過濾器。
- listener包:看上面兩張服務引用和服務暴露的時序圖,發現有兩個listener,其中的邏輯實現就在這個包內
- protocol包:這個包實現了協議的一些公共邏輯
- proxy包:實現了代理的邏輯。
- service包:其中包含了一個需要呼叫的方法等封裝抽象。
- support包:包括了工具類
- 最外層的實現。
下面的篇幅設計,本文會講解最外層的原始碼和service下的原始碼,support包下的原始碼我會穿插在其他用到的地方一併講解,filter、listener、protocol、proxy以及各類協議的實現各自用一篇來講。
原始碼分析
(一)Invoker
public interface Invoker<T> extends Node {
/**
* get service interface.
* 獲得服務介面
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
* 呼叫下一個會話域
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
複製程式碼
該介面是實體域,它是dubbo的核心模型,其他模型都向它靠攏,或者轉化成它,它代表了一個可執行體,可以向它發起invoke呼叫,這個有可能是一個本地的實現,也可能是一個遠端的實現,也可能是一個叢集的實現。它代表了一次呼叫
(二)Invocation
public interface Invocation {
/**
* get method name.
* 獲得方法名稱
* @return method name.
* @serial
*/
String getMethodName();
/**
* get parameter types.
* 獲得引數型別
* @return parameter types.
* @serial
*/
Class<?>[] getParameterTypes();
/**
* get arguments.
* 獲得引數
* @return arguments.
* @serial
*/
Object[] getArguments();
/**
* get attachments.
* 獲得附加值集合
* @return attachments.
* @serial
*/
Map<String,String> getAttachments();
/**
* get attachment by key.
* 獲得附加值
* @return attachment value.
* @serial
*/
String getAttachment(String key);
/**
* get attachment by key with default value.
* 獲得附加值
* @return attachment value.
* @serial
*/
String getAttachment(String key,String defaultValue);
/**
* get the invoker in current context.
* 獲得當前上下文的invoker
* @return invoker.
* @transient
*/
Invoker<?> getInvoker();
}
複製程式碼
Invocation 是會話域,它持有呼叫過程中的變數,比如方法名,引數等。
(三)Exporter
public interface Exporter<T> {
/**
* get invoker.
* 獲得對應的實體域invoker
* @return invoker
*/
Invoker<T> getInvoker();
/**
* unexport.
* 取消暴露
* <p>
* <code>
* getInvoker().destroy();
* </code>
*/
void unexport();
}
複製程式碼
該介面是暴露服務的介面,定義了兩個方法分別是獲得invoker和取消暴露服務。
(四)ExporterListener
@SPI
public interface ExporterListener {
/**
* The exporter exported.
* 暴露服務
* @param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
*/
void exported(Exporter<?> exporter) throws RpcException;
/**
* The exporter unexported.
* 取消暴露
* @param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Exporter#unexport()
*/
void unexported(Exporter<?> exporter);
}
複製程式碼
該介面是服務暴露的監聽器介面,定義了兩個方法是暴露和取消暴露,引數都是Exporter型別的。
(五)Protocol
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
* 獲得預設的埠
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent,that is,there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework,protocol needs not to care <br>
* 暴露服務方法,
* @param <T> Service type 服務型別
* @param invoker Service invoker 服務的實體域
* @return exporter reference for exported service,useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service,for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call,the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL,the implementation must not throw exception but try to recover when
* connection fails.
* 引用服務方法
* @param <T> Service type 服務型別
* @param type Service class 服務類名
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type,URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources,for example: connection,port,etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
複製程式碼
該介面是服務域介面,也是協議介面,它是一個可擴充套件的介面,預設實現的是dubbo協議。定義了四個方法,關鍵的是服務暴露和引用兩個方法。
(六)Filter
@SPI
public interface Filter {
/**
* do invoke filter.
* <p>
* <code>
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* </code>
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke(Invoker<?> invoker,Invocation invocation) throws RpcException;
}
複製程式碼
該介面是invoker呼叫時過濾器介面,其中就只有一個invoke方法。在該方法中對呼叫進行過濾
(七)InvokerListener
@SPI
public interface InvokerListener {
/**
* The invoker referred
* 在服務引用的時候進行監聽
* @param invoker
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#refer(Class,com.alibaba.dubbo.common.URL)
*/
void referred(Invoker<?> invoker) throws RpcException;
/**
* The invoker destroyed.
* 銷燬實體域
* @param invoker
* @see com.alibaba.dubbo.rpc.Invoker#destroy()
*/
void destroyed(Invoker<?> invoker);
}
複製程式碼
該介面是實體域的監聽器,定義了兩個方法,分別是服務引用和銷燬的時候執行的方法。
(八)Result
該介面是實體域執行invoke的結果介面,裡面定義了獲得結果異常以及附加值等方法。比較好理解我就不貼程式碼了。
(九)ProxyFactory
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
* 建立一個代理
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create proxy.
* 建立一個代理
* @param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker,boolean generic) throws RpcException;
/**
* create invoker.
* 建立一個實體域
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy,Class<T> type,URL url) throws RpcException;
}
複製程式碼
該介面是代理工廠介面,它也是個可擴充套件介面,預設實現javassist,dubbo提供兩種動態代理方法分別是javassist/jdk,該介面定義了三個方法,前兩個方法是通過invoker建立代理,最後一個是通過代理來獲得invoker。
(十)RpcContext
該類就是遠端呼叫的上下文,貫穿著整個呼叫,例如A呼叫B,然後B呼叫C。在服務B上,RpcContext在B之前將呼叫資訊從A儲存到B。開始呼叫C,並在B呼叫C後將呼叫資訊從B儲存到C。RpcContext儲存了呼叫資訊。
public class RpcContext {
/**
* use internal thread local to improve performance
* 本地上下文
*/
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**
* 服務上下文
*/
private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
/**
* 附加值集合
*/
private final Map<String,String> attachments = new HashMap<String,String>();
/**
* 上下文值
*/
private final Map<String,Object> values = new HashMap<String,Object>();
/**
* 執行緒結果
*/
private Future<?> future;
/**
* url集合
*/
private List<URL> urls;
/**
* 當前的url
*/
private URL url;
/**
* 方法名稱
*/
private String methodName;
/**
* 引數型別集合
*/
private Class<?>[] parameterTypes;
/**
* 引數集合
*/
private Object[] arguments;
/**
* 本地地址
*/
private InetSocketAddress localAddress;
/**
* 遠端地址
*/
private InetSocketAddress remoteAddress;
/**
* 實體域集合
*/
@Deprecated
private List<Invoker<?>> invokers;
/**
* 實體域
*/
@Deprecated
private Invoker<?> invoker;
/**
* 會話域
*/
@Deprecated
private Invocation invocation;
// now we don't use the 'values' map to hold these objects
// we want these objects to be as generic as possible
/**
* 請求
*/
private Object request;
/**
* 響應
*/
private Object response;
複製程式碼
該類中最重要的是它的一些屬性,因為該上下文就是用來儲存資訊的。方法我就不介紹了,因為比較簡單。
(十一)RpcException
/**
* 不知道異常
*/
public static final int UNKNOWN_EXCEPTION = 0;
/**
* 網路異常
*/
public static final int NETWORK_EXCEPTION = 1;
/**
* 超時異常
*/
public static final int TIMEOUT_EXCEPTION = 2;
/**
* 基礎異常
*/
public static final int BIZ_EXCEPTION = 3;
/**
* 禁止訪問異常
*/
public static final int FORBIDDEN_EXCEPTION = 4;
/**
* 序列化異常
*/
public static final int SERIALIZATION_EXCEPTION = 5;
複製程式碼
該類是rpc呼叫丟擲的異常類,其中封裝了五種通用的錯誤碼。
(十二)RpcInvocation
/**
* 方法名稱
*/
private String methodName;
/**
* 引數型別集合
*/
private Class<?>[] parameterTypes;
/**
* 引數集合
*/
private Object[] arguments;
/**
* 附加值
*/
private Map<String,String> attachments;
/**
* 實體域
*/
private transient Invoker<?> invoker;
複製程式碼
該類實現了Invocation介面,是rpc的會話域,其中的方法比較簡單,主要是封裝了上述的屬性。
(十三)RpcResult
/**
* 結果
*/
private Object result;
/**
* 異常
*/
private Throwable exception;
/**
* 附加值
*/
private Map<String,String>();
複製程式碼
該類實現了Result介面,是rpc的結果實現類,其中關鍵是封裝了以上三個屬性。
(十四)RpcStatus
該類是rpc的一些狀態監控,其中封裝了許多的計數器,用來記錄rpc呼叫的狀態。
1.屬性
/**
* uri對應的狀態集合,key為uri,value為RpcStatus物件
*/
private static final ConcurrentMap<String,RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String,RpcStatus>();
/**
* method對應的狀態集合,key是uri,第二個key是方法名methodName
*/
private static final ConcurrentMap<String,ConcurrentMap<String,RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String,RpcStatus>>();
/**
* 已經沒用了
*/
private final ConcurrentMap<String,Object> values = new ConcurrentHashMap<String,Object>();
/**
* 活躍狀態
*/
private final AtomicInteger active = new AtomicInteger();
/**
* 總的數量
*/
private final AtomicLong total = new AtomicLong();
/**
* 失敗的個數
*/
private final AtomicInteger failed = new AtomicInteger();
/**
* 總呼叫時長
*/
private final AtomicLong totalElapsed = new AtomicLong();
/**
* 總呼叫失敗時長
*/
private final AtomicLong failedElapsed = new AtomicLong();
/**
* 最大呼叫時長
*/
private final AtomicLong maxElapsed = new AtomicLong();
/**
* 最大呼叫失敗時長
*/
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
* 最大呼叫成功時長
*/
private final AtomicLong succeededMaxElapsed = new AtomicLong();
/**
* Semaphore used to control concurrency limit set by `executes`
* 訊號量用來控制`execution`設定的併發限制
*/
private volatile Semaphore executesLimit;
/**
* 用來控制`execution`設定的許可證
*/
private volatile int executesPermits;
複製程式碼
以上是該類的屬性,可以看到儲存了很多的計數器,分別用來記錄了失敗呼叫成功呼叫等累計數。
2.beginCount
/**
* 開始計數
* @param url
*/
public static void beginCount(URL url,String methodName) {
// 對該url對應對活躍計數器加一
beginCount(getStatus(url));
// 對該方法對活躍計數器加一
beginCount(getStatus(url,methodName));
}
/**
* 以原子方式加1
* @param status
*/
private static void beginCount(RpcStatus status) {
status.active.incrementAndGet();
}
複製程式碼
該方法是增加計數。
3.endCount
public static void endCount(URL url,String methodName,long elapsed,boolean succeeded) {
// url對應的狀態中計數器減一
endCount(getStatus(url),elapsed,succeeded);
// 方法對應的狀態中計數器減一
endCount(getStatus(url,methodName),succeeded);
}
private static void endCount(RpcStatus status,boolean succeeded) {
// 活躍計數器減一
status.active.decrementAndGet();
// 總計數器加1
status.total.incrementAndGet();
// 總呼叫時長加上呼叫時長
status.totalElapsed.addAndGet(elapsed);
// 如果最大呼叫時長小於elapsed,則設定最大呼叫時長
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
// 如果rpc呼叫成功
if (succeeded) {
// 如果成最大呼叫成功時長小於elapsed,則設定最大呼叫成功時長
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
// 失敗計數器加一
status.failed.incrementAndGet();
// 失敗的過期數加上elapsed
status.failedElapsed.addAndGet(elapsed);
// 總呼叫失敗時長小於elapsed,則設定總呼叫失敗時長
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
複製程式碼
該方法是計數器減少。
(十五)StaticContext
該類是系統上下文,僅供內部使用。
/**
* 系統名稱
*/
private static final String SYSTEMNAME = "system";
/**
* 系統上下文集合,僅供內部使用
*/
private static final ConcurrentMap<String,StaticContext> context_map = new ConcurrentHashMap<String,StaticContext>();
/**
* 系統上下文名稱
*/
private String name;
複製程式碼
上面是該類的屬性,它還記錄了所有的系統上下文集合。
(十六)EchoService
public interface EchoService {
/**
* echo test.
* 回聲測試
* @param message message.
* @return message.
*/
Object $echo(Object message);
}
複製程式碼
該介面是回聲服務介面,定義了一個一個回聲測試的方法,回聲測試用於檢測服務是否可用,回聲測試按照正常請求流程執行,能夠測試整個呼叫是否通暢,可用於監控,所有服務自動實現該介面,只需將任意服務強制轉化為EchoService,就可以用了。
(十七)GenericException
該方法是通用的異常類。
/**
* 異常類名
*/
private String exceptionClass;
/**
* 異常資訊
*/
private String exceptionMessage;
複製程式碼
比較簡單,就封裝了兩個屬性。
(十八)GenericService
public interface GenericService {
/**
* Generic invocation
* 通用的會話域
* @param method Method name,e.g. findPerson. If there are overridden methods,parameter info is
* required,e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method,String[] parameterTypes,Object[] args) throws GenericException;
}
複製程式碼
該介面是通用的服務介面,同樣定義了一個類似invoke的方法
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了遠端呼叫的開篇,介紹之後解讀遠端呼叫模組的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的原始碼解析,其中的邏輯不負責,要關注的是其中的一些概念和dubbo如何去做暴露服務和引用服務,其中很多的介面定義需要弄清楚。接下來我將開始對rpc模組的過濾器進行講解。