1. 程式人生 > >dubbo花錢買的知識精髓

dubbo花錢買的知識精髓

Dubbo

初步認識dubbo及基本應用

        官方網址:http://dubbo.apache.org
        由於業務的複雜度公司服務不斷增多,那麼遠端服務之 間的呼叫才是實現分散式的關鍵因素。
服務與服務之間的呼叫無非 就是跨程序通訊而已,我們可以使用 socket 來實現通訊,我們也可以使用 nio來實現高效能通訊。我們不用這些開源的RPC框架,也可以完成通訊的過程。
官方圖解:

圖解說明:

節點 角色說明
Provider 暴露服務的服務提供方
Consumer 呼叫遠端服務的服務消費方
Registry 服務註冊與發現的註冊中心
Monitor 統計服務的呼叫次數和呼叫時間的監控中心
Container 服務執行容器

但是為什麼要用現成的框架呢?

1. 底層網路通訊協議的處理 ;
2. 序列化和反序列化的處理工作 ;
3. 網路請求之間的負載均衡、容錯機制、服務降級機制等等要一一處理太麻煩;

大規模服務化對於服務治理的要求 ?

我認為到目前為止,還只是滿足了通訊的基礎需求,但是當企業開始大規模 的服務化以後,遠端通訊帶來的弊端就越來越明顯了。比如說
            1. 服務鏈路變長了,如何實現對服務鏈路的跟蹤和監控呢?
            2. 服務的大規模叢集使得服務之間需要依賴第三方註冊中心來解決服務的 發現和服務的感知問題
            3. 服務通訊之間的異常,需要有一種保護機制防止一個節點故障引發大規模 的系統故障,所以要有容錯機制
            4. 服務大規模叢集會是的客戶端需要引入負載均衡機制實現請求分發 
        而這些對於服務治理的要求,傳統的 RPC 技術在這樣的場景中顯得有點力 不從心,因此很多企業開始研發自己的RPC框架,比如阿里的HSF、Dubbo; 京東的JSF框架、噹噹的dubbox、新浪的motan、螞蟻金服的sofa等等 又技術輸出能力的公司,都會研發適合自己場景的rpc框架,要麼是從0到 1 開發,要麼是基於現有的思想結合公司業務特色進行改造。而沒有技術輸 出能力的公司,遇到服務治理的需求時,會優先選擇那些比較成熟的開源框架。而Dubbo就是其中一個 。

分析Dubbo服務治理技術

基於註冊中心的dubbo服務

        作為主流的服務治理元件,Dubbo 提供了很多豐富的功能,那麼最根本的就是要解決大規模叢集之後的服務註冊和發現的問題。而 dubbo中對於注 冊中心這塊是使用zookeeper來支撐的。當然在目前最新的版本中。 Dubbo能夠支援的註冊中心有:consul、etcd、nacos、sofa、zookeeper、 redis、multicast 。

使用zookeeper作為註冊中心的jar包依賴
<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-framework</artifactId> 
    <version>4.0.0</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-recipes</artifactId> 
    <version>4.0.0</version> 
</dependency>

dubbo整合zookeeper的實現原理圖

provider將自己的介面服務註冊在zookeeper節點中,節點上包含了provider的url、介面、以及一些其他的配置資訊(協議,負載均衡機制,容錯機制,序列化等),當consumer要請求介面時,通過介面路徑在zookeeper中拿到url和配置資訊,根據配置資訊訪問介面拿到資料。
dubbo 每次都要連 zookeeper拿資料? 
        並不需要,dubbo會拿到zookeeper上資料快取在本地,只要zookeeper上的節點資料不發生改變,dubbo會優先讀取本地快取資料,所以當啟動一段時間即使zookeeper服務掛了也不影響dubbo的使用。

負載均衡機制

        dubbo整合zookeeper解決了服務註冊以及服務動態感知的問題。那麼 當服務端存在多個節點的叢集時,zookeeper 上會維護不同叢集節點,對於客戶端而 言,他需要一種負載均衡機制來實現目標服務的請求負載。
