RocketMQ原理解析-Remoting1. 通訊層實現
Rocketmq的通訊層是基於通訊框架netty 4.0.21.Final之上做了簡單的協議封裝,是強依賴。
一: NettyRemotingAbstract Server與Client公用抽象類
ResponseFuture模式:
invokeSyncImpl和invokeAsyncImpl都使用了
請求方會new一個ResponseFuture物件快取起來ConcurrentHashMap<Integer/* opaque */, ResponseFuture>,並且設定opaque 值
Broker接收請求將opaque 直接把這個值設定迴響應物件,客戶端接收到這個響應,通過opaque從快取查詢對應的ResponseFuture物件
構建ResponseFuture,設定opaque值
netty傳送請送並且設定監聽器回撥響應
傳送成功設定ResponseFuture傳送成功,退出監聽器
傳送失敗設定ResponseFuture傳送失敗,並且從快取中移除ResponseFuture(沒有響應過來,就用不到快取中的ResponseFuturel)
responseFuture.waitResponse(timeoutMillis)獲取響應
傳送成功,沒有響應物件說明超時
非同步一般鏈路耗時比較長, 為了防止本地快取的netty請求過多, 使用訊號量控制上限預設2048個
獲取是否可以處理請求
構建一次釋放物件
構建responseFuture物件,設定opaque, callback, once,超時時間等值,並放入快取集合
通過netty傳送請求,設定listener,
傳送成功responseFuture.setSendRequestOK(true);
傳送失敗responseFuture.setSendRequestOK(false), 訊號量通過once釋放, 刪除快取
Netty接收server端響應,根據opaque從快取獲取responseFuture,呼叫回撥方法即介面InvokeCallback實現
標記onewayRpc
用訊號量控制併發的數 //這是我對在這裡用新號量控制的理解
由定時任務啟動,定時檢視超時的快取請求,有callback的執行callback,讓後從快取中移除再釋放請求
根據請求code查詢對應的處理器執行緒池pair, 沒有用預設的
有處理器處理請求返回RemotingCommand物件的響應response
若不是onewayRpc 給response設定opaque
標記響應型別
通過netty寫入響應
當client向server傳送請求的時候,server處理後向client反饋處理結果。
根據RemotingCommand的opaque,從快取中取出對應的ResponseFuture
ResponseFuture設定響應物件RemotingCommand
responseFuture釋放訊號量
有callback的執行callback(通過執行緒池), 沒有的putResponse(這個方法同步呼叫使用,來countDownLatch, 因為呼叫執行緒在等待呢)
二:NettyRemotingServer Remoting 服務端實現
broker啟動初始化NettyRometingServer
向netty註冊handler
NettyEncoder協議編碼器,將RemotingCommand轉換為位元組,給netty傳輸
NettyDecoder協議解碼器, 將netty接收的輸入流,轉換成RemotingCommand
NettyConnetManageHandler 處理register,unregiter, active, inactive, exception
NettyServerHandler netty處理請求的業
向NettyRemotingServer註冊業務處理器
server接收client請求根據RequestCode選擇具體的處理器RequestProcessor,就是利用RequestCode進行策略的選擇
server(broker,namesrv)在啟動的時候會把RequestCode與對應的RequestProcessor和處理執行緒池註冊到NettyRemotingServer中去,程式碼類似如下:
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, SendMessageProcessor, sendMessageExecutor)
三:NetttyRemotingClient
向netty註冊handler
NettyEncoder協議編碼器,將RemotingCommand轉換為位元組,給netty傳輸
NettyDecoder協議解碼器, 將netty接收的輸入流,轉換成RemotingCommand
NettyConnetManageHandler 處理register,unregiter, active, inactive, exception
NettyClientHandler netty 處理請求的業
Client與通訊層的互動封了MQClientAPIImpl統一處理,在MQClientAPIImpl構造的時候註冊了ClientRemotingProcessor來處理server的請求