8.1 構建客戶端總體流程
阿新 • • 發佈:2017-11-12
lib encoding tty ann dem 關系 zkclient 緩存 tom
一 示例
1 配置文件:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 4 xmlns="http://www.springframework.org/schema/beans" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 6 http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 7 <!-- 消費方應用名,用於計算依賴關係,不是匹配條件,不要與提供方一樣 --> 8 <dubbo:application name="demo-consumer"/> 9 <!-- 使用zookeeper註冊中心 --> 10 <dubbo:registry protocol="zookeeper" address="10.211.55.5:2181"/> 11 <!-- 生成遠程服務代理,可以和本地bean一樣使用demoService --> 12 <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/> 13 </beans>
2 Consumer
1 package com.alibaba.dubbo.demo.consumer; 2 3 import com.alibaba.dubbo.demo.DemoService; 4 import org.springframework.context.support.ClassPathXmlApplicationContext; 5 6 public class Consumer { 7 public static void main(String[] args) { 8 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); 9 context.start(); 10 11 DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理 12 String hello = demoService.sayHello("world"); // 執行遠程方法 13 14 System.out.println(hello); // 顯示調用結果 15 } 16 }
先來看DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理。
二 調用簡圖
三 總體代碼調用鏈
ReferenceConfig.init() -->createProxy(Map<String, String> map) //一 獲取Invoker -->RegistryProtocol.refer(Class<T> type, URL url) //1 獲取註冊中心:創建ZkClient實例,連接zk -->Registry registry = registryFactory.getRegistry(url) -->AbstractRegistryFactory.getRegistry(URL url) -->ZookeeperRegistryFactory.createRegistry(URL url) -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) -->ZkclientZookeeperTransporter.connect(URL url) -->new ZkclientZookeeperClient(URL url) -->new ZkClient(url.getBackupAddress()) -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例) -->doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) -->new RegistryDirectory<T>(type, url) //2 向註冊中心註冊服務 -->registry.register(url) -->ZookeeperRegistry.doRegister(URL url) -->AbstractZookeeperClient.create(String path, boolean ephemeral) -->RegistryDirectory.subscribe(URL url) -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener) //3 會獲取當前節點下已經存在的子節點(第一次服務發現發生在這裏),添加子節點變化監聽器 -->List<String> children = zkClient.addChildListener(path, zkListener) -->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls) -->saveProperties(url) -->RegistryDirectory.notify(List<URL> urls) //僅僅針對的是providers -->refreshInvoker(List<URL> invokerUrls) -->toInvokers(List<URL> urls) -->ProtocolFilterWrapper.refer(Class<T> type, URL url) -->DubboProtocol.refer(Class<T> serviceType, URL url) //4 創建ExchangeClient,對第一次服務發現providers路徑下的相關url建立長連接 -->getClients(URL url) -->getSharedClient(URL url) -->ExchangeClient exchangeClient = initClient(url) -->Exchangers.connect(url, requestHandler) -->HeaderExchanger.connect(URL url, ExchangeHandler handler) -->new DecodeHandler(new HeaderExchangeHandler(handler)) -->Transporters.connect(URL url, ChannelHandler... 
handlers) -->NettyTransporter.connect(URL url, ChannelHandler listener) -->new NettyClient(url, listener) -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler))) -->getChannelCodec(url)//獲取Codec2,這裏是DubboCountCodec實例 -->doOpen()//開啟netty客戶端 -->doConnect()//連接服務端,建立長連接 -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例,needHeartbeat:true -->startHeatbeatTimer()//啟動心跳計數器 -->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) -->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.242.48.210:20880", 上邊的ReferenceCountExchangeClient實例) //5 創建DubboInvoker -->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) -->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例) -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group) -->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl) //6 將創建出來的Invoker緩存起來 -->newUrlInvokerMap.put("dubbo://10.242.48.210:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001&register.ip=10.242.48.210&remote.timestamp=1510127991625&side=consumer&timestamp=1510128022123", 上邊的InvokerDelegete實例) -->toMethodInvokers(newUrlInvokerMap) -->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]} //7 將directory封裝成一個ClusterInvoker(MockClusterInvoker) -->cluster.join(directory) -->Cluster$Adaptive.join(directory) -->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster -->MockClusterWrapper.join(Directory<T> directory) -->FailoverCluster.join(Directory<T> directory) -->new FailoverClusterInvoker<T>(directory) -->MockClusterInvoker(Directory<T> directory, Invoker<T> 
invoker)//invoker:上邊的FailoverClusterInvoker實例 //二 獲取代理 -->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService] -->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)) -->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態類 -->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例
8.1 構建客戶端總體流程