在服務端配置:
        1.配置檔案配置—<dubbo:service interface="..." loadbalance="roundrobin" />
        2.註解配置—@Service(loadbalance = "roundrobin") 
                           public class HelloServiceImpl implements IHelloService{ 
在客戶端配置(客戶端配置優先順序大於服務端):
        1.配置檔案配置—<dubbo:reference interface="..." loadbalance="roundrobin" />
        2.註解配置—@Reference(loadbalance = "random") 
                             private IHelloService helloService;

在java客戶端中用下面四種英文表示 roundrobin/random/ leastactive/ consistenthash ,分別對應一下四種
RoundRobinLoadBalance:加權輪詢算,按公約後的權重設定輪循比率。所謂輪詢是指將請求輪流分配給每臺伺服器。舉個例子,我們有三臺伺服器 A、 B、C。 我們將第一個請求分配給伺服器 A,第二個請求分配給伺服器 B,第三個請求分配給 伺服器 C,第四個請求再次分配給伺服器 A。這個過程就叫做輪詢。輪詢是一種無狀 態負載均衡演算法,實現簡單,適用於每臺伺服器效能相近的場景下。但現實情況下, 我們並不能保證每臺伺服器效能均相近。如果我們將等量的請求分配給效能較差的服 務器,這顯然是不合理的。因此,這個時候我們需要對輪詢過程進行加權,以調控每 臺伺服器的負載。經過加權後,每臺伺服器能夠得到的請求數比例,接近或等於他們 的權重比。比如伺服器 A、B、C 權重比為 5:2:1。那麼在 8 次請求中,伺服器 A 將 收到其中的 5 次請求,伺服器 B 會收到其中的 2 次請求,伺服器 C 則收到其中的 1次請求。
RandomLoadBalance:隨機權重演算法,根據配置的的weight屬性,佔總的weights的百分比進行按比重分配。假設我們有一組伺服器 servers = [A, B, C],他們對應的權重為 weights = [5, 3, 2],權重總和為10。隨機一個[0,10)的數字,比如說8,把8減去A,如果大於等於0(結果3>0),拿結果3-B還是大於等於0,以此類推,直到相減結果小於0,得到此時的伺服器,也就是C。

LeastActiveLoadBalance:最少活躍數演算法,每個服務維護一個活躍數計數器。當A機器開始處理請求,該計數器加1,此時A還未處理完成。若處理完畢則計數器減1。而B機器接受到請求後很快處理完畢。那麼A,B的活躍數分別是1,0。當又產生了一個新的請求,則選擇B機器去執行(B活躍數最小),這樣使慢的機器A收到少的請求。因此活躍數下降的也越快,此時這樣的服 務提供者能夠優先獲取到新的服務請求。

ConsistentHashLoadBalance:hash一致性演算法,能使相同引數資料傳送到同一臺機器上。(預設只對第一個引數 Hash,預設160份虛擬節點)當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動+。假如有N個真實節點,把每個真實節點對映成M個虛擬節點,再把 M*N 個虛擬節點, 雜湊在圓環上. 各真實節點對應的虛擬節點相互交錯分佈這樣,某真實節點down後,則把其影響平均分擔到其他所有節點上。例如:也就是a,b,c,d的虛擬節點a0,a1,a2,b0,b1,b2,c0,c1,c2,d0,d1,d2散落在圓環上,假設C號節點down,則c0,c1,c2的壓力分別傳給d0,a1,b1,如下圖:

叢集容錯機制

在java中配置方式:@Service(loadbalance = "random", cluster = "failsafe") 
Faiover Cluster:預設的容錯機制,當client叢集呼叫失敗時,重試其他的伺服器。可通過retries=2來設定重試次數。
Failfast Cluster:快速失敗,只發起一次呼叫,失敗立即報錯,用於非冪等性的寫操作。
Failsafe Cluster:失敗安全,出現異常時,直接忽略,通常用了寫入日誌操作。
Failback Cluster:失敗自動恢復,後臺記錄失敗請求,定時重發。通常用於訊息通知。
Forking Cluster:並行呼叫多個伺服器,只要一個返回成功即返回。
Broadcast Cluster:廣播呼叫所有提供者,依次呼叫,任意一條報錯則寶座

服務降級

        當某個非關鍵服務出現錯誤時,可以通過降級功能來臨時遮蔽這個服務。
實現如下:在dubbo-client端建立一個mock類,當出現服務降級時,會被呼叫 

@Reference( loadbalance = "random", mock = "自己實現的mock類", timeout =1000, cluster = "failfast") 
IHelloService helloService;

動態配置以及規則

        動態配置是Dubbo2.7版本引入的一個新的功能,簡單來說,就是把dubbo.properties 中的屬性進行集中式儲存,儲存在其他的伺服器上。 那麼如果需要用到集中式儲存,那麼還需要一些配置中心的元件來支撐。 目前Dubbo能支援的配置中心有:apollo、nacos、zookeeper 。外部配置的優先順序大於本地配置。 
原理:預設所有的配置都儲存在/dubbo/config節點,具體節點結構圖如下namespace,用於不同配置的環境隔離。 config,Dubbo 約定的固定節點,不可更改,所有配置和服務治理規則都儲存在此節點下。 
dubbo/application:分別用來隔離全域性配置;
dubbo:應用級別配置:dubbo是預設group值;
application:對應應用名 dubbo.properties,此節點的node value儲存具體配置內容 ;

元資料中心

dubbo2.7所有的資訊(介面名稱、url、版本、負載均衡、容錯策略等等),server配置引數有30多個,client配置有25個以上。全部在zookeeper上會造成以下問題:
1.url內容過多,導致資料儲存空間增大;
2.url需要涉及到網路傳輸,資料量過大會造成網路傳輸慢;
3.網路傳輸慢造成服務地址感知延遲變大,影響服務正常響應;
元資料中心目前支援redis和zookeeper。官方推薦是採用redis。畢竟redis本身對於 非結構化儲存的資料讀寫效能比較高。當然,也可以使用zookeeper來實現。 在配置檔案中新增元資料中心的地址 
dubbo.metadata-report.address=zookeeper://192.168.13.106:2181
dubbo.registry.simplified=true  //註冊到註冊中心的URL是否採用精簡模式的 

分析Dubbo原始碼之核心—SPI

Dubbo SPI(面試dubbo必問)

java spi的實現

我們如何去實現一個標準的 SPI 發現機制呢?其實很簡單,只需要滿足以下提交就行了 
1. 需要在 classpath 下建立一個目錄,該目錄命名必須是:META-INF/service 
2. 在該目錄下建立一個 properties 檔案,該檔案需要滿足以下幾個條件 
        2.1 檔名必須是擴充套件的介面的全路徑名稱; 
        2.2 檔案內部描述的是該擴充套件介面的所有實現類;  
        2.3 檔案的編碼格式是 UTF-8; 
3. 通過 java.util.ServiceLoader 的載入機制來發現 
        讀取META-INF/services/下的配置檔案,獲得所有能被例項化的類的名稱,值得注意的是,ServiceLoader可以跨越jar包獲取META-INF下的配置檔案。
        
通過反射方法Class.forName()載入類物件,並用instance()方法將類例項化。
        把例項化後的類快取到providers物件中,(LinkedHashMap<String,S>型別)然後返回例項物件。

java在jdk載入java.sql.Driver的介面實現類的時候就會得到mysql的Driver,然後根據DriverManager.getConnection方法判斷得到正確的資料庫連線。
java SPI 的缺點 :
1. JDK 標準的 SPI 會一次性載入例項化擴充套件點的所有實現—就是如果你在 META-INF/service 下的檔案裡面加了 N 個實現類,那麼 JDK 啟動的時候都會一次性全部載入。那麼如果有的擴充套件點實現初始化很耗時或者如果有些實現類並沒有用到, 那麼會很浪費資源。
2. 如果擴充套件點載入失敗,會導致呼叫方報錯,而且這個錯誤很難定位到是這個原因。 
3.java中是serverLoad上下文載入器載入的,而dubbo中是ExtensionLoader擴充套件載入器載入的。

dubbo SPI

實現:
1. 需要在 resource 目錄下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,並基於 SPI 介面去 建立一個檔案 ,能夠被擴充套件的介面必須要有@SPI("value")註解。value表示當前擴充套件點的預設實現。
2. 檔名稱和介面名稱保持一致,檔案內容和 SPI 有差異,內容是 KEY 對應 Value ,key隨便定義,value名必須是檔名介面自定義實現類。
Dubbo 針對的擴充套件點非常多,可以針對協議、攔截、叢集、路由、負載均衡、序列化、容器… 幾乎裡面用到的所有功能,都可 以實現自己的擴充套件,我覺得這個是 dubbo 比較強大的一點。 
例如:
1. 建立如下結構,新增 META-INF.dubbo 檔案。類名和 Dubbo 提供的協議擴充套件點介面保持一致:
 
2.建立 MyProtocol 協議類可以實現自己的協議,我們為了模擬協議產生了作用,修改一個埠 :

3.呼叫載入:

Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");  
System.out.print(protocol.getDefaultPort) ;//可以看到是輸出自己的協議埠。

 靜態擴充套件點原始碼實現:

//類得到擴充套件點載入器
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }
    //去快取ConcurrentMap中找這個型別載入器
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));//如果沒有new一個放進快取並且返回
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}
//建立擴充套件類例項
@SuppressWarnings("unchecked")
public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        return getDefaultExtension();//獲取預設的擴充套件類例項
    }
    Holder<Object> holder = getOrCreateHolder(name);//快取取值
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);//建立一個name=key 對應的例項
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}
//建立擴充套件類例項繼續
@SuppressWarnings("unchecked")
private T createExtension(String name) {
//重點在這裡,找指定目錄下 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services反射找到所有類class根據name得到class。
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);快取中取
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());//放快取
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        //依賴注入:向拓展物件中注入依賴,它會獲取類的所有方法。判斷方法是否以 set 開頭,且方法僅有一個引數,且方法訪問級別為 public,就通過反射設定屬性值。所以說,Dubbo中的IOC僅支援以setter方式注入。
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

 自適應擴充套件點原始碼實現:

        在Dubbo中,很多拓展都是通過 SPI 機制進行載入的,比如 Protocol、Cluster、LoadBalance 等。這些擴充套件並非在框架啟動階段就被載入(jdk是啟動就載入所有擴充套件),而是在擴充套件方法被呼叫的時候,根據URL物件引數進行載入。那麼,Dubbo就是通過自適應擴充套件機制來解決這個問題——首先 Dubbo 會為拓展介面生成具有代理功能的程式碼。然後通過 javassist 或 jdk 編譯這段程式碼,得到 Class 類。最後再通過反射建立代理類,在代理類中,就可以通過URL物件的引數來確定到底呼叫哪個實現類。自適應註解:@Adaptive
