RocketMQ底層通訊機制
責編 | 姜新城
第 714篇技術好文:4350字 |11分鐘閱讀
分散式系統各個角色間的通訊效率很關鍵,通訊效率的高低直接影響系統性能,基於Socket實現一個高效的Tcp通訊協議是個很有挑戰的事情,本節說明RocketMQ是如何解決這個問題的
01
Remoting模組
_____
RocketMQ的通訊相關程式碼在Remoting模組裡,先來看看主要類結構。
圖1-1 Remoting模組的類繼承關係
RemotingService為最上層介面,定義了三個方法:
void start();
void shutdown();
void registerRPCHook(RPCHookrpcHook);
RemotingClient,RemotingServer繼承RemotingService介面, 並增加了自己特有的方法。
程式碼清單1-1RemotingClient主要函式定義
1void registerProcessor(final int requestCode, finalNettyRequestProcessor processor,final ExecutorService executor);
2RemotingCommand invokeSync(final String addr, final RemotingCommandrequest, final long timeoutMillis) ;
3void invokeAsync(final String addr, final RemotingCommand request,final long timeoutMillis,final InvokeCallback invokeCallback);
4void invokeOneway(final String addr, final RemotingCommand request,final long timeoutMillis);
5void updateNameServerAddressList(final List<String> addrs);
然後看看具體的實現類,
通過上面的封裝,RocketMQ各個模組間的通訊,可以通過傳送統一格式的自定義訊息(RemotingCommand)來完成的,各個模組間的通訊實現簡潔明瞭。
比如NameServer模組中,NameServerController有個remotingServer變數,NameServer在啟動時初始化好各個變數,然後啟動remotingServer即可,剩下NameServer要做的是專心實現好處理RemotingCommand的邏輯。
程式碼清單1-2NameServer處理主流程程式碼
1@Override
2public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
3 if (log.isDebugEnabled()){
4 log.debug("receive request, {} {} {}",
5 request.getCode(),
6 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
7 request);
8 }
9 switch (request.getCode()){
10 caseRequestCode.PUT_KV_CONFIG:
11 returnthis.putKVConfig(ctx, request);
12 caseRequestCode.GET_KV_CONFIG:
13 returnthis.getKVConfig(ctx, request);
14 caseRequestCode.DELETE_KV_CONFIG:
15 returnthis.deleteKVConfig(ctx, request);
16 caseRequestCode.REGISTER_BROKER:
17 VersionbrokerVersion = MQVersion.value2Version(request.getVersion());
18 if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
19 returnthis.registerBrokerWithFilterServer(ctx, request);
20 } else {
21 returnthis.registerBroker(ctx, request);
22 }
23 caseRequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
24 returnthis.getHasUnitSubUnUnitTopicList(ctx, request);
25 caseRequestCode.UPDATE_NAMESRV_CONFIG:
26 returnthis.updateConfig(ctx, request);
27 caseRequestCode.GET_NAMESRV_CONFIG:
28 returnthis.getConfig(ctx, request);
29 default:
30 break;
31 }
32 return null;
33}
在Consumer的原始碼中,獲取訊息的底層的通訊部分也是傳送一個RemotingComand 請求,返回的response也是個RemotingCommand型別。
程式碼清單1-3Consumer請求訊息底層實現程式碼
1private PullResult pullMessageSync(//
2 final String addr, // 1
3 final RemotingCommandrequest, // 2
4 final long timeoutMillis//3
5) throws RemotingException, InterruptedException, MQBrokerException{
6 RemotingCommand response =this.remotingClient.invokeSync(addr, request, timeoutMillis);
7 assert response != null;
8 returnthis.processPullResponse(response);
9}
從原始碼中可以看出,RocketMQ中複雜的通訊過程,被RemotingCommand統一起來,大部分的邏輯都是通過傳送Command,接受並處理Command完成。
02
協議設計和編解碼
_____
RocketMQ自己定義了一個通訊協議,使得模組間傳輸的二進位制訊息和有意義的內容之間互相轉換。協議格式如圖1-2所示。
圖1-2 RocketMQ的通訊協議
(1)第一部分是大端4個位元組整數,值等於第二,三,四部分長度總和
(2)第二部分是大端4個位元組整數,值等於第三部分的長度
(3)第三部分是通過json 序列化的資料
(4)第四部分是通過應用自定義二進位制序列化的資料
訊息的解碼過程在RomotingCommand的decode函式裡。
程式碼清單1-4訊息解碼函式
1public static RemotingCommand decode(final ByteBuffer byteBuffer) {
2 int length =byteBuffer.limit();
3 int oriHeaderLen =byteBuffer.getInt();
4 int headerLength =getHeaderLength(oriHeaderLen);
5 byte[] headerData = newbyte[headerLength];
6 byteBuffer.get(headerData);
7 RemotingCommand cmd =headerDecode(headerData, getProtocolType(oriHeaderLen));
8 int bodyLength = length - 4 - headerLength;
9 byte[] bodyData = null;
10 if (bodyLength > 0) {
11 bodyData = newbyte[bodyLength];
12 byteBuffer.get(bodyData);
13 }
14 cmd.body = bodyData;
15 return cmd;
16}
對應的訊息編碼過程在RemotingCommand的encode函式中。
程式碼清單1-5訊息編碼函式
1public ByteBuffer encode() {
2 // 1> header lengthsize
3 int length = 4;
4 // 2> header datalength
5 byte[] headerData =this.headerEncode();
6 length +=headerData.length;
7 // 3> body data length
8 if (this.body != null) {
9 length += body.length;
10 }
11 ByteBuffer result =ByteBuffer.allocate(4 + length);
12 // length
13 result.putInt(length);
14 // header length
15 result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));
16 // header data
17 result.put(headerData);
18 // body data;
19 if (this.body != null) {
20 result.put(this.body);
21 }
22 result.flip();
23 return result;
24}
03
Netty庫
_____
RocketMQ是基於Netty庫來完成RemotingServer和RemotingClient具體的通訊實現的,Netty是個事件驅動的網路程式設計框架,它遮蔽了Java Socket,Nio等複雜細節,使用者只需用好Netty,就可以實現一個網路程式設計專家+併發程式設計專家水平的Server、Client網路程式。應用Netty有一定的門檻,需要了解它的EventLoopGroup,Channel,Handler模型以及各種具體的配置。RocketMQ利用Netty實現的通訊類是NettyRemotingServer和NettyRemotingClient,使用者也可以參考這兩個類的實現來學習使用Netty。
相關推薦
STL及一些容器底層實現機制
失效 list容器 容量 較高的 浪費 復制 處理 跳轉 strong 1、vector容器 vector的數據安排以及操作方式,與數組類似。倆這唯一的區別就是空間的運用靈活性。數組是靜態空間,一旦配置了就不能改變,vector是動態數組。在堆上分配內存。vector是動態
Spark內建框架rpc通訊機制及RpcEnv基礎設施-Spark商業環境實戰
本套系列部落格從真實商業環境抽取案例進行總結和分享,並給出Spark原始碼解讀及商業實戰指導,請持續關注本套部落格。版權宣告:本套Spark原始碼解讀及商業實戰歸作者(秦凱新)所有,禁止轉載,歡迎學習。 Spark商業環境實戰及調優進階系列 Spark商業環境實戰-Spark內建框架rpc通訊機制及
Rabbitmq中的RPC通訊機制
具體工作機制: Our RPC will work like this: When the Client starts up, it creates an anonymous exclusive callback queue. For an RPC request, the
通訊機制解決生產者消費者問題
通訊機制解決生產者消費者問題實現高併發 生產者
深入剖析 Web 伺服器與 PHP 應用之間的通訊機制 - 掌握 CGI 和 FastCGI 協議的執行原理
本文首發於 深入剖析 Web 伺服器與 PHP 應用之間的通訊機制 - 掌握 CGI 和 FastCGI 協議的執行原理,轉載請註明出處! 身為一名使用 PHP 語言開發後端服務的程式猿,我們每天都和 PHP 以及 Web 伺服器產生無數次的親密接觸。得益於它們,我們才能
Java多執行緒系列---“基礎篇”14之 wait,sleep,join,yield,park,unpark,notify等通訊機制對比
1. 執行緒讓步: yield() yield()的作用是讓步。它能讓當前執行緒由“執行狀態”進入到“就緒狀態”,從而讓其它具有相同優先順序的等待執行緒獲取執行權;但是,並不能保證在當前執行緒呼叫yield()之後,其它具有相同優先順序的執行緒就一定能獲得執行權;也有可能是當前執行緒又進入到“執行狀態”繼續
C++:通過C++程式碼簡單理解程序間的通訊機制:共享記憶體
下面用共享對映檔案的方式實現程序間通訊,程式碼可以執行。 一、淺理解 每個程序有自己獨立的空間,一個程序無法訪問其他程序的資料。就好像兩個是互不干涉的個體,想讓它們進行通訊(交換資料),就必須有一段它們都可以訪問到的空間,作為中間介質。在計算機中,可以存放資料的地方分為記憶體和硬
區塊鏈100講:EOS通訊機制分析
客戶端和伺服器端的通訊採用RESTful軟體架構風格,伺服器端的每個資源對應一個唯一的URL地址,客戶端將URL地址封裝成http請求傳送到伺服器端,請求對應的資源或者執行相應操作。 1 客戶端傳送訊息流程 以轉賬為例,說明EOS訊息處理流程。通過cleos客戶端發起
lambda表示式底層處理機制
為了支援函數語言程式設計,Java 8引入了Lambda表示式,那麼在Java 8中到底是如何實現Lambda表示式的呢? Lambda表示式經過編譯之後,到底會生成什麼東西呢? 在沒有深入分析前,讓我們先想一想,Java 8中每一個Lambda表示式必
Android多執行緒通訊機制
掌握Android的多執行緒通訊機制,我們首先應該掌握Android中程序與執行緒是什麼。 1. 程序 在Android中,一個應用程式就是一個獨立的程序(應用執行在一個獨立的環境中,可以避免其他應用程式/程序的干擾)。一般來說,當我們啟動一個應用程式時,系統會建立一個程序(從Zygote中
objective-c程式碼轉c++程式碼,瞭解底層實現機制
1、開啟終端,輸入 clang -rewrite-objc main.m 2、有時會遇到找不到系統庫的標頭檔案,如 解決辦法: 指定模擬器: xcrun -sdk iphonesimulator
從硬體底層通訊看http協議和https協議的資料流
嵌入式硬體與伺服器的通訊常常採用http協議或是https協議,實際上https協議就是http協議+SSL加密通訊,簡單點說就是把http協議的資料經過一定的演算法加密後傳送出去,接收方收到後再解密出來。http協議是執行在tcp協議的上一層,也就是說協
linux下程序、以及程序間的通訊機制
2.1程序基本概念 程序是Linux事務管理的基本單元,所有的程序均擁有自己獨立的處理環境和系統資源。程序的環境由當前系統狀態及其父程序資訊決定和組成。系統的第一個程序init由核心產生,以後所有的程序都是
QQ通訊機制(轉)
下面有4個基本的問答: 問題一:為什麼只要可以連上網際網路的計算機都可以用QQ相互建立通訊,而不需要固定IP?也就是這個QQ使用者端是怎樣找到另一個QQ使用者的,而使用者在每次使用時他可能用的是不同的計算機,有著不同的IP地址。 伺服器端不會以qq使用者端的ip作為唯一標識,伺服器端會以qq賬號作為
Android每天一個知識點+Demo—跨程序通訊機制AIDL入門
一 Why-為什麼要用AIDL 沙箱理念:在Android中,每個應用(Application)程式都執行在獨立的程序中,無法直接呼叫到其他應用的資源。當一個應用被執行時,一些操作是被限制的,比如訪問記憶體,訪問感測器等等。 好處:這也保證了當其中一個程式出現異常而不會影
Linux訊號----程序間非同步的通訊機制
訊號 一 、 概念和功能: 訊號實際上是一個軟中斷,用於通知程序發生了某些事,該如何處理。 實際上也歸為一類程序間通訊方式, 訊號的生命週期:訊號的產生-訊號的註冊-訊號的阻塞(/遮蔽)-訊號的登出-訊號的處理 二 、 檢視訊號: kill -l ,檢視l
Android安全/開發基礎--6--程序間通訊機制(IPC)
6-1、多程序 1、多程序分為兩種: 第一種情況是一個應用因為某些原因自身需要採用多執行緒模式來實現。 另一種情況是當前應用需要向其他應用獲取資料。 2、Android中的多程序模式: 通過給四大元件指定android:process屬性,可以開啟多程序模式,使
最高效的進(線)程間通訊機制--eventfd
我們常用的程序(執行緒)間通訊機制有管道,訊號,訊息佇列,訊號量,共享記憶體,socket等等,其中主要作為程序(執行緒)間通知/等待的有管道pipe和socketpair。執行緒還有特別的condition。 今天來看一個liunx較新的系統呼叫,它是從LINUX 2.6.27
Linux中程序間通訊機制----訊息佇列
一、什麼是訊息 訊息(message)是一個格式化的可變長的資訊單元。訊息機制允許由一個程序給其它任意的程序傳送一個訊息。當一個程序收到多個訊息時,可將它們排成一個訊息佇列。 1、訊息機制的資料結構 (1)訊息首部 記錄一些與訊息有關的資訊,如訊息的型別、大小、