RocketMQ原始碼學習---網路通訊篇
序言
本篇文章主要討論RocketMQ中網路通訊的具體實現。對於一個訊息中介軟體來說,它的作用一般會有
- 解耦
- 限流
- 訊息儲存
- 訊息轉發或者廣播
這幾個功能。這幾個功能都離不開網路通訊,下面先從巨集觀角度講一下網路通訊在MQ中的作用。
閱讀提示:文章篇幅較長,原始碼解析有點多,如果對細節不感興趣可以跳過原始碼部分,或者可以按照我上一篇部落格搭建原始碼閱讀環境。地址:RocketMQ原始碼除錯環境搭建
網路通訊
還是看圖說話比較好,在沒有MQ解耦的系統中,一次簡單的RPC系統呼叫如下所示:
在加入了MQ後,系統呼叫會如下所示:
原先的1次RPC會變成目前的“2”次RPC。這裡的2之所以打引號是因為從系統邊界來看是兩次RPC,但是對於MQ本身來說,會附加上很多MQ內部需要的網路呼叫。
如果再具體一點,就會如下所示:
上圖中的broker就是MQ中的服務端,負責訊息的轉儲和分發。一般來說MQ分為有broker和無broker兩種設計,比如kafka,actimemq,rabbitmq以及RocketMQ就是有broker設計,而類似zeroMQ和AKKA就是無broker設計,我個人覺得後者更偏向於一種帶有訊息正規化的程式設計模型(沒有深入研究,有偏差的地方歡迎指正討論)。
資料的傳輸在MQ中都是以這種RPC資料流的方式進行傳輸,所以一個良好的網路通訊設計在MQ中非常重要。
設計要素
那麼,對於一箇中間件的RPC網路通訊來說,到底需要哪些設計要素才能滿足它的需求?
我認為下面幾點是必須要滿足的。
- 編解碼處理(負責通訊中的編碼和解碼,序列化,通訊協議涉及等必要功能)
- 雙向訊息處理(包括同步或非同步,MQ中有非同步訊息的功能)
- 單向訊息處理(一般指心跳訊息或者註冊訊息這樣的型別)
- 業務層處理(如何實現業務上對訊息的分類處理?如何構建出一個完善的業務處理機制?)
具體架構實現
老規矩,看圖說話,先上一張UML圖先(RocketMQ和大多數中介軟體一樣,使用了著名的Netty作為網路通訊框架):
RemotingService:以RemotingService為最上層介面,提供了
三個介面:
void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook);
NettyRemotingAbstract:netty處理的抽象類,封裝了netty處理的公共方法,比如下面這一段對訊息的總體處理:
RemotingClient/RemotingSever:這兩個介面繼承了最上層介面,同時提供了client和server所必需的方法,下面這個就是RemotingClient的方法:
public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
public boolean isChannelWriteable(final String addr);
BrokerOuterAPI:是broker和別的模組通訊的類,封裝了NettyRemotingClient。
MQClientImpl:是客戶端和broker與nameserver通訊的類,也封裝了NettyRemotingClient。
UML圖下方紫色區域都是對編碼解碼和事件處理的一些類,沒有全部羅列出來,其中RemotingCommand非常重要,是所有傳輸資料的封裝,下面會詳細講解。
協議設計與編碼解碼
作為網路通訊模組,協議設計和編碼解碼是最基礎和重要的,在介紹具體的網路協議設計前,我覺得應該把RemotingCommand的具體內容詳細說明一下,這個類是傳輸過程中對所有資料的封裝,不但包含了所有的資料結構,還包含了編碼解碼操作。
RemotingCommand:
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND rpc型別的標註,一種是普通的RPC請求
private static final int RPC_ONEWAY = 1; // 0, 這種ONEWAY 是指單向RPC,比如心跳包
private static final Map<Class<? extends CommandCustomHeader>, Field[]> clazzFieldsCache =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();//**CommandCustomHader**是所有headerData都要實現的介面,後面的Field[]就是解析header所對應的成員屬性,所以這個map就是解析時候的欄位快取,下面兩個map也是分別對應類名快取和註解快取。
private static final Map<Class, String> canonicalNameCache = new HashMap<Class, String>();
// 1, RESPONSE_COMMAND
private static final Map<Field, Annotation> notNullAnnotationCache = new HashMap<Field, Annotation>();
private static AtomicInteger requestId = new AtomicInteger(0);//這裡的requestId是RPC請求的序號,每次請求的時候都會increment一下,同時後面會講到的responseTable會用這個requestId作為key。
private int code;//這裡的code是用來區分request型別的
private LanguageCode language = LanguageCode.JAVA;//區分語言種類
private int version = 0;//RPC版本號
private int opaque = requestId.getAndIncrement();//這裡的opaque就是requestId
private int flag = 0;//區分是普通RPC還是onewayRPC得標誌
private String remark;//標註資訊
private HashMap<String, String> extFields;//存放本次RPC通訊中所有的extFeilds,extFeilds其實就可以理解成本次通訊的包頭資料
private transient CommandCustomHeader customHeader; //包頭資料,注意transient標記,不會被序列化
private transient byte[] body; //body資料,注意transient標記,不會被序列化
重要的成員變數都在上面了,給大家展現一下一次心跳註冊的報文:
[
code=103,//這裡的103對應的code就是broker向nameserver註冊自己的訊息
language=JAVA,
version=137,
opaque=58,//這個就是requestId
flag(B)=0,
remark=null,
extFields={
brokerId=0,
clusterName=DefaultCluster,
brokerAddr=125.81.59.113: 10911,
haServerAddr=125.81.59.113: 10912,
brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON
]
現在再來看一下協議設計:
上面這個截圖是RocketMQ程式碼裡自帶的註釋,傳輸內容主要分為4部分內容:
- 1、length(總長度,用4個位元組儲存)
- 2、header length (包頭長度)
- 3、header data(包頭資料)
- 4、body data(資料包資料)
OK,來具體看一下具體怎麼實現的,先看encode編碼:
headerEncode()首先將extField放入到這個物件的feildMap中(上面有寫過),然後將這個RemotingCommand序列化成byte[]位元組陣列,序列化使用的是阿里的fastJson:
其中的markProtocolType方法是將RPC型別和headerData長度編碼,放到一個byte[4]陣列中,實現的比較巧妙。
(當然也有可能是我接觸位運算很少,可能C語言裡面這種設計很常見)
OK,說完了編碼,再看看解碼:
解碼就是編碼的一個逆向流程:
現在編碼解碼就先到此為止,下面來看RPC的事件處理
RPC事件處理和資料流轉
NettyRemotingAbstract這個抽象類包含了很多公共資料處理,也包含了很多重要的資料結構,先介紹一下NettyRemotingAbstract的成員屬性。
NetttyRemotingAbstract:
//單向RPC訊號量,控制執行緒個數 protected final Semaphore semaphoreOneway; //非同步RPC訊號量,控制執行緒個數 protected final Semaphore semaphoreAsync; //responseTable,存放非同步請求的ResponseFuture protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256); //processorTable,存放註冊的processor,key是request Code,對應的是<processor,ExecutorService//執行緒池(一般使用的是共用的public processor)> protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); //netty事件處理內部類 protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter(); //預設的事件處理器,處理一些公共的訊息 protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreAsync = new Semaphore(permitsAsync, true); } //新增ChannelEvent介面 public abstract ChannelEventListener getChannelEventListener(); //新增NettyEvent事件 public void putNettyEvent(final NettyEvent event) { this.nettyEventExecuter.putNettyEvent(event); }
先講一下業務層的事件處理:
首先上述的processorTable負責儲存各個requestCode對應的processor,RemotingServer和RemotingClient中都有
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
這個介面,在註冊的時候會把processor和對應的request Code 裝載進processorTable中,在處理事件時就會去processorTable中去取對應的processor:
然後再講一下對幾種通訊方式的處理:
- 同步訊息(一次broker和NameServer通訊為例子):
1、通過呼叫invekeSyncImpl來發出請求,此時會建立一個responseFuture,並put到responseTable中,key是opaque,
2、請求到達server端後會進行對應處理,然後回寫response
3、根據opaque取出對應的responseFuture並把response放進去。
- 單向訊息:由於不需要回復,不需要建立responseFuture
非同步訊息:
在同步訊息的基礎上會檢測remotingCommand是否有回撥函式,如果有會執行回撥函式。
Tips:
小的設計技巧:
1、通過countDownLatch來控制等待網路通訊時間
2、通過兩個AtomicBoolean的CAS方法來控制RPC只執行了一次
Netty通訊層設計
- 編解碼處理
這裡的編碼解碼分別使用了
Netty裡面的MessageToByteEncoder和LengthFieldBasedFrameDecoder進行編碼解碼,這一對工具是比較常用的編解碼方式,具體實現和原理我這裡就不細說了。
- Netty事件處理
Netty事件處理最重要的兩個類就是
- NettyConnetManageHandler
- NettyEventExecuter
前者繼承自ChannelDuplexHandler,可以監控connect,disconnect,close等事件,每個事件過來存入NettyEventExecuter的佇列裡面。
NettyEventExecuter是一個執行緒,不停地從佇列裡面取出事件進行相應處理。
class NettyEventExecuter extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
public void putNettyEvent(final NettyEvent event) {
System.out.println("put event: "+event.getType());
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
plog.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@Override
public void run() {
plog.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStoped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
plog.warn(this.getServiceName() + " service has exception. ", e);
}
}
plog.info(this.getServiceName() + " service end");
}
結尾總結
詳細閱讀RocketMQ的過程中收穫了很多關於網路通訊設計的知識,其中對於netty的使用和訊息的設計讓我收益很多,當然對於MQ來說,網路設計並沒有真正的RPC框架那麼複雜,不需要考慮很多第三方呼叫問題和併發量問題,因為瓶頸一般不會卡在網路這一層,不管怎麼說還是學習到了很多。由於本人水平有限,如果讀者有什麼問題或者文章哪裡有錯誤的地方,歡迎大家指正和探討,我的郵箱 [email protected]。
相關推薦
RocketMQ原始碼學習---網路通訊篇
序言 本篇文章主要討論RocketMQ中網路通訊的具體實現。對於一個訊息中介軟體來說,它的作用一般會有 解耦 限流 訊息儲存 訊息轉發或者廣播 這幾個功能。這幾個功能都離不開網路通訊,下面先從巨集觀角度講一下網路通訊在MQ中的作用。 閱讀提示:文章篇
RocketMQ原始碼學習-通訊與協議
從github clone 最新原始碼,結構如下: 本篇文章要講的通訊與協議部分的原始碼在remoting模組下。remoting模組是複雜網路通訊的模組,為其他需要網路通訊的模組所依賴。在這個模組中,RocketMQ定義了基礎的通訊協議,結合Netty,使
OkHttp原始碼學習——請求響應篇
文章目錄 連結伺服器 代理 dns解析地址 socket連結 傳送請求 構建響應物件 在上篇 OKHttp原始碼學習——快取篇中主要是使用dispatch對同步和非同
c++面試題(網路通訊篇)
●TCP和UDP的區別 UDP 與 TCP 的主要區別在於 UDP 不一定提供可靠的資料傳輸,它不能保證資料準確無誤地到達,不過UDP在許多方面非常有效。當程式是要儘快地傳輸儘可能多的資訊時,可以使用 UDP。TCP它是通過三次握手建立的連線,它在兩個服務之間始終保持一個連
Android NDK網路通訊篇(五)之本地通訊篇
Android NDK網路通訊篇(五) 本地通訊篇 前言 在同一個裝置或者同一個APP裡面,我們可以通過LocalSocket來實現本地通訊,比如可以用Java程式碼實現一個本地通訊的C/S架構的程式,也可以用Java程式碼實現客戶端程式碼,用原生程式碼實現服務端程式碼,本
【java面試】網路通訊篇
1.說一下HTTP協議HTTP協議是超文字傳輸協議,屬於應用層協議,規定了客戶端與服務端傳輸資料的格式;它是無狀態的,對於前面傳送過的資訊沒有記錄;請求方式有GET,POST,HEAD,PUT,DELE
QT-網路通訊篇-獲取本機資訊
今天carry對QT網路通訊的http有了一定的理解,可能瞭解的非常基礎,希望大家多提點意見哈! QT中的網路程式設計是由Qt Network提供的,它提供了許多類來實現網路應用中的各種功能,下面簡單實現一下獲取本機資訊
RocketMQ 原始碼學習筆記————Producer 是怎麼將訊息傳送至 Broker 的?
目錄 RocketMQ 原始碼學習筆記————Producer 是怎麼將訊息傳送至 Broker 的? 前言 專案結構 rocketmq-client 模組 DefaultMQProducer
【RocketMQ原始碼學習】- 3. Client 傳送同步訊息
本文較長,程式碼後面給了方法簡圖,希望給你幫助 傳送的方式 同步傳送 非同步傳送 訊息的型別 普通訊息 順序訊息 事務訊息 傳送同步訊息的時序圖 為了防止讀者朋友嫌煩,可以看下時序圖,後面我也會給出方法的簡圖 原始碼示例【傳送同步訊息】 呼叫Defau
【RocketMQ原始碼學習】- 5. 訊息儲存機制
前言 面試官:你瞭解RocketMQ是如何儲存訊息的嗎?我:額,,,你等下,我看下這篇文字, (逃 由於這部分內容優點多,所以請哥哥姐姐們自備茶水,歡迎留言! RocketMQ儲存設計是高可用和高效能的保證, 利用磁碟儲存來滿足海量堆積能力。Kafka單機在topic數量在100+的時候,效能會
無線通訊網路學習之LTE網路架構篇(20141208)
今天來學習一下LTE的網路架構: 1.LTE網路架構簡化了既有通訊網路架構,並可以與其他IP網路進行通訊的無縫整合,使其成為扁平化的全IP網路架構(Falt-All-IP); 2.改網路主要由EPC(核心網)與E-UTRAN組成,通過其他傳輸介質接入其他通訊網路,如下圖所示
RocketMQ Consumer 負載均衡演算法原始碼學習 -- AllocateMessageQueueConsistentHash
RocketMQ 提供了一致性hash 演算法來做Consumer 和 MessageQueue的負載均衡。 原始碼中一致性hash 環的實現是很優秀的,我們一步一步分析。 一個Hash環包含多個節點, 我們用 MyNode 去封裝節點, 方法 getKey() 封裝獲取節點的
RocketMQ NameServer模組 原始碼學習
RocketMQ namesrv 模組中 用到了 MixAll 類, 其中有一個 properties2Object(Properties, Object) 通用方法,把properties 轉換成 簡單的 POJO object。 你可以進一步擴充套件:從.properties
網路通訊預備篇:進位制計數
只要記住你的名字,不管你在世界的哪個地方,我一定會去見你。 ——電影《你的名字》 在我們的日常生活中,每個人的名字對應一個唯一的身(敏)份(感)證(詞)號,在Internet上也是一
OKHttp原始碼學習——快取篇
OKHttp真實呼叫請求的類是RealCall Dispatcher該類是作為請求分發 //非同步請求最多的請求個數 private int maxRequests = 64; //同一個host最多非同步請求的個數 private int maxRequests
深度學習網路篇——ZFNet(Part3 ZFNet的實驗環節)
上篇ZFNet的文章中我們簡單的分享了一下ZFNet的網路結構和訓練細節,這篇文章將分享ZFNet論文上的實驗環節。ZFNet做了很多巧妙的實驗,從這邊文章中也可以看到未來深度網路發展方向的蛛絲馬跡。 一、Experiments實驗 1.ImageNet 2012 該資料集由1.3M
深度學習網路篇——ZFNet(Part2 ZFNet的訓練細節)
上篇文章中我們介紹了ZFNet的發展歷程和一些演算法小心機,在這篇文章中我們將分享一下ZFNet的訓練細節!Come on!!!Baby!!! 一、ZFNet訓練細節 【AlexNet和ZFNet的區別】 1.AlexNet中使用2個GPU運的稀疏連線;在ZFNet中被單GPU密集連
深度學習網路篇——ZFNet(Part1 從AlexNet到ZFNet)
一、上回說到的 AlexNet 請各位看官們參見部落格之前的文章: 深度學習網路篇——AlexNet https://blog.csdn.net/weixin_43624538/article/details/83988998 1)AlexNet效能舉世矚目 top-1和top-5
深度學習網路篇——AlexNet
作為一個機器學習剛入門的小學生,今天和大家分享的是一篇比較經典的論文《ImageNet Classification with Deep Convolutional Neural Networks》。只是我們在學習後的知識分享和總結,有不周到的地方還請各位大大們指正。 簡要概括 Al
RocketMQ原理學習--RocketMQ原始碼執行
最近打算對RocketMQ相關的知識和原始碼進行學習一下,首先能把原始碼匯入及執行能比較方便我們通過跟蹤原始碼進行相關知識學習。 一、工程匯入 git地址:https://github.com/apache/rocketmq.git 直接以maven工程的