@Adaptive寫在類上:當前類是一個確定的自適應擴充套件點的類,Dubbo 不會為該類生成代理類;
@Adaptive寫在方法上:Dubbo 則會為該方法生成代理邏輯,表示當前方法需要根據 引數URL 呼叫對應的擴充套件點實現;

//原始碼實現
public T getAdaptiveExtension() {
    // 從快取中獲取自適應拓展
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                //未命中快取,則建立自適應拓展,然後放入快取
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create 
                                                  adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
    }
    return (T) instance;
}
//快取中沒有就建立
private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("
            Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

private Class<?> getAdaptiveExtensionClass() {
    //獲取當前介面的所有實現類
    //如果某個實現類標註了@Adaptive,此時cachedAdaptiveClass不為空
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    //以上條件不成立,就建立自適應拓展類
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
//@Adaptive註解在方法上,需要代理一個類,預設用javassist。介面方法上有@Adaptive會有代理實現,沒有的則丟擲異常
private Class<?> createAdaptiveExtensionClass() {
    //構建自適應拓展程式碼
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    // 獲取編譯器實現類 這個Dubbo預設是採用javassist 
    Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
    //編譯程式碼,返回類例項的物件
    return compiler.compile(code, classLoader);
}

實現例子:

//我們還是以Protocol介面為例,它的export()和refer()方法,都標註為@Adaptive。destroy和 getDefaultPort未標註 @Adaptive註解。Dubbo 不會為沒有標註 Adaptive 註解的方法生成代理邏輯,對於該種類型的方法,僅會生成一句丟擲異常的程式碼。
@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();
}

//所以說當我們呼叫這兩個方法的時候,會先拿到URL物件中的協議名稱,再根據名稱找到具體的擴充套件點實現類,然後去呼叫。下面是生成自適應擴充套件類例項的原始碼:
public class Protocol$Adaptive implements Protocol {
    public void destroy() {
        throw new UnsupportedOperationException(
                "method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
        throw new UnsupportedOperationException(
                "method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
    }
    public Exporter export(Invoker invoker)throws RpcException {
        if (invoker == null) {
            throw new IllegalArgumentException("Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("Invoker argument getUrl() == null");
        }
            
        URL url = invoker.getUrl();
       //預設dubbo協議 
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("
                    + url.toString() + ") use keys([protocol])");
        }
        //預設dubbo協議,也可以自定義協議    
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.export(invoker);
    }
    public Invoker refer(Class clazz,URL ur)throws RpcException {
        if (ur == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL url = ur;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");
        }
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.refer(clazz, url);
    }
}

分析Dubbo原始碼之服務消費

服務釋出(三步:讀取配置檔案—服務註冊到zookeeper—啟動netty服務監聽)

1.配置檔案解析或註釋解析

在 spring 中定義了兩個介面 
NamespaceHandler: 註冊一堆 BeanDefinitionParser,利用他們來進行解析 
BeanDefinitionParser:用於解析每個 element 的內容 
        Spring 預設會載入 jar 包下的 META-INF/spring.handlers 檔案尋找對應的 NamespaceHandler。 Dubbo-config 模組下的 dubboconfig-spring。Dubbo 中 spring 擴充套件就是使用 spring 的自定義型別,所以同樣也有 NamespaceHandler、BeanDefinitionParser。而 NamespaceHandler 是 DubboNamespaceHandler  , 

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }
    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我們想看 dubbo:service 的配置,就直接看 DubboBeanDefinitionParser(ServiceBean.class,true) 這個裡面主要做了一件事,把不同的配置分別轉化成 spring 容器中的 bean 物件 :
                application 對應 ApplicationConfig 
                registry 對應 RegistryConfig 
                monitor 對應 MonitorConfig 
                provider 對應 ProviderConfig 
                consumer 對應 ConsumerConfig

        涉及到服務釋出和服務呼叫的兩個配置的解析,用的是 ServiceBean 和 referenceBean。並不是 config 結尾 的,這兩個類稍微特殊些,當然他同時也繼承了 ServiceConfig 和 ReferenceConfig                     registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
                    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 

DubboBeanDefinitionParser 
        這裡面是實現具體配置檔案解析的入口,它重寫了 parse 方法,對 spring 的配置進行解析。我們關注一下 ServiceBean 的解析. 實際就是解析 dubbo:service 這個標籤中對應的屬性 。

else if (ServiceBean.class.equals(beanClass)) {
    String className = element.getAttribute("class");
    if (className != null && className.length() > 0) {
        RootBeanDefinition classDefinition = new RootBeanDefinition();
        classDefinition.setBeanClass(ReflectUtils.forName(className));
        classDefinition.setLazyInit(false);
        parseProperties(element.getChildNodes(), classDefinition);
        beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
    }
}

ServiceBean
ServiceBean 這個類,分別實現了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware, ApplicationEventPublisherAware :
InitializingBean:介面為 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是繼承該介面的類,在初始化 bean 的時候會執行 該方法。被重寫的方法為 afterPropertiesSet。
DisposableBean:被重寫的方法為 destroy,bean 被銷燬的時候,spring 容器會自動執行 destory 方法,比如釋放資源。
ApplicationContextAware:實現了這個介面的 bean,當 spring 容器初始化的時候,會自動的將 ApplicationContext 注入進來 。
ApplicationListener:ApplicationEvent 事件監聽,spring 容器啟動後會發一個事件通知。被重寫的方法為:onApplicationEvent,onApplicationEvent 方法傳入的物件是 ContextRefreshedEvent。這個物件是當 Spring 的上下文被重新整理或者載入完畢的時候觸發的。因此服務就是在 Spring 的上下文重新整理後進行匯出操作的。
BeanNameAware:獲得自身初始化時,本身的 bean 的 id 屬性,被重寫的方法為 setBeanName 。
ApplicationEventPublisherAware:這個是一個非同步事件傳送器。被重寫的方法為 setApplicationEventPublisher,簡單來說,在 spring 裡面提供了類似於訊息佇列 的非同步事件解耦功能。(典型的觀察者模式的應用)。 

2.服務註冊

在 ServiceBean 中,我們暫且只需要關注兩個方法,分別是: 
                在初始化 bean 的時候會執行該方法 afterPropertiesSet——把 dubbo 中配置的 application、registry、service、protocol 等資訊,載入到對應的 config 實體中,便於後續的使用;
                spring 容器啟動後會發一個事件通知 onApplicationEvent——用呼叫 export 進行服務釋出的流程(這裡就是入口) ;

export 方法

serviceBean 中,重寫了 export 方法,實現了 一個事件的釋出。並且呼叫了 super.export() ,也就是會呼叫父類ServiceConfig 的 export 方法 。

public synchronized void export() {
    checkAndUpdateSubConfigs(); // 檢查並且更新配置資訊
    if (!shouldExport()) {// 當前的服務是否需要釋出 , 通過配置實現: @Service(export = false) 
        return;
    }
    if (shouldDelay()) {// 檢查是否需要延時釋出,通過配置 @Service(delay = 1000) 實現,單位毫秒
        delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); //定時器延時
    } else {
        doExport();
    }
}

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {//服務已經註冊
        return;
    }
    exported = true;//設定已經註冊狀態
    if (StringUtils.isEmpty(path)) {//path表示服務路徑,預設使用 interfaceName 
        path = interfaceName;
    }
    doExportUrls();
}
/**1. 記載所有配置的註冊中心地址 
  *2. 遍歷所有配置的協議,protocols 
  *3. 針對每種協議釋出一個對應協議的服務  
 */
