dubbo非同步呼叫原理 (1)
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
一、使用方式
服務提供方不變,呼叫方程式碼如下:
1 <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"> 2 <dubbo:method name="sayHello" async="true" timeout="60000"/> 3 <dubbo:method name="sayBye" async="true" timeout="60000"/> 4 </dubbo:reference>
配置裡新增<dubbo:method name="xxx" async="true"/>,表示單個方法xxx使用非同步方式;如果demoService下的所有方法都使用非同步,直接配置為<dubbo:reference async="true"/>。
1 public static void main(String[] args) throws Exception { 2 //Prevent to get IPV6 address,this way only work in debug mode 3 //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not 4 System.setProperty("java.net.preferIPv4Stack", "true"); 5 6 asyncFuture2(); 7 } 8 9 public static void asyncFuture1() throws ExecutionException, InterruptedException { 10 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); 11 context.start(); 12 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy 13 14 long start = System.currentTimeMillis(); 15 16 demoService.sayHello("zhangsan"); 17 Future<String> helloFuture = RpcContext.getContext().getFuture(); 18 19 demoService.sayBye("lisi"); 20 Future<String> byeFuture = RpcContext.getContext().getFuture(); 21 22 final String helloStr = helloFuture.get();//消耗5s 23 final String byeStr = byeFuture.get();//消耗8s 24 25 System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//總消耗8s 26 } 27 28 public static void asyncFuture2() throws ExecutionException, InterruptedException { 29 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); 30 context.start(); 31 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy 32 33 long start = System.currentTimeMillis(); 34 35 Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan")); 36 Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi")); 37 38 final String helloStr = helloFuture.get();//消耗5s 39 final String byeStr = byeFuture.get();//消耗8s 40 41 System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//總消耗8s 42 }
Consumer啟動主類。其中asyncFuture2()方法是推薦用法,注意Callable(asyncCall方法的入參)只是一個任務task,不會新建執行緒;所以asyncFuture2()和asyncFuture1()相似,資源佔用相同,都是用一根執行緒進行非同步操作的。
二、asyncFuture1()原始碼解析
先來看asyncFuture1(),總體步驟:
demoService.sayHello("zhangsan"); 建立一個Future物件,存入當前執行緒的上下文中
Future<String> helloFuture = RpcContext.getContext().getFuture(); 從當前執行緒的上下文中獲取第一步存入的Future物件
final String helloStr = helloFuture.get(); 阻塞等待,從Future中獲取結果
程式碼主要執行流(程式碼詳細執行流看文章開頭的三篇部落格):
1、demoService.sayHello("zhangsan");
-->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation) -->DubboInvoker.doInvoke(final Invocation invocation)
FutureFilter:
1 public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { 2 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); 3 4 fireInvokeCallback(invoker, invocation); 5 // need to configure if there's return value before the invocation in order to help invoker to judge if it's 6 // necessary to return future. 7 Result result = invoker.invoke(invocation); 8 if (isAsync) { 9 asyncCallback(invoker, invocation); 10 } else { 11 syncCallback(invoker, invocation, result); 12 } 13 return result; 14 }
對於如上非同步操作(asyncFuture1()和asyncFuture2()),FutureFilter沒起任何作用,該Filter主要會用在事件通知中,後續再說。
DubboInvoker.doInvoke(final Invocation invocation):
1 protected Result doInvoke(final Invocation invocation) throws Throwable { 2 RpcInvocation inv = (RpcInvocation) invocation; 3 final String methodName = RpcUtils.getMethodName(invocation); 4 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 5 inv.setAttachment(Constants.VERSION_KEY, version); 6 7 ExchangeClient currentClient; 8 if (clients.length == 1) { 9 currentClient = clients[0]; 10 } else { 11 currentClient = clients[index.getAndIncrement() % clients.length]; 12 } 13 try { 14 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); 15 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); 16 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 17 if (isOneway) { //無返回值 18 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 19 currentClient.send(inv, isSent); 20 RpcContext.getContext().setFuture(null); 21 return new RpcResult(); 22 } else if (isAsync) { //非同步有返回值 23 ResponseFuture future = currentClient.request(inv, timeout); 24 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 25 return new RpcResult(); 26 } else { //同步有返回值 27 RpcContext.getContext().setFuture(null); 28 return (Result) currentClient.request(inv, timeout).get(); 29 } 30 } catch (TimeoutException e) { 31 throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 32 } catch (RemotingException e) { 33 throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 34 } 35 }
模式:
如果是isOneway(不需要返回值),不管同步還是非同步,請求直接發出,不會建立Future,直接返回RpcResult空物件。
如果是isAsync(非同步),則
先建立ResponseFuture物件,之後使用FutureAdapter包裝該ResponseFuture物件;(建立ResponseFuture物件與同步的程式碼相同,最後得到的是一個DefaultFuture物件)
然後將該FutureAdapter物件設入當前執行緒的上下文中RpcContext.getContext();
最後返回空的RpcResult
如果是同步,則先建立ResponseFuture物件,之後直接呼叫其get()方法進行阻塞呼叫(見文章開頭的三篇文章)
簡單來看一下FutureAdapter:
1 public class FutureAdapter<V> implements Future<V> { 2 3 private final ResponseFuture future; 4 5 public FutureAdapter(ResponseFuture future) { 6 this.future = future; 7 } 8 9 public ResponseFuture getFuture() { 10 return future; 11 } 12 13 public boolean cancel(boolean mayInterruptIfRunning) { 14 return false; 15 } 16 17 public boolean isCancelled() { 18 return false; 19 } 20 21 public boolean isDone() { 22 return future.isDone(); 23 } 24 25 @SuppressWarnings("unchecked") 26 public V get() throws InterruptedException, ExecutionException { 27 try { 28 return (V) (((Result) future.get()).recreate()); 29 } catch (RemotingException e) { 30 throw new ExecutionException(e.getMessage(), e); 31 } catch (Throwable e) { 32 throw new RpcException(e); 33 } 34 } 35 36 @SuppressWarnings("unchecked") 37 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 38 int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS); 39 try { 40 return (V) (((Result) future.get(timeoutInMillis)).recreate()); 41 } catch (com.alibaba.dubbo.remoting.TimeoutException e) { 42 throw new TimeoutException(StringUtils.toString(e)); 43 } catch (RemotingException e) { 44 throw new ExecutionException(e.getMessage(), e); 45 } catch (Throwable e) { 46 throw new RpcException(e); 47 } 48 } 49 }
最後,回頭看一下FutureFilter:
1 public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { 2 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); 3 4 fireInvokeCallback(invoker, invocation); 5 // need to configure if there's return value before the invocation in order to help invoker to judge if it's 6 // necessary to return future. 7 Result result = invoker.invoke(invocation); 8 if (isAsync) { 9 asyncCallback(invoker, invocation); 10 } else { 11 syncCallback(invoker, invocation, result); 12 } 13 return result; 14 }
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 使用QUIC
【推薦】 資料庫路由中介軟體MyCat - 原始碼篇(5)