1. 程式人生 > >8.1 構建客戶端總體流程

8.1 構建客戶端總體流程

lib encoding tty ann dem 關系 zkclient 緩存 tom

一 示例

1 配置文件:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 4        xmlns="http://www.springframework.org/schema/beans"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
 6        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
 7     <!-- 消費方應用名,用於計算依賴關係,不是匹配條件,不要與提供方一樣 -->
 8     <dubbo:application name="demo-consumer"/>
 9     <!-- 使用zookeeper註冊中心 -->
10     <dubbo:registry protocol="zookeeper" address="10.211.55.5:2181"/>
11     <!-- 生成遠程服務代理,可以和本地bean一樣使用demoService -->
12     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
13 </beans>

2 Consumer

 1 package com.alibaba.dubbo.demo.consumer;
 2 
 3 import com.alibaba.dubbo.demo.DemoService;
 4 import org.springframework.context.support.ClassPathXmlApplicationContext;
 5 
 6 public class Consumer {
 7     public static void main(String[] args) {
 8         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
 9         context.start();
10 
11         DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理
12         String hello = demoService.sayHello("world"); // 執行遠程方法
13 
14         System.out.println(hello); // 顯示調用結果
15     }
16 }

先來看DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理。

二 調用簡圖

技術分享

三 總體代碼調用鏈

ReferenceConfig.init()
-->createProxy(Map<String, String> map)
  //一 獲取Invoker
  -->RegistryProtocol.refer(Class<T> type, URL url)
    //1 獲取註冊中心:創建ZkClient實例,連接zk
    -->Registry registry = registryFactory.getRegistry(url)
      -->AbstractRegistryFactory.getRegistry(URL url)
        -->ZookeeperRegistryFactory.createRegistry(URL url)
          -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
            -->ZkclientZookeeperTransporter.connect(URL url)
              -->new ZkclientZookeeperClient(URL url)
                -->new ZkClient(url.getBackupAddress())
            -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
    -->doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
      -->new RegistryDirectory<T>(type, url)
      //2 向註冊中心註冊服務
      -->registry.register(url)
        -->ZookeeperRegistry.doRegister(URL url)
          -->AbstractZookeeperClient.create(String path, boolean ephemeral)
      -->RegistryDirectory.subscribe(URL url)
        -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
          //3 會獲取當前節點下已經存在的子節點(第一次服務發現發生在這裏),添加子節點變化監聽器
          -->List<String> children = zkClient.addChildListener(path, zkListener)
          -->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
            -->saveProperties(url)
            -->RegistryDirectory.notify(List<URL> urls)
              //僅僅針對的是providers
              -->refreshInvoker(List<URL> invokerUrls)
                -->toInvokers(List<URL> urls)
                  -->ProtocolFilterWrapper.refer(Class<T> type, URL url)
                    -->DubboProtocol.refer(Class<T> serviceType, URL url)
                      //4 創建ExchangeClient,對第一次服務發現providers路徑下的相關url建立長連接
                      -->getClients(URL url)
                        -->getSharedClient(URL url)
                          -->ExchangeClient exchangeClient = initClient(url)
                            -->Exchangers.connect(url, requestHandler)
                              -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                -->new DecodeHandler(new HeaderExchangeHandler(handler))
                                  -->Transporters.connect(URL url, ChannelHandler... handlers)
                                    -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                      -->new NettyClient(url, listener)
                                        -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                                        -->getChannelCodec(url)//獲取Codec2,這裏是DubboCountCodec實例
                                        -->doOpen()//開啟netty客戶端
                                        -->doConnect()//連接服務端,建立長連接
                                -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例,needHeartbeat:true
                                  -->startHeatbeatTimer()//啟動心跳定時器
                          -->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)
                          -->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.242.48.210:20880", 上邊的ReferenceCountExchangeClient實例)
                      //5 創建DubboInvoker
                      -->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
                      -->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例)
                    -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
                  -->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
                  //6 將創建出來的Invoker緩存起來
                  -->newUrlInvokerMap.put("dubbo://10.242.48.210:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001&register.ip=10.242.48.210&remote.timestamp=1510127991625&side=consumer&timestamp=1510128022123", 上邊的InvokerDelegete實例)
                -->toMethodInvokers(newUrlInvokerMap)
                -->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]}
      //7 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
      -->cluster.join(directory)
        -->Cluster$Adaptive.join(directory)
          -->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster
            -->MockClusterWrapper.join(Directory<T> directory)
              -->FailoverCluster.join(Directory<T> directory)
                -->new FailoverClusterInvoker<T>(directory)
              -->MockClusterInvoker(Directory<T> directory, Invoker<T> invoker)//invoker:上邊的FailoverClusterInvoker實例
  //二 獲取代理
  -->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
    -->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
      -->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態類
      -->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例

8.1 構建客戶端總體流程