private void doExportUrls() {
   // 載入所有配置的註冊中心的地址,組裝成一個 URL (registry://ip:port/org.apache.dubbo.registry.RegistryService的東西 ) 
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
            //group跟 version組成一個 pathKey(serviceName) 
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
           //applicationModel用來儲存 ProviderModel ,釋出的服務的元資料,後續會用到
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

doExportUrlsFor1Protocol 
        釋出指定協議的服務,我們以 Dubbo 服務為例
                1. 前面的一大串 if else 程式碼,是為了把當前服務下所配置的<dubbo:method>引數進行解析,儲存到 map 集合中 
                2. 獲得當前服務需要暴露的 ip 和埠 
                3. 把解析到的所有資料,組裝成一個 URL,大概應該是: dubbo://192.168.13.1:20881/com.yaoxiaojian.dubbo.service.ISayHelloService 
 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {     
         ...
         ...
      // 獲得當前服務要釋出的目標 ip 和 port   
         String host = this.findConfigedHosts(protocolConfig, registryURLs, map);    
        Integer port = this.findConfigedPorts(protocolConfig, name, map);       
         // 組裝 URL     
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); 
        // 這裡是通過 ConfiguratorFactory 去實現動態改變配置的功能,這裡暫時不涉及後續再分析
         if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {             
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url); 
            } 
          //上面程式碼均是對url的解析和組裝。
         //如果 scope!="none"則釋出服務,預設 scope 為 null。如果 scope 不為 none,判斷是否為 local 或 remote,從而釋出 Local 服務 或 Remote 服務,預設兩個都會發布  
        String scope = url.getParameter(SCOPE_KEY); 
        if(!SCOPE_NONE.equalsIgnoreCase(scope)) {  
               //injvm 釋出到本地
             if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                 exportLocal(url);             
             }             
              // 釋出遠端服務
             if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { 
                      ....
                      ....
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                //protocol是自適應擴充套件點,wrapperInvoker是registry://  所以這邊呼叫得是 registryProtocol.export這個方法
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
              }

