Dubbo實踐(十三)Refer
Spring在啟動Dubbo客戶端應用時,會實例化ReferenceBean<T>並設置配置屬性,然後調用ReferenceConfig中的get方法:
public synchronized T get() { if (destroyed) { throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }
private void init() { if (initialized) { return; } initialized = true; // 省略... ref = createProxy(map); // 這裏使用了動態代理生成對象 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); // ref保存到consumerModel ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); // consumerModel保存到concurrentHashMap,內存中 }
ref = createProxy(map); 這裏使用了動態代理生成了代理對象(這裏也可以成為遠程代理,因為在這個代理中進行了遠程調用),ref 即getBean返回的對象,這樣在本地調用業務Service接口時就會使用代理處理器com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler。
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { URL tmpUrl= new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if (isInjvm() == null) { if (url != null && url.length() > 0) { // if a url is specified, don‘t do local reference // 指定URL的情況下,不進行本地引用 isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { // by default, reference local service if there is // 默認情況下如果本地有服務暴露,則引用本地服務 isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 用戶指定URL,指定的URL可能是點對點直連地址,也可能是註冊中心URL if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center‘s address. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center‘s configuration // 通過註冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { // 用了最後一個registry url registryURL = url; // use last registry url } } // 有 註冊中心協議的URL if (registryURL != null) { // registry url is available // use AvailableCluster only when register‘s cluster is available // 對有註冊中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url // 不是 註冊中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && !invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // create service proxy // 創建服務代理 return (T) proxyFactory.getProxy(invoker); }
生成Invoker
這裏調用 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 來生成invoker,代碼如下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
目前客戶端invoke後,調用DubboInvoker的doInvoke函數,doInvoke函數中,通過currentClient.send或者currentClient.request發送數據。
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { // 直接發送請求,不要求響應 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 異步方式發送請求,將響應存儲在future中 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 同步方式發送請求,並等待響應 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
調用NettyClient的send方法:
public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); }
這裏實際上是利用netty的網絡通信機制進行通信,而netty的機制保證數據接收處理:
@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); }
此時若收到數據,則會調用NettyHandler的messageReceived函數:
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }
此時,會發現,最後是調用DubboProtocol中requestHandler的received函數。
Handler的調用鏈是職責鏈模式和裝飾器模式的混合模式(類似Tomcat的Filter),外層的Handler調用內層的Handler,並在調用前後加上一些邏輯; 最後,在received函數中:
@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } }
這裏會發現super.received(channel, message)是空函數,因此若是客戶端處理到這裏,就不做任何處理了; 如果是服務器端,調用的是reply函數:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it‘s a callback // 如果是callback 需要處理高版本調用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }
此時getInvoker實際上是從exporter中去獲取Invoker。這裏通過if (message instanceof Invocation)可知,如果不是調用,則此時不做其他處理,也就是客戶端調用在這裏是不做任何處理的。 因此我們返回到客戶端的調用中,會發現HeaderExchangeClient的如下函數處理:
public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); }
此時,轉到HeaderExchangeChannel:
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
實際上這裏返回的是DefaultFuture。而從doInvoke函數中, return (Result) currentClient.request(inv, timeout).get(); 返回的是一個Result的子對象。而我們返回到客戶端的調用InvokerInvocationHandler:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
此時會發現最後結果是從Result的recreate()函數返回而來,實際上這是一個RpcResult。此時我們可以猜測服務器端返回給客戶端的是一個Response對象,同時其中的mResult(private Object mResult)是一個實現了Result接口的對象,我搜索了一下代碼,目前只有RpcResult對象實現了Result接口,因此可以肯定的是服務器端返回給客戶端的是一個RpcResult對象。
Invoker屏蔽了通信相關的細節,此時需要註意的是默認使用的網絡實現是Netty。
Dubbo實踐(十三)Refer