dubbo原始碼淺析(五)-遠端服務呼叫流程
消費端呼叫遠端服務介面時,使用上和呼叫普通的java介面是沒有任何區別,但是服務消費者和提供者是跨JVM和主機的,客戶端如何封裝請求讓服務端理解請求並且解析服務端返回的介面呼叫結果,服務端如何解析客戶端的請求並且向客戶端返回呼叫結果,這些框架是如何實現的,下面就來看下這部分的程式碼。
消費端呼叫提供端服務的過程要執行下面幾個步驟:
1. 消費端觸發請求
2. 消費端請求編碼
3. 提供端請求解碼
4. 提供端處理請求
5. 提供端響應結果編碼
6. 消費端響應結果解碼
消費端觸發請求
在消費者初始化的時候,會生成一個消費者代理註冊到容器中,該代理回撥中持有一個MockClusterInvoker例項,消費呼叫服務介面時它的invoke會被呼叫,此時會構建一個RpcInvocation物件,把服務介面的method物件和引數放到RpcInvocation物件中,作為MockClusterInvoker.invoke方法的引數,在這個invoke方法中,判斷請求是否需要mock,是否配置了mock屬性,是強制mock還是失敗後mock,關於mock這裡先不詳細展開,這裡只看下核心流程。
MockClusterInvoker.invoke會呼叫FailfastClusterInvoker.invoke,大系統中為了服務高可用同一個服務一般會有多個應用伺服器提供,要先挑選一個提供者提供服務。在服務介面消費者初始化時,介面方法和提供者Invoker對應關係儲存在RegistryDirectory的methodInvokerMap中,通過呼叫的方法名稱(或方法名稱+第一個引數)改方法對應的提供者invoker列表,如註冊中心設定了路由規則,對這些invoker根據路由規則進行過濾。
com.alibaba.dubbo.registry.integration.RegistryDirectory.doList(Invocation)
com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory.list(Invocation)
讀取到所有符合條件的服務提供者invoker之後,由LoadBalance元件執行負載均衡,從中挑選一個invoker進行呼叫,框架內建支援的負載均衡演算法包括random(隨機)、roundrobin(R-R迴圈)、leastactive(最不活躍)、consistenthash(一致性hash),應用可配置,預設random。
methodInvokerMap儲存的是持有DubboInvoker(dubbo協議)例項的InvokerDelegete物件,是Invoker-Filter鏈的頭部,先啟用Filter連然後最終調到DubboInvoker.invoke(RpcInvocation),此時遠端呼叫分三種類型:
1. 單向呼叫,無需獲取關注呼叫結果的,無需等待介面返回結果,注意呼叫結果不要單純跟返回值混淆了,異常也是呼叫結果。
2. 非同步呼叫,需要關注返回結果,但是不會同步等待介面呼叫結束,會非同步的獲取返回返回結果,這種情況給呼叫者返回一個Future,但是不同步等待Future.get返回呼叫結果
3. 同步呼叫,需要同步等待服務呼叫結束獲取呼叫結果,給呼叫者返回一個Future並且Future.get等待結果,此時介面呼叫執行緒會掛起等待響應。
我們大部分使用場景都是同步呼叫,所以主要看一下同步呼叫。如果使用者配置了多個connections按順序選擇一個ExchangeClient和伺服器通訊,同步呼叫時呼叫HeaderExchangeClient.request->HeaderExchangeChannel.request。
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(Object, int)
這裡的request引數是RpcInvocation物件,包含呼叫的方法、引數等資訊,timeout引數是介面超時時間,把這些資訊封裝在Request物件中,呼叫channel.send,這個channel物件就是和服務端打交道的NettyClient例項,NettyClient.send呼叫NettyChannel.send。
com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(Object, boolean)
這裡的sent引數決定是否等待請求訊息發出,sent=true 等待訊息發出,訊息傳送失敗將丟擲異常,sent=false 不等待訊息發出,將訊息放入IO佇列,即刻返回。預設情況下都是false。NettyChannel中有channel屬性,這個channel是Netty框架中的元件,負責客戶端和服務端鏈路上的訊息傳遞,channel.write把請求訊息寫入,這裡的message是上面封裝的Request物件。這裡的IO模型是非阻塞的,執行緒不用同步等待所有訊息寫完,而是直接返回。呼叫Netty框架的IO事件之後會觸發Netty框架的IO事件處理鏈。
消費端請求編碼
在消費者初始化建立NettyClient時瞭解到了,NettyClient添加了三個事件處理器組成處理器鏈:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler,其中NettyCodecAdapter.encoder下行事件處理器(實現了ChannelDownstreamHandler介面),NettyCodecAdapter. decoder是上行事件處理器(實現了ChannelUpstreamHandler介面),NettyHandler是上行事件+下行時間處理器(同時實現了ChannelUpstreamHandler和ChannelDownstreamHandler介面)。channel.write在Netty框架中是一個下行事件,所以NettyCodecAdapter.encoder和NettyHandler處理器會被回撥,下行事件的事件處理器呼叫順序是從後到前,即後新增的處理器先執行。
NettyHandler沒有對請求訊息做任何加工,只是觸發dubbo框架的一些回撥,這些回撥裡面沒有做任何核心的事情,
com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent)
encoder顧名思義就是編碼器,它的主要工作就是把資料按照客戶端-服務端的約定協議對請求資訊和返回結果進行編碼。看下它的encode方法:
下行事件觸發之後依次呼叫handleDownstream->doEncode->encode,在encode中對Request物件進行編碼。這個msg引數就是上面被write的Request物件,這裡的Codec2元件是DubboCountCodec實現,DubboCountCodec.encode呼叫DubboCodec.Encode
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encode(Channel, ChannelBuffer, Object)
根據協議,訊息中寫入16個位元組的訊息頭:
1、1-2位元組,固定的魔數
2、第3個位元組,第8位儲存資料型別是請求資料還是響應資料,其它7位儲存序列化型別,約定和服務端的序列化-反序列化協議
3、5-12個位元組,請求id
4、13-16個位元組,請求資料長度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(Channel, ChannelBuffer, Request)
從URL中查詢序列化擴充套件點名稱,載入序列化元件把請求物件序列化成二進位制。消費端和提供端的序列化反序列化協議要配套,所以這個序列化協議一般是在提供端指定的,指定的協議型別會在提供者和消費者初始化的時候寫入到URL物件中,框架中預設的序列化協議是hessian2。訊息體資料包含dubbo版本號、介面名稱、介面版本、方法名稱、引數型別列表、引數、附加資訊,把它們按順序依次序列化,資料寫入到型別為ChannelBuffer的buffer引數中,然後把ChannelBuffer封裝成Netty框架的org.jboss.netty.buffer.ChannelBuffer。如果引數中有回撥介面,還需要在消費端啟動埠監聽提供端的回撥,這裡不展開。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(Channel, ObjectOutput, Object)
然後把封裝好的ChannelBuffer寫到鏈路傳送到服務端,這裡消費端前半部分的工作就完成,接下來目光要轉移到服務端。
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(ChannelHandlerContext, MessageEvent)
提供端請求解碼
在看提供端初始化程式碼的時候看到,框架在建立NettyServer時,也會建立netty框架的IO事件處理器鏈:NettyCodecAdapter.decoder->NettyCodecAdapter.encoder->NettyHandler
com.alibaba.dubbo.remoting.transport.netty.NettyServer.doOpen()
客戶端傳送資料到服務端時會觸發服務端的上行IO事件並且啟動處理器回撥,NettyCodecAdapter.decoder和NettyHandler是上行事件處理器,上行事件處理器呼叫順序是從前到後執行,即先新增的處理器先執行,所以先觸發NettyCodecAdapter.decoder再觸發NettyHandler。
由NettyCodecAdapter.decoder對請求進行解碼,把訊息翻譯成提供端可理解的,上行事件呼叫decoder的handleUpstream->messageReceived
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(Channel, ChannelBuffer, int, byte[])
把資料讀取到ChannelBuffer之後扔給Codec2元件進行解碼處理,這裡有個半包傳輸處理,因為這裡使用的是非阻塞式的IO模型,非阻塞IO的特點是執行緒的讀取資料是事件觸發式,是由一個Selector元件輪詢準備就緒的IO事件,發現準備就緒的事件之後通知執行緒讀取,這種模式的好處是可以極大的優化執行緒模型,只需少數幾個執行緒處理所有客戶端和服務端連線,而阻塞IO需要執行緒和連線要一對一,但是非阻塞IO遠高於阻塞式IO,不像阻塞式IO讀寫資料時只有資料讀完或者超時才會返回,這樣能保證讀到的資料肯定是完整,而非阻塞模式方法返回之後可能只讀到一部分資料,框架的處理是在解析訊息時檢查訊息的長度確定是否有完整的資料,如果資料不完整返回NEED_MORE_INPUT,儲存當前解析的位置等待鏈路的下次IO事件,在下次IO事件到達時從上次儲存的位置開始解析。
讀取到完整的資料之後解析資料頭,讀取魔數、序列化型別、以及請求id,讀取第3個位元組判斷改資料是消費端請求資料還是提供端響應資料(因為消費端和提供端解碼器程式碼是共用的),並且從1-7位從讀出序列化型別,並且根據此序列化型別載入序列化元件對訊息進行反序列化按順序讀取消費端寫入的dubbo版本號、介面名稱、介面版本、方法名稱、引數型別列表、引數、附加資訊,寫入DecodeableRpcInvocation物件對應的屬性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation.decode(Channel, InputStream)
建立一個Request物件,把DecodeableRpcInvocation物件物件設定到Request物件的data屬性中。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(Channel, InputStream, byte[])
解碼完成之後,啟用下一個處理器的messageReceived事件,並且把解碼後的物件封裝在MessageEvent中。
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived(ChannelHandlerContext, MessageEvent)
org.jboss.netty.channel.Channels.fireMessageReceived(ChannelHandlerContext, Object, SocketAddress)
Decoder執行完之後,事件進入到下一個處理器NettyHandler,看下NettyHandler中的程式碼:
這裡直接交給handler處理了,這個handler封裝了很多層:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,中間封裝了好幾萬層這裡只把重要的列出來,源頭是從建立NettyServer的時候傳過來的。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer(URL)
先會走到DecodeHandler.received:
com.alibaba.dubbo.remoting.transport.DecodeHandler.received(Channel, Object)
這個message是Request型別的,要先decode一下,因為在之前已經解碼過了,所以這裡不會做任何事情,直接走下一個handler.received,這個handler就是HeaderExchangeHandler:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(Channel, Object)
普通的同步介面twoWay屬性是true走handleRequest方法處理請求,處理結束之後呼叫channel.send把結果返回到客戶端。
提供端處理請求
請求處理再走下一個handler的reply,這個handler就是DubboProtocol.requestHandler,把request物件中的data取出來傳到requestHandler中,這個data就是前面的解碼後的DecodeableRpcInvocation物件它是Invocation介面的一個實現:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(ExchangeChannel, Request)
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
查詢提供端請求對應的Invoker,在介面提供者初始化時,每個介面都會建立一個Invoker和Exporter,Exporter持有invoker例項,Exporter物件儲存在DubboProtocol的exporterMap中,key是由URL生成的serviceKey,此時通過Invocation中的資訊就可還原該serviceKey並且找到對應的Exporter和Invoker,在分析提供者初始化程式碼時知道它是Invoker-Filter的頭節點,啟用Filter後呼叫由ProxyFactory生成的Invoker:
呼叫invoker.invoke時,通過反射呼叫最終的服務實現執行相關邏輯。
服務執行結束之後,建立一個Response物件返回給客戶端。在執行服務實現時會出現兩種結果:成功和失敗,如果成功,把返回值設定到Response的result中,Response的status設定成OK,如果失敗,把失敗異常設定到Response的errorMessage中,status設定成SERVICE_ERROR。
回到HeaderExchangeHandler.received中的程式碼,在handleRequest之後,呼叫channel.send把Response傳送到客戶端,這個channel封裝客戶端-服務端通訊鏈路,最終會呼叫Netty框架,把響應寫回到客戶端。
提供端響應結果編碼
提供端要按照和消費端的協議把Response按照特定的協議進行編碼,把編碼後的資料寫回到消費端,從上面的程式碼可以看到,在NettyServer初始化的時候,定義了三個IO事件處理器,服務端往客戶端回寫響應時產生下行事件,處理下行事件處理器,NettyCodecAdapter.encoder和NettyHandler是下行事件處理器,先啟用NettyHandler,再啟用NettyCodecAdapter. encoder,在NettyCodecAdapter. encoder對響應結果進行編碼,還是通過Code2元件和請求編碼時使用的元件一樣,把響應型別和響應結果依次寫回到客戶端,根據協議會寫入16個位元組的資料頭,包括:
1、1-2位元組魔數
2、第3個位元組,序列化元件型別,約定和客戶端的序列化-反序列化協議
3、第4個位元組,響應狀態,是OK還是error
4、5-13個位元組,響應id,這裡的id和request中的id一樣
5、13-16個位元組,響應資料長度
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.encodeResponse(Channel, ChannelBuffer, Response)
返回結果有三種結果:1、沒有返回值即返回型別是void;2、有返回值並且執行成功;3、服務呼叫異常。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.encodeResponseData(Channel, ObjectOutput, Object)
解碼後的資料會寫入到通訊鏈路中。
消費端響應結果解碼
服務端給客戶端回寫資料之後,客戶端會收到IO事件,一個上行事件。NettyClient中有兩個上行事件處理器NettyCodecAdapter.decoder和NettyHandler,按照順序decoder先執行對服務端傳過來的資料進行解碼,解析出序列化協議、響應狀態、響應id(即請求id)。把響應body資料讀到DecodeableRpcResult物件中,進行解析同時載入處理原始Request資料,這個Request物件在請求時會被快取到DefaultFuture中,載入Request的目的是因為Request中Invocation中攜帶了服務介面的返回值型別資訊,需要根據這個型別把響應解析建立對應型別的物件。
com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(Channel, InputStream)
建立Response物件並且把解析出結果或異常設定到Response中。
decoder把響應解析成Response物件中,NettyHandler接著往下處理,同樣觸發它的messageReceive事件,在提供端解碼的時候看到了,它的handler封裝關係是:DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler,主要處理在HeaderExchangeHandler中:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleResponse(Channel, Response)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.doReceived(Response)
這裡主要做的事情是喚醒呼叫者執行緒,並且把Response設定到DefaultFuture中,在消費者觸發請求的程式碼中可以看到,消費端呼叫介面的時候請求寫到提供端之後,會呼叫DefaultFuture.get阻塞等待響應結果:
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.get(int)
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture.isDone()
在done這個Condition上進行條件等待,DefaultFuture.doReceive時,設定response喚醒done,此時呼叫執行緒被喚醒並且檢查是否已經有了response(避免假喚醒),喚醒之後返回response中的result,呼叫端即拿到了介面的呼叫結果(返回值或異常),整個遠端服務介面的呼叫流程就完成了。
超時處理
前面說了在進行介面呼叫時會出現兩種情況:介面呼叫成功、介面呼叫異常,其實還有一種情況就是介面呼叫超時。在消費端等待介面返回時,有個timeout引數,這個時間是使用者設定的,可在消費端設定也可以在提供端設定,done.await等待時,會出現兩種情況跳出while迴圈,一是執行緒被喚醒並且已經有了response,二是等待時間已經超過timeout,此時也會跳出while,當跳出while迴圈並且Future中沒有response時,就說明介面已超時丟擲TimeoutException,框架把TimeoutException封裝成RpcException拋給應用層。