RegistryProtocol.export 方法

很明顯,這個 RegistryProtocol 是用來實現服務註冊的,這裡面會有很多處理邏輯: 
1.實現對應協議的服務釋出 
2.實現服務註冊 
3.訂閱服務重寫 

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
   //這裡獲得的是 zookeeper註冊中心的 url: zookeeper://ip:port
    URL registryUrl = getRegistryUrl(originInvoker);
    //  這裡是獲得服務提供者的 url, dubbo://ip:port...         
    URL providerUrl = getProviderUrl(originInvoker);
    // 訂閱 override資料。在 admin控制檯可以針對服務進行治理,比如修改權重,修改路由機制等,當註冊中心有此服務的覆蓋配置 註冊進來時,推送訊息給提供者,重新暴露服務
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //************************** // 這裡就交給了具體的協議去暴露服務(很重要),啟動一個netty服務
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    //*********把dubbo註冊到zookeeper上,建立一個zookeeperclient建立dubbo://ip:port/com...這樣的節點
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

doLocalExport 
先通過 doLocalExport 來暴露一個服務,本質上應該是啟動一個通訊服務,主要的步驟是將本地 ip 和 20880 埠開啟,進行監聽 :
originInvoker: 應該是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService 
key: 從 originInvoker 中獲得釋出協議的 url: dubbo://ip:port/... 
bounds: 一個 prviderUrl 服務 export 之後,快取到 bounds 中,所以一個 providerUrl 只會對應一個 exporter 

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        // 對原有的 invoker, 委託給了 InvokerDelegate          
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // 將 invoker轉換為 exporter並啟動 netty服務,自適應拓展點 呼叫的是DubboProtocol.export?其實是在自適應擴充套件點中包裝成了QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol)))),為了對原有的invoker進行增強,比如filter——在實現遠端呼叫的時候,會經過這些 filter 進行過濾。 
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

