RocketMQ原始碼解析-Producer訊息傳送
首先以預設的非同步訊息傳送模式作為例子。DefaultMQProducer中的send()方法會直接呼叫DefaultMQProducerImpl的send()方法,在DefaultMQProducerImpl會直接呼叫sendDefaultImpl()方法。
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { try { this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); } catch (MQBrokerException e) { throw new MQClientException("unknown exception", e); } } private SendResult sendDefaultImpl(// Message msg,// final CommunicationMode communicationMode,// final SendCallback sendCallback, final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; final long beginTimestamp = System.currentTimeMillis(); long endTimestamp = beginTimestamp; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName(); try { sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); endTimestamp = System.currentTimeMillis(); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); continue; } catch (MQClientException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); continue; } catch (MQBrokerException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); exception = e; endTimestamp = System.currentTimeMillis(); switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // end of for if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", // times, // (System.currentTimeMillis() - beginTimestamp), // msg.getTopic(),// Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); throw new MQClientException(info, exception); } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null); }
由該方法的引數可得知Rocketmq的訊息傳送模式,開啟CommunicationMode可以看到具體的傳送模式
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
以非同步訊息傳送模式(ASYNC)作為例子,需要在具體的實現裡傳入相應的sendCallback處理訊息非同步收到訊息回覆結果的訊息處理。
首先通過呼叫makeSureStateOK()方法來確保該生產者正處於執行狀態,判斷的方式很簡單
private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "// + this.serviceState// + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
只要比較一下狀態量就行了。
接下來是對所需要的傳送的訊息進行驗證,具體的驗證方法在RocketMq裡的Validators實現
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
具體是對訊息具體資料的判空,以及在通過和在DefaultMQProducer裡的訊息大小配置屬性進行比較確保訊息的大小符合在配置中的設定。在其中也對訊息的topic進行了檢察,主要通過正則表示式確保topic的格式,以及topic的有效性。
在接下來的傳送步驟中,接下里通過呼叫tryTOFindTopicPublishInfo()方法來根據訊息的topic來獲取相關topicd的路由資訊。這個時候,整個傳送訊息的beginTimestamp已經被設定,也就是說整個傳送訊息的timeout已經開始。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
return topicPublishInfo;
}
else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
在這個方法中,首先會嘗試在DefaultMQProducerImpl中儲存的路由資訊map裡去尋找,如果找不到則會重新建立一個topicPublishInfo類,從名稱服務那裡去嘗試更新獲取新的路由資訊資料。這裡呼叫的方法updateTopicPublishInfo()方法與客戶端執行的定時更新路由任務相同,也就說在傳送topic找不到相應的路由資訊的時候會重複一次更新這個定時任務的操作。
在成功獲取了相應路由資訊的同時就可以正式開始訊息的傳送。在之前的嘗試獲取路由訊息的步驟已經算在了整個訊息傳送的timeout裡。
在整個訊息傳送的過程中,如果因為各種原因訊息傳送失敗,可以設定訊息重新發送的次數。也就是說一短訊息最大可以傳送的次數是1+最大可重發次數,建立一個該次數大小的定長陣列來儲存每次傳送的brokerName。這裡主要針對同步傳送模式。
首先通過之前獲得topic得到的路由資訊來以上一次傳送的BrokerName為依據得到當前次數傳送的訊息佇列。
由topicPublishInfo的selectOneMessageQuene()方法來實現。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
訊息佇列的選取在這裡根據訊息傳進來的時候的index達到平均輪詢各個訊息佇列的目的,也就是說完成了每個訊息佇列負載的平衡,與此同時,可以根據上一次傳送的broker名稱達到不在一條訊息佇列重複傳送的目的。
在成功獲取了需要傳送的訊息佇列之後,呼叫sendKernalImpl()傳送訊息。
private SendResult sendKernelImpl(final Message msg,//
final MessageQueue mq,//
final CommunicationMode communicationMode,//
final SendCallback sendCallback,//
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
if(this.defaultMQProducer.isSendMessageWithVIPChannel()) {
brokerAddr = MixAll.brokerVIPChannel(brokerAddr);
}
byte[] prevBody = msg.getBody();
try {
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.CompressedFlag;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TransactionPreparedType;
}
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
this.executeSendMessageHookBefore(context);
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
}
SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr,// 1
mq.getBrokerName(),// 2
msg,// 3
requestHeader,// 4
timeout,// 5
communicationMode,// 6
sendCallback// 7
);
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
}
finally {
msg.setBody(prevBody);
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
首先,根據當前的BorkerName從本地的Broker地址快取中獲取相應的地址,如果找不到,跟之前的方式一樣,重新跟名稱服務更新新的路由資訊。
接下來根據之前的DefaultProducer配置類對具體的方式方式進行配置。
如果在一開始配置了高優先順序佇列,則在這裡就會選擇高優先順序佇列。
在這裡給出一個sysFlag標誌位。
之後進行壓縮處理,如果所要傳送訊息的body部分超過了配置類需要壓縮的大小。
private boolean tryToCompressMessage(final Message msg) {
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = UtilAll.compress(body, zipCompressLevel);
if (data != null) {
msg.setBody(data);
return true;
}
}
catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
在壓縮中,具體採用了zip壓縮方式。
如果在此處的確採用了壓縮,則給標誌量低一位為1。
public final static int CompressedFlag = (0x1 << 0);
接下來,若果該訊息屬於事務訊息,也會給相應的標誌量賦值,這裡暫時不展開。
在接下里,如果該生產者配置了相關的註冊了chackForbiddenHook,則在這裡將會走一遍所有的註冊了的checkForbidden鉤子保證本來配置被禁發的訊息不會被髮送出去。
類似的,在接下里如果跟之前的鉤子一樣的方式配置註冊了sendMessageHook訊息傳送鉤子,則會在這裡遍歷呼叫所有鉤子的executesendMessageHookBefore()方法,相應的,在訊息傳送完畢之後也會執行executeSendMessageHookAfter()方法。
之後根據之前得到的一系列傳送訊息的配置,來構造傳送給Broker的請求頭資料。
在一切準備就緒之後,呼叫客戶端的API介面來實現訊息的物理髮送。
SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr,// 1
mq.getBrokerName(),// 2
msg,// 3
requestHeader,// 4
timeout,// 5
communicationMode,// 6
sendCallback// 7
);
如果採用了ASYNC的非同步傳送模式,則這個最後一個引數就是在訊息傳送之後用來處理訊息回覆的類。
public SendResult sendMessage(//
final String addr,// 1
final String brokerName,// 2
final Message msg,// 3
final SendMessageRequestHeader requestHeader,// 4
final long timeoutMillis,// 5
final CommunicationMode communicationMode,// 6
final SendCallback sendCallback// 7
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
}
else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
default:
assert false;
break;
}
return null;
}
以非同步方式為例
private void sendMessageAsync(//
final String addr,//
final String brokerName,//
final Message msg,//
final long timeoutMillis,//
final RemotingCommand request,//
final SendCallback sendCallback//
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
if (null == sendCallback)
return;
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
assert sendResult != null;
sendCallback.onSuccess(sendResult);
}
catch (Exception e) {
sendCallback.onException(e);
}
}
else {
if (!responseFuture.isSendRequestOK()) {
sendCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
}
else if (responseFuture.isTimeout()) {
sendCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause()));
}
else {
sendCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
}
}
}
});
}
在這裡會根據傳入的SendCallBack物件生成相應的responseFuture任務類交由netty客戶端來處理。
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
final ResponseFuture responseFuture =
new ResponseFuture(request.getOpaque(), timeoutMillis, invokeCallback, once);
this.responseTable.put(request.getOpaque(), responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
else {
responseFuture.setSendRequestOK(false);
}
responseFuture.putResponse(null);
responseTable.remove(request.getOpaque());
try {
responseFuture.executeInvokeCallback();
}
catch (Throwable e) {
plog.warn("excute callback in writeAndFlush addListener, and callback throw", e);
}
finally {
responseFuture.release();
}
plog.warn("send a request command to channel <{}> failed.",
RemotingHelper.parseChannelRemoteAddr(channel));
plog.warn(request.toString());
}
});
}
catch (Exception e) {
responseFuture.release();
plog.warn(
"send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel)
+ "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
}
else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
}
else {
String info =
String
.format(
"invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis,//
this.semaphoreAsync.getQueueLength(),//
this.semaphoreAsync.availablePermits()//
);
plog.warn(info);
plog.warn(request.toString());
throw new RemotingTimeoutException(info);
}
}
}
可以看到,生成的responseFuture被netty遠端客戶端管理在map裡,動態實現了在收到訊息回覆之後呼叫的operationCompleted()方法,將根據訊息結果的非同步返回呼叫相應的鄂onSuccess()或者onException()方法,來完成ASYNC非同步的目的。
在這裡,如果是非同步的將直接返回,由上面的方式完成之後訊息回覆的處理。到這裡RockerMQ非同步傳送的步驟正式宣告結束。
而ONEWAY單向訊息傳送模式在傳送完畢訊息後馬上會結束,並不會管訊息傳送的結果。
如果是SYNC同步訊息傳送模式,如果訊息傳送失敗,則會選擇另一個BrokerName來嘗試繼續傳送,直到retry次數用盡。當然顧名思義,在同步訊息模式的訊息傳送後,將會等待結果並呼叫客戶端API介面實現的processSendResponse()方法來處理結果。