RocketMQ原始碼詳解 | Producer篇 · 其二:訊息組成、傳送鏈路
概述
在上一節 RocketMQ原始碼詳解 | Producer篇 · 其一:Start,然後 Send 一條訊息 中,我們瞭解了 Producer 在傳送訊息的流程。這次我們再來具體下看訊息的構成與其傳送的鏈路
Message
在 RocketMQ 的使用中,Message 類是在傳送訊息時必須用到的,其中 body 即是訊息的存放位置,還有的就是訊息的 標識(flag) 和 屬性(properties)
public class Message { private String topic; private int flag; private Map<String, String> properties; private byte[] body; private String transactionId; }
訊息的標識(flag)
變數名 | 含義 |
---|---|
COMPRESSED_FLAG | 壓縮訊息。訊息為批量的時候,就會進行壓縮,預設使用5級的 zip |
MULTI_TAGS_FLAG | 有多個 tag。 |
TRANSACTION_NOT_TYPE | 事務為未知狀態。當 Broker 回查 Producer 的時候,如果為 Commit 應該提交,為 Rollback 應該回滾,為 Unknown 時應該繼續回查 |
TRANSACTION_PREPARED_TYPE | 事務的執行狀態。當前訊息是事務的一部分 |
TRANSACTION_COMMIT_TYPE | 事務的提交訊息。要求提交事務 |
TRANSACTION_ROLLBACK_TYPE | 事務的回滾訊息。要求回滾事務 |
BORNHOST_V6_FLAG | 生成該訊息的 host 是否 ipv6 的地址 |
STOREHOSTADDRESS_V6_FLAG | 持久化該訊息的 host 是否是 ipv6 的地址 |
訊息的屬性(properties)
而訊息的 properties 較多,只摘了一小段
屬性 | 含義 |
---|---|
KEYS | 訊息的 Key。伺服器會通過 key 設定索引,應用可以通過 Topic 和 Key 來查詢這條訊息以及被誰消費 |
TAGS | 訊息的子型別,可以根據 tag 選擇性消費 |
DELAY | 延遲訊息的延遲級別(共16級,理論上可以有18級) |
RETRY_TOPIC | 需要重試的 Topic(在 Broker 中會存放到 SCHEDULE_TOPIC_XXXX Topic,其中有 18 個 queue,對應 18 個重試延遲) |
REAL_TOPIC | 真實的 Topic (RocketMQ 經常使用更換目的 Topic 的"把戲",如事務訊息和延時訊息,這個欄位記錄了真正的 Topic) |
PGROUP | 生產者組 |
MAX_OFFSET\MIN_OFFSET | 在 pull 中的最大偏移量和最小偏移量 |
TRANSFER_FLAG | 事務有關標識 |
MSG_TYPE | 訊息型別,是否為回覆訊息 |
BUYER_ID | 嗯...買家ID? |
當然,這只是在生產者中的訊息的樣子,在 Broker 和消費者的眼中中,它是這樣的
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
private String brokerName;
private int queueId;
// 存檔的大小
private int storeSize;
// 在 ConsumerQueue 中的偏移量
private long queueOffset;
private int sysFlag;
// 訊息建立時間
private long bornTimestamp;
// 建立地址
private SocketAddress bornHost;
// 存檔時間
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
// 在 commitLog 中的偏移量
private long commitLogOffset;
// crc 校驗
private int bodyCRC;
// 消費重試次數
private int reconsumeTimes;
private long preparedTransactionOffset;
}
訊息的包裝
那麼,producer 生成了這樣的訊息後,會直接將其發出去嗎?
讓我們繼續跟蹤上一篇沒講完的內容
MQClientAPIImpl#sendMessage
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// 是否為 reply 訊息
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
// 是 smart 訊息則加上請求頭
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
/* -- pass -- */
在這裡,我們可以看到在這又加了層套娃(只保留了body),然後才傳送
RemotingCommand
的具體屬性如下
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;
我們還在他的方法中找到了一個叫 encode
的方法,並且返回的是 ByteBuffer
。因此這就是實際傳送的訊息。
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
訊息的結構
具體的訊息結構如下圖:
其中每個欄位在 Request 和 Response 中都有不同的含義
code
在 Request 中,為請求的操作碼
public class RequestCode {
// 傳送訊息
public static final int SEND_MESSAGE = 10;
// 拉取訊息
public static final int PULL_MESSAGE = 11;
// 查詢訊息(所在topic, 需要的 key, 最大數量, 開始偏移量, 結束偏移量)
public static final int QUERY_MESSAGE = 12;
// 查詢 Broker 偏移量(未使用)
public static final int QUERY_BROKER_OFFSET = 13;
/*
* 查詢消費者偏移量
* 消費者會將偏移量儲存在記憶體中,當使用主從架構時,會預設由主 Broker 負責讀於寫
* 為避免訊息堆積,堆積訊息超過指定的值時,會由從伺服器來接管讀,但會導致消費進度問題
* 所以主從消費進度的一致性由 從伺服器主動上報 和 消費者記憶體進度優先 來保證
*/
// 查詢消費者自己的偏移量
public static final int QUERY_CONSUMER_OFFSET = 14;
// 提交自己的偏移量
public static final int UPDATE_CONSUMER_OFFSET = 15;
// 建立或更新Topic
public static final int UPDATE_AND_CREATE_TOPIC = 17;
// 獲取所有的Topic資訊
public static final int GET_ALL_TOPIC_CONFIG = 21;
/* unused */
public static final int GET_TOPIC_CONFIG_LIST = 22;
public static final int GET_TOPIC_NAME_LIST = 23;
// 更新 Broker 配置
public static final int UPDATE_BROKER_CONFIG = 25;
// 獲取 Broker 配置
public static final int GET_BROKER_CONFIG = 26;
public static final int TRIGGER_DELETE_FILES = 27;
// 獲取 Broker 執行時資訊
public static final int GET_BROKER_RUNTIME_INFO = 28;
// 通過時間戳查詢偏移量
public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
// 獲取最大偏移量
public static final int GET_MAX_OFFSET = 30;
// 獲取最小偏移量
public static final int GET_MIN_OFFSET = 31;
//
public static final int GET_EARLIEST_MSG_STORETIME = 32;
/* 由 Broker 處理 */
// 通過訊息ID查詢訊息
public static final int VIEW_MESSAGE_BY_ID = 33;
// 心跳訊息
public static final int HEART_BEAT = 34;
// 登出客戶端
public static final int UNREGISTER_CLIENT = 35;
// 報告消費失敗(一段時間後重試) (Deprecated)
public static final int CONSUMER_SEND_MSG_BACK = 36;
// 事務結果(可能是 commit 或 rollback)
public static final int END_TRANSACTION = 37;
// 通過消費者組獲取消費者列表
public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
// 檢查事務狀態; Broker對於事務的未知狀態的回查操作
public static final int CHECK_TRANSACTION_STATE = 39;
// 通知消費者的ID已經被更改
public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
// 批量鎖定 Queue (rebalance使用)
public static final int LOCK_BATCH_MQ = 41;
// 解鎖 Queue
public static final int UNLOCK_BATCH_MQ = 42;
// 獲得該 Broker 上的所有的消費者偏移量
public static final int GET_ALL_CONSUMER_OFFSET = 43;
// 獲得延遲 Topic 上的偏移量
public static final int GET_ALL_DELAY_OFFSET = 45;
// 檢查客戶端配置
public static final int CHECK_CLIENT_CONFIG = 46;
// 更新或建立 ACL
public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
// 刪除 ACL 配置
public static final int DELETE_ACL_CONFIG = 51;
// 獲取 Broker 叢集的 ACL 資訊
public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
// 更新全域性白名單
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
// 獲取 Broker 叢集的 ACL 配置
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
/* NameServer 相關 */
// 放入鍵值配置
public static final int PUT_KV_CONFIG = 100;
// 獲取鍵值配置
public static final int GET_KV_CONFIG = 101;
// 刪除鍵值配置
public static final int DELETE_KV_CONFIG = 102;
// 註冊 Broker
public static final int REGISTER_BROKER = 103;
// 登出 Broker
public static final int UNREGISTER_BROKER = 104;
// 獲取指定 Topic 的路由資訊
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
// 獲取 Broker 的叢集資訊
public static final int GET_BROKER_CLUSTER_INFO = 106;
// 更新或建立訂閱組
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
// 獲取所有訂閱組的配置
public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
// 獲取 Topic 的度量指標
public static final int GET_TOPIC_STATS_INFO = 202;
// 獲取消費者線上列表(rpc)
public static final int GET_CONSUMER_CONNECTION_LIST = 203;
// 獲取生產者線上列表
public static final int GET_PRODUCER_CONNECTION_LIST = 204;
public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
// 從 NameSrv 獲取所有 Topic
public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
// 刪除訂閱組
public static final int DELETE_SUBSCRIPTIONGROUP = 207;
// 獲取消費者的度量指標
public static final int GET_CONSUME_STATS = 208;
public static final int SUSPEND_CONSUMER = 209;
public static final int RESUME_CONSUMER = 210;
public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
public static final int WHO_CONSUME_THE_MESSAGE = 214;
// 刪除 Broker 中的 Topic
public static final int DELETE_TOPIC_IN_BROKER = 215;
// 刪除 NameSrv 中的 Topic
public static final int DELETE_TOPIC_IN_NAMESRV = 216;
// 獲取鍵值列表
public static final int GET_KVLIST_BY_NAMESPACE = 219;
// 重置消費者的消費進度
public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
// 從消費者中獲取消費者的度量指標
public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
// 讓 Broker 重置消費進度
public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
// 讓 Broker 更新消費者的度量資訊
public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
// 查詢訊息被誰消費
public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
// 從叢集中獲取 Topic
public static final int GET_TOPICS_BY_CLUSTER = 224;
// 註冊過濾器伺服器
public static final int REGISTER_FILTER_SERVER = 301;
// 註冊訊息過濾類
public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
// 查詢消費時間
public static final int QUERY_CONSUME_TIME_SPAN = 303;
// 從 NameSrv 中獲取系統Topic
public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
// 從 Broker 中獲取系統Topic
public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
// 清理過期的消費佇列
public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
// 獲取 Consumer 的執行時資訊
public static final int GET_CONSUMER_RUNNING_INFO = 307;
// 查詢修正偏移量
public static final int QUERY_CORRECTION_OFFSET = 308;
// 直接消費訊息
public static final int CONSUME_MESSAGE_DIRECTLY = 309;
// 傳送訊息(v2),優化網路資料包
public static final int SEND_MESSAGE_V2 = 310;
// 單元化相關 topic
public static final int GET_UNIT_TOPIC_LIST = 311;
// 獲取含有單元化訂閱組的 Topic 列表
public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
// 獲取含有單元化訂閱組的非單元化 Topic 列表
public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
// 克隆消費進度
public static final int CLONE_GROUP_OFFSET = 314;
// 查詢 Broker 上的度量資訊
public static final int VIEW_BROKER_STATS_DATA = 315;
// 清理未使用的 Topic
public static final int CLEAN_UNUSED_TOPIC = 316;
// 獲取 broker 上的有關消費的度量資訊
public static final int GET_BROKER_CONSUME_STATS = 317;
/* update the config of name server */
public static final int UPDATE_NAMESRV_CONFIG = 318;
/* get config from name server */
public static final int GET_NAMESRV_CONFIG = 319;
// 傳送批量訊息
public static final int SEND_BATCH_MESSAGE = 320;
// 查詢消費的 Queue
public static final int QUERY_CONSUME_QUEUE = 321;
// 查詢資料版本
public static final int QUERY_DATA_VERSION = 322;
/* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
// 回送訊息
public static final int SEND_REPLY_MESSAGE = 324;
public static final int SEND_REPLY_MESSAGE_V2 = 325;
// push回送訊息到客戶端
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
在 Response 中,為響應碼
public class ResponseCode extends RemotingSysResponseCode {
// 重新整理到磁碟超時
public static final int FLUSH_DISK_TIMEOUT = 10;
// 從節點不可達
public static final int SLAVE_NOT_AVAILABLE = 11;
// 從節點刷盤超時
public static final int FLUSH_SLAVE_TIMEOUT = 12;
// 非法的訊息結構
public static final int MESSAGE_ILLEGAL = 13;
// 服務不可用
public static final int SERVICE_NOT_AVAILABLE = 14;
// 版本不支援
public static final int VERSION_NOT_SUPPORTED = 15;
// 未授權的
public static final int NO_PERMISSION = 16;
// Topic 不存在
public static final int TOPIC_NOT_EXIST = 17;
// Topic 已經存在
public static final int TOPIC_EXIST_ALREADY = 18;
// 要拉取的偏移量不存在
public static final int PULL_NOT_FOUND = 19;
// 立刻重新拉取
public static final int PULL_RETRY_IMMEDIATELY = 20;
// 重定向拉取的偏移量
public static final int PULL_OFFSET_MOVED = 21;
// 不存在的佇列
public static final int QUERY_NOT_FOUND = 22;
// 訂閱的 url 解析失敗
public static final int SUBSCRIPTION_PARSE_FAILED = 23;
// 目標訂閱不存在
public static final int SUBSCRIPTION_NOT_EXIST = 24;
// 訂閱不是最新的
public static final int SUBSCRIPTION_NOT_LATEST = 25;
// 訂閱組不存在
public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
// 訂閱的資料不存在 (tag表示式異常)
public static final int FILTER_DATA_NOT_EXIST = 27;
// 該 Broker 上訂閱的資料不是最新的
public static final int FILTER_DATA_NOT_LATEST = 28;
// 事務應該提交
public static final int TRANSACTION_SHOULD_COMMIT = 200;
// 事務應該回滾
public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
// 事務狀態位置
public static final int TRANSACTION_STATE_UNKNOW = 202;
// 事務狀態Group錯誤
public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
// 買家ID不存在
public static final int NO_BUYER_ID = 204;
public static final int NOT_IN_CURRENT_UNIT = 205;
// 消費者不線上(rpc)
public static final int CONSUMER_NOT_ONLINE = 206;
// 消費超時
public static final int CONSUME_MSG_TIMEOUT = 207;
// 訊息不存在
public static final int NO_MESSAGE = 208;
// 更新或建立 ACL 配置失敗
public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
// 刪除 ACL 配置失敗
public static final int DELETE_ACL_CONFIG_FAILED = 210;
// 更新全域性白名單地址失敗
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
lang
欄位為訊息發起方編碼語言,這裡預設為 java
private LanguageCode language = LanguageCode.JAVA;
version
訊息發起方的程式版本
opaque
該欄位是為了在同一連線上標識不同的請求,在響應的時候能夠回撥對應的函式( rocketmq 的傳送使用了 TCP 連線複用)
remark
在 Reqeust 中,用於傳輸自定義文字
在 Response 中,用於傳輸錯誤的原因
ext
傳輸自定義的訊息頭
訊息的傳送
在知道訊息長啥樣後,就可以繼續看傳送程式碼了
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
NettyRemotingClient#invokeOneway
我們先來看最簡單的Oneway
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 建立 Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
// 使用建立好的連線傳送
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
以上可以大致抽象為兩個操作:獲取或建立TCP連線、通過連線傳送資料,同時一旦發生異常則關閉連線
NettyRemotingClient#getAndCreateChannel
先看第一個操作
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
// 地址為空則說明要獲取的是NameServer的地址
if (null == addr) {
return getAndCreateNameserverChannel();
}
// 嘗試從快取中獲取
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// 沒有或未就緒則新建連線
return this.createChannel(addr);
}
可以看出,這裡是由一個 ChannelTable 來維護所有的連線,而 ChannelTable 又是由 NettyRemotingClient
維護,即其是在 JVM 上的全域性共享例項。
然後再具體檢視建立的方法,可以發現 Channel 最終是由客戶端的 Bootstrap 非同步建立
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// 連線的建立是序列的
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// 雙重校驗保證連線確實沒被建立
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
// 連線建立完成
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
// 建立過但失敗了
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
// 實際上的連線建立
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
// 阻塞直到建立完成
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}
return null;
NettyRemotingAbstract#invokeOnewayImpl
然後接著看第二個操作:通過連線傳送資料
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// 在請求頭上的 flag 標記為 oneway 請求
request.markOnewayRPC();
// 獲取訊號量,保證不會系統不會承受過多請求
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 真正傳送完成後,釋放鎖
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// 超出請求數
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
// 超時
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
這塊比較簡單,在獲取傳送 oneway 的訊號量後呼叫 Channel 的 writeAndFlush
方法傳送,傳送完成後釋放
MQClientAPIImpl#sendMessageSync
然後來看同步的傳送
// 傳送請求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
// 處理響應
return this.processSendResponse(brokerName, msg, response,addr);
其中在 NettyRemotingClient#invokeSync
做的事和 oneway 傳送差不多,都是建立或獲取 Channel 然後處理鉤子然後呼叫父類的響應實現。
所以我們直接來看父類是咋做的
NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// 傳送失敗,回填 responseFuture 並在 responseTable 移除其
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 使用 countDownLatch 來等待響應到達
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
發現和 oneway 的傳送的區別了嗎?其中最大的區別有兩個:
- 在 oneway 中出現的訊號量限流不見了
- 出現了 responseTable 來管理所有的 responseFuture
我們發現了以下定義
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
由之前介紹到的 opaque 可以知道,這裡對 opaque 和 responseFuture 做了對映,當響應到來時,可以根據 opaque 處理對應的 responseFuture。而流控的消失也是可以理解的,畢竟同步傳送會阻塞整個執行緒,所以在傳送方來做流控是不合理的
最後傳送完成後使用 processSendResponse
處理響應後返回傳送結果
NettyRemotingAbstract#invokeAsyncImpl
最後看非同步的傳送
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
// 訊號量流控
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// 發生了任何阻塞操作後都要檢查是否超時...
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
這裡和同步傳送區別不大,主要還是用了訊號量做流控,且不在 responseFuture 使用 countDownLatch 阻塞
Netty 元件
總所周知,在 Channel 寫入訊息後,就會進入 Pipeline 的尾部,並往出站的方向流出,接下來就看看在出站的過程中又是怎樣做的
NettyRemotingAbstract
在 rocketMQ 的 remoting 模組下有一個 netty 包,那裡就是 RPC 呼叫的處理位置。
而在 NettyRemotingAbstract 下,我們就已經看到了有三個重要的方法 invokeOnewayImpl、invokeAsyncImpl、invokeSyncImpl
同時這個類的子類有 NettyRemotingClient 和 NettyRemotingServer,我們先來看和 Producer 有關的部分
public abstract class NettyRemotingAbstract {
/* oneway請求的訊號量 */
protected final Semaphore semaphoreOneway;
/* async請求的訊號量 */
protected final Semaphore semaphoreAsync;
/* 快取所有進行中的請求(因為請求是並行的,要對對應的請求做出對應響應), */
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
/* 對請求碼進行對應的處理 */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/* Executor to feed netty events to user defined {@link ChannelEventListener}. */
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/* 預設請求處理器 */
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/* SSL上下文 {@link SslHandler}. */
protected volatile SslContext sslContext;
/* rpc hooks */
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
/**
* Custom channel event listener.
*
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener();
}
request 響應部分和 response 響應部分
/**
* 對請求進行響應
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
// 該請求可能是一個 request, 也可能是自己發出一個請求的 response
if (cmd != null) {
// 通過請求頭上的 flag 就能判斷
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
/**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// 如果不是 oneway 請求的話,則需要進行響應
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// 是否觸發流控
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 交由對應的處理器處理
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// 沒有對應的響應方式且不存在預設響應器
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
// 處理回撥方法
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
// 如果不是的話,說明這是一個阻塞呼叫,還需要去進行釋放
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
/**
* 在回撥執行器中執行回撥。如果回撥執行器為空,則直接在當前執行緒中執行
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
/**
* 執行回撥方法需要的執行緒池
*/
public abstract ExecutorService getCallbackExecutor();
/**
* 定期呼叫此方法來掃描和過期已棄用的請求。
*/
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
/**
* 該服務用於使用子類實現的自定義的事件處理器對指定的事件進行處理
*/
class NettyEventExecutor extends ServiceThread {
}
這段程式碼看起來比較長,但實際邏輯還是比較簡單的
- 響應 Request
- 流量控制觸發檢查;觸發則直接返回
- 獲取 request code 對應的處理器,執行請求;根據同步或非同步執行不同請求
- 返回響應;如果為 oneway request,不進行響應
- 響應 Response
- 取出對應的活動中的 request
- 根據非同步或同步來處理回撥
NettyRemotingClient
再來看它的客戶端實現類,這個類中我們主要看 Bootstrap 的建立
// 建立客戶端的 Bootstrap
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
// 禁用 Nagle 演算法
.option(ChannelOption.TCP_NODELAY, true)
// 關閉 keepalive,由我們自己管理連線
.option(ChannelOption.SO_KEEPALIVE, false)
// 設定的超時時間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// 傳送視窗和接收視窗的大小
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// TLS層新增
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// 使用自定義的 EventLoop
defaultEventExecutorGroup,
// 註冊編解碼註冊器
new NettyEncoder(),
new NettyDecoder(),
// 註冊 idle 檢查,下一個handle通過覆寫userEventTriggered監聽連線超時事件
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// 管理連線,超時處理,維護channelTables與存活的連線
new NettyConnectManageHandler(),
// 實際上處理收到的請求
new NettyClientHandler());
}
});
編碼和解碼我們都已經知道是直接編解碼成位元組流,而 NettyClientHandler
的實現就是直接呼叫父類的請求處理,所以我們主要看下 NettyConnectManageHandler
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
closeChannel(ctx.channel());
super.disconnect(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
// 從 channelTables 移除
closeChannel(ctx.channel());
super.close(ctx, promise);
// 處理已經失敗的請求,呼叫回撥方法
NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 處理超過時未接收心跳的 channel
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
NettyConnectManageHandler
繼承了 ChannelDuplexHandler
類,以此監聽 Channel。
其主要做的事是:
- 在連線 close 時移除在 channelTables 中移除並關閉連線
- 關閉超時的連線
最後,NettyClientHandler
將收到的請求直接傳入 NettyRemotingAbstract
的 processMessageReceived
方法來處理