DubboProtocol.export 
基於動態代理的適配,很自然的就過渡到了 DubboProtocol 這個協議類中,但是實際上是 DubboProtocol 嗎? 
這裡並不是獲得一個單純的 DubboProtocol 擴充套件點,而是會通過 Wrapper 對 Protocol 進行裝飾,裝飾器分別為: QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol 為什麼是這樣?我們再來看看 spi 的程式碼 

Wrapper 包裝 
在 ExtensionLoader.loadClass 這個方法中,有一段這樣的判斷,如果當前這個類是一個 wrapper 包裝類,也就是這個 wrapper 中有構造方法,引數是當前被載入的擴充套件點的型別,則把這個 wrapper 類加入到 cacheWrapperClass 快取中。 

else if (isWrapperClass(clazz)) {
    cacheWrapperClass(clazz);
}

我們可以在 dubbo 的配置檔案中找到三個 Wrapper :
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper  
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper:對 invoker 進行 filter 的包裝,實現請求的過濾 ;
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper 
接著,在 呼叫getExtension->createExtension 方法中,會對 cacheWrapperClass 集合進行判斷,如果集合不為空,則進行包裝: 

Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
    for (Class<?> wrapperClass : wrapperClasses) {
        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
    }
}

例如:ProtocolFilterWrapper 這個是一個過濾器的包裝,使用責任鏈模式,對 invoker 進行了包裝 
 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {         
         if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {             
                    return protocol.export(invoker);         
           }         
       return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));    
}

包裝成QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol))))過濾完之後開始執行DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // 獲取服務標識,理解成服務座標也行。由服務組名,服務名,服務版本號以及埠組成。比如
    //${group}/copm.gupaoedu.practice.dubbo.ISayHelloService:${version}:20880   
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
  //開啟服務  暴露20880埠
    openServer(url);
  //優化序列化
    optimizeSerialization(url);
    return exporter;
}

private void openServer(URL url) {
    //獲取 host:port ,並將其作為伺服器例項的 key ,用於標識當前的伺服器例項
    String key = url.getAddress();
    //client 也可以暴露一個只有 server可以呼叫的服務
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
         // 是否在 serverMap中快取了
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                     // 建立伺服器例項
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            //伺服器已建立,則根據 url 中的配置重置伺服器
            server.reset(url);
        }
    }
}

private ExchangeServer createServer(URL url) {
      // 組裝 url ,在 url中新增心跳時間、編解碼引數
    url = URLBuilder.from(url)
           // 當服務關閉以後,傳送一個只讀的事件,預設是開啟狀態
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            //啟動心跳配置
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
   // 通過 SPI 檢測是否存在 server 引數所代表的 Transporter 拓展,不存在則丟擲異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return server;
}

後續程式碼就不一一寫了,寫個鏈流程如下:
Exchangers.bind(url, requestHandler);——>headerExchanger.bind——>Transporters.bind——>getTransporter().bind(url, handler)——>NettyTransporter.bind(URL url, ChannelHandler listener)——> new NettyServer(url, listener)——>NettyServer.doOpen();之後就是netty的監聽服務工作了。

3.啟動netty服務實現遠端監聽然後把dubbo註冊到zookeeper上,建立一個zookeeperclient建立dubbo://ip:port/com...這樣的節點

服務消費 

時序圖:

1. 生成遠端服務的代理

服務入口ReferenceConfig.get() 獲得一個遠端代理類;
createProxy 

private T createProxy(Map<String, String> map) {
    if (shouldJvmRefer(map)) {//判斷是否是在同一個jvm程序中呼叫 
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        urls.clear(); // reference retry init will add url to urls, lead to OOM
          //url 如果不為空,說明是點對點通訊
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    // 檢測 url 協議是否為 registry,若是,表明使用者想使用指定的註冊 中心 
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉換為查詢字串,並作為 refer 引數的值新增到 url 中
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合併 url,移除服務提供者的一些配置(這些配置來源於使用者配置 的 url 屬性),                       
                        // 比如執行緒池相關配置。並保留服務提供者的部分配置,比如版本, group,時間戳等                       
                        // 後將合併後的配置設定為 url 查詢字串中。 
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // if protocols not injvm checkRegistry
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                checkRegistry();
                 //校驗註冊中心的配置以及是否有必要從配置中心組裝url               
                 //這裡的程式碼實現和服務端類似,也是根據註冊中心配置進行解析得到URL     
                  //這裡的URL肯定也是: registry://ip:port/org.apache.dubbo.service.RegsitryService 
                List<URL> us = loadRegistries(false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                 //如果沒有配置註冊中心,則報錯 
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
        }
       //如果值配置了一個註冊中心或者一個服務提供者,直接使用refprotocol.refer 
        if (urls.size() == 1) {
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use RegistryAwareCluster only when register's CLUSTER is available
                URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }

    if (shouldCheck() && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    /**
     * @since 2.7.0
     * ServiceData Store
     */
    MetadataReportService metadataReportService = null;
    if ((metadataReportService = getMetadataReportService()) != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        metadataReportService.publishConsumer(consumerURL);
    }
    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker);
}

2. 獲得目標服務的url地址

RegistryProtocol.refer 
這裡面的程式碼邏輯比較簡單
        組裝註冊中心協議的url
        判斷是否配置legroup,如果有,則cluster=getMergeableCluster(),構建invoker
        doRefer構建invoker

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
 //這段程式碼也很熟悉,就是根據配置的協議,生成註冊中心的url: zookeeper:// 
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

doRefe​​​​​​​r
doRefer裡面就稍微複雜一些,涉及到比較多的東西,我們先關注主線
        構建一個RegistryDirectory
        構建一個consumer://協議的地址註冊到註冊中心
        訂閱zookeeper中節點的變化
        呼叫cluster.join方法

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  //RegistryDirectory初始化 
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
     //註冊consumer://協議的url
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    //訂閱事件監聽
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    //構建invoker
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

cluster.join
 Invoker invoker = cluster.join(directory); //返回的是一個MockClusterWrapper(FailOverCluster(directory))

接著回到ReferenceConfig.createProxy方法中的最後一行
proxyFactory.getProxy 
而這裡的proxyFactory又是一個自適應擴充套件點,所以會進入下面的方法 
JavassistProxyFactory.getProx​​​​​​​y

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
通過dubug除錯一下得到ccp這個變數——>mMetthods如下程式碼。
public java.lang.String sayHello(java.lang.String arg0){  //sayHello是自定義的暴露服務
     Object[] args = new Object[1];   
     args[0] = ($w)$1;   
     Object ret = handler.invoke(this, methods[0], args); 
     return (java.lang.String)ret; 
}

3. 實現遠端網路通訊

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    //它是實現服務目標服務訂閱的
   
            
           

相關推薦

dubbo花錢知識精髓

Dubbo 初步認識dubbo及基本應用       &nbs

花錢來的知識,不能快速變現!知識付費巨頭只好換“姿勢”

知識付費2018年的知識付費再次迎來姿勢變陣,打頭換體位的還是分答。或許,準確來說,上線632天之後,從2月6日開始,分答已經成為一個歷史名詞,它的新名字叫做“在行一點”。按照創始人姬十三在這個更名發布會、亦是分答歷史上第二場發布會上的說法,這一次品牌升級並非簡單的換皮重生,而是通過整整一年多的試驗,開發出了

售價3899元!想不花錢華為Mate10?

理財 投資 生活 金融 互聯網先看一下今天公布的華為Mate10價格 華為Mate10(6GB+128GB): 4499元 華為Mate10(4GB+64GB):3899元 不花錢就能買華為Mate10,真的,真的嗎? 難道是“滿XX送一臺手機”?“購買金額最高者送手機”?“拉好友抽獎贏手機”? 別逗了,小

requests 使用免費的代理ip爬取網站(要效率的話還是推薦花錢vip)

import requests import queue import threading from lxml import etree #要爬取的URL url = "http://xxxxx" #代理ip網站 proxy_url = "https://www.kuaidaili.com/free

花錢的課程中,重點標記...

inf .com http mage img src 重點 分享 com 花錢買的課程中,重點標記...

Dubbo(一) —— 基礎知識和專案搭建

<!-- 1、指定當前服務/應用的名字(同樣的服務名字相同,不要和別的服務同名) --> <dubbo:application name="boot-user-service-provider"></dubbo:application>

dubbo架構相關知識學習

dubbo架構分為十層: Service:介面層,提供服務端以及客戶端實現,類ServiceBean和ReferenceBean     Config:配置層,ServiceConfig和ReferenceConfig,從dubbo.xsd中屬性依賴如下,我們可

趣頭條“花錢使用者”?你看懂趣頭條真正玩法了嗎

最近,被號稱“五環外的今日頭條”、“資訊界的拼多多”的趣頭條終於釋出了它的第一份成績單。在亮眼的業績的背後,也有著一定的虧損,雖然說虧損在網際網路企業當中不算什麼,但是很多人質疑說趣頭條指令碼是不是在“買使用者”?那麼在趣頭條快速發展的背後其玩法的核心邏輯到底在哪? 一、業績亮眼卻被質疑的

51Nod-1621-花錢車牌

ACM模版 描述 題解 水題不水,有坑。 思路炒雞簡單,首先我們記錄下來 0∼9 每個數字的個數,然後列舉讓 x 出現 k 次的最小花費。 這裡先說第一個容易錯的點兒,那就是當初始狀態就滿族時,直接特判最小花費為 0,按照原數輸出; 第二個容

51nod1621-花錢車牌

一個車牌號由n位數字組成。如果一個車牌至少有k位數字是相同的,那麼我們就說這個車牌漂亮的車牌。現在華沙想要改變他自己的車牌,使得他的車牌變得漂亮。當然,改車牌是要花錢的。每改變一位數字所要花費的費用等於當前位上的新舊數字之差的絕對值。那麼總費用就是每位上所花費用的總和。 舉例如下, 舊牌為0123,新

不想花錢書,我教你免費拿

關注我的朋友都知道,我每篇文章都會送電子書,所以,最近後臺聯絡我要書的人也比較多,因此我必須寫一篇文章關於如何免費閱讀好書。 一來是覺得授人以魚不如授人以漁,讓我的讀者都知道要去哪些渠道去獲取你想看的書。 二來也想減輕我的負擔,粉絲太多,我有些忙不過來,不能

Dubbo服務合平臺搭建出售釋出之服務暴露&心跳機制&服務註冊

Dubbo服務釋出 Dubbo合買平臺搭建出售 dsluntan.com Q:3393756370 VX:17061863513服務釋出影響流程的主要包括三個部分,依次是: 服務暴露 心跳 服務註冊 服務暴露是對外提供服務及暴露埠,以便消費端可以正常調通服務。心跳機制保證伺服器端及客戶

開車人千金難知識 組圖

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

多執行緒(模擬票)-----java基礎知識總結

這次的的問題引入的比較深入,如果看了這篇部落格,不看下一篇,你會很懵逼。 程式碼: 1 package com.day13.math; 2 /** 3 * 類說明 :模擬三個視窗同時售票 4 * @author 作者 : chenyanlong 5 * @versi

盤點程式設計師們的“奢侈品”,最後一樣花錢不到!

今天咱們就來盤點一下程式設計師用的“奢侈品”,這裡面有些是比較貴的,當然對於程式設計師來說基本都是買的起的,只不過是你會不會去花這個錢而已,最後一樣,個人認為花錢也買不到!談談你們的想法吧。話不多說,咱們直奔主題! 第一個: 人體力學辦公桌辦公椅,程式設計師們一天在電腦面前要進行10小時甚至

SpringBoot雜碎知識 (十三) Dubbo的整合

Dubbo是一款高效能、輕量級的開源Java RPC框架,它提供了三大核心能力:面向介面的遠端方法呼叫,智慧容錯和負載均衡,以及服務自動註冊和發現。,它最大的特點是按照分層的方式來架構,使用這種方式可以使各個層之間解耦合(或者最大限度地鬆耦合)。 在2月15日

Dubbo基本知識與簡單demo

Dubbo背景和簡介 Dubbo開始於電商系統,因此在這裡先從電商系統的演變講起。 1,單一應用框架(ORM) 當網站流量很小時,只需一個應用,將所有功能如下單支付等都部署在一起,以減少部署節點和成本。 缺點:單一的系統架構,使得在開發過程中,佔用的資源越來越多,而且隨著流量的增加越來越難以維護  2.

Dubbo相關知識及面試題

 面試題:Dubbo中zookeeper做註冊中心,如果註冊中心叢集都掛掉,釋出者和訂閱者之間還能通訊麼? 可以的,啟動dubbo時,消費者會從zk拉取註冊的生產者的地址介面等資料,快取在本地。每次呼叫時,按照本地儲存的地址進行呼叫 註冊中心對等叢集,任意一臺宕掉後,會自動切

Dubbo相關知識

1、Dubbo是什麼? 阿里巴巴開源的Java高效能分散式開發框架,特點是按照分層的方式來架構 2、為什麼要用Dubbo? 內部使用了Netty、Zookeeper,保證了高效能高可用性。 3、Dubbo和Spring Cloud 有什麼區別? 沒關聯 1、通訊方式不同 Dubbo使用RP

車需要了解的知識(持續更新)

     現在結婚了,完成了人生第一件大事,接下來就要完成人生的第二件大事,就是買車。本人的定位是B級車,因為B級車比較適合家用。 但買車前,作為一個小白,還是有很多知識不懂的,希望各位網友有看到這篇博文就多多推薦多多指教。這篇博文就當作收集網上的一些碎片