RocketMQ原理學習---Producer訊息傳送
上一篇部落格RocketMQ原理學習-- Name Server中我們介紹了Name Server提供的相關功能,這篇部落格我們來介紹一下生產者訊息傳送相關的內容。
訊息傳送示例:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); try { for (int i = 0; i < 3; i++) { Message msg = new Message("TopicA-test",// topic "TagA",// tag (new Date() + "Hello RocketMQ ,QuickStart 11" + i) .getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
在DefaultMQProducer中呼叫send方法傳送訊息
/**
 * Sends {@code msg} by delegating to the wrapped {@code defaultMQProducerImpl}.
 *
 * @param msg the message to send
 * @return whatever {@code defaultMQProducerImpl.send(msg)} returns
 */
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    final SendResult result = this.defaultMQProducerImpl.send(msg);
    return result;
}
DefaultMQProducer的send方法只是委派,實際上是呼叫DefaultMQProducerImpl的send方法來發送訊息:
/**
 * Sends {@code msg} using the timeout configured on the owning producer
 * ({@code getSendMsgTimeout()}), delegating to the two-argument {@code send}.
 *
 * @param msg the message to send
 * @return the result of {@code send(msg, timeout)}
 */
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    final long timeoutMillis = this.defaultMQProducer.getSendMsgTimeout();
    return this.send(msg, timeoutMillis);
}
該方法帶上預設的傳送逾時時間(getSendMsgTimeout)後,最終會呼叫sendDefaultImpl方法來發送訊息,主要步驟如下:
(1)在傳送訊息時,會根據訊息的Topic名稱從Name Server中獲取Broker相關的地址資訊:TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
(2)由於一個Broker中對於每個Topic會有多個MessageQueue,傳送前需要從中選出一個;生產者預設的MessageQueue選擇策略為輪詢:MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
(3)選擇了MessageQueue後,會根據MessageQueue中的Broker資訊呼叫sendKernelImpl方法傳送訊息:sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
/**
 * Sends {@code msg}, selecting a message queue for every attempt and retrying while attempts remain.
 *
 * <p>In SYNC mode up to {@code 1 + getRetryTimesWhenSendFailed()} attempts are made; in any other
 * mode exactly one attempt is made by this method.
 *
 * @param msg the message to send; checked with {@code Validators.checkMessage} first
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback passed through unchanged to {@code sendKernelImpl}
 * @param timeout passed through unchanged to {@code sendKernelImpl}
 * @return a {@code SendResult} in SYNC mode; {@code null} in ASYNC and ONEWAY mode
 * @throws MQClientException if no usable route info exists for the topic, or if the attempts are
 *         exhausted (or no queue could be selected) and no result is available
 * @throws MQBrokerException rethrown when the broker response code is not in the retry list and
 *         no result is available
 * @throws InterruptedException rethrown immediately, without another attempt
 * @throws RemotingException declared by the signature; inside the loop it is caught and kept as
 *         the cause of the final {@code MQClientException}
 */
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
// Random id that only appears in the warning logs below.
long invokeID = this.random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
// Overwritten at the start of every attempt; used to compute the per-attempt round-trip time.
long beginTimestampPrev = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// Last exception seen in a failed attempt; becomes the cause of the final MQClientException.
Exception exception = null;
SendResult sendResult = null;
// Only SYNC sends are retried here.
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
// Broker name used by each attempt, reported in the failure message.
String[] brokersSent = new String[timesTotal];
while(true) {
String info;
if (times < timesTotal) {
// Broker name of the previous attempt (null on the first attempt), handed to the queue selector.
info = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mqSelected.getBrokerName();
long endTimestamp;
try {
beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// Report the elapsed time for this broker; the last argument is false on the success path.
// NOTE(review): presumably the flag requests broker isolation - confirm in updateFaultItem.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch(communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// Return unless the status is not SEND_OK and retrying on another broker is enabled;
// in that case control leaves the switch and the loop runs another attempt.
if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
return sendResult;
}
}
} catch (RemotingException var24) {
// Record the failure, log it and fall through to the next attempt.
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var24);
this.log.warn(msg.toString());
exception = var24;
} catch (MQClientException var25) {
// Same handling as RemotingException: record, log, try again.
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var25);
this.log.warn(msg.toString());
exception = var25;
} catch (MQBrokerException var26) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
this.log.warn(msg.toString());
exception = var26;
// Only the listed response codes lead to another attempt; any other code ends the send.
// NOTE(review): the literals are presumably inlined ResponseCode constants - confirm their names.
switch(var26.getResponseCode()) {
case 1:
case 14:
case 16:
case 17:
case 204:
case 205:
break;
default:
// Prefer a result left over from an earlier attempt; otherwise propagate the broker error.
if (sendResult != null) {
return sendResult;
}
throw var26;
}
} catch (InterruptedException var27) {
// Interruption is never retried: log and rethrow. Note the fault item is updated with false here.
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
this.log.warn(msg.toString());
this.log.warn("sendKernelImpl exception", var27);
this.log.warn(msg.toString());
throw var27;
}
// Attempt finished without returning or throwing: count it and loop again.
++times;
continue;
}
}
// Reached when the attempts are used up or no queue could be selected.
if (sendResult != null) {
return sendResult;
}
info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
// Map the last recorded exception to a response code on the thrown exception.
// NOTE(review): 10001-10003 are presumably ClientErrorCode constants - confirm.
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(10001);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(10002);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(10003);
}
throw mqClientException;
}
} else {
// No usable publish info: distinguish "no route for this topic" from "no name server address configured".
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null != nsList && !nsList.isEmpty()) {
throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
} else {
throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
}
}
}
在sendKernelImpl中會獲取Broker的地址資訊,然後組裝訊息資訊,呼叫MQClientAPIImpl的sendMessage方法將訊息傳送到Broker中。
/**
 * Performs one send of {@code msg} to the broker named by {@code mq}.
 *
 * <p>Resolves the broker address, computes the system flag, runs the check-forbidden and
 * send-message hooks when they are registered, fills in a {@code SendMessageRequestHeader} and
 * passes everything to {@code sendMessage} on the object returned by {@code getMQClientAPIImpl()}.
 * The message body is reset to the bytes it had on entry in a {@code finally} block.
 *
 * @param msg the message to send
 * @param mq the queue (broker name, topic, queue id) the message is sent to
 * @param communicationMode SYNC, ASYNC or ONEWAY; selects which {@code sendMessage} overload is used
 * @param sendCallback forwarded only in ASYNC mode
 * @param topicPublishInfo forwarded only in ASYNC mode
 * @param timeout forwarded to {@code sendMessage}
 * @return the value returned by {@code sendMessage}
 * @throws MQClientException if no address can be found for the broker; exceptions of this type
 *         raised inside the try block propagate without the after-send hook being run
 * @throws RemotingException rethrown after the after-send hook has been run (when registered)
 * @throws MQBrokerException rethrown after the after-send hook has been run (when registered)
 * @throws InterruptedException rethrown after the after-send hook has been run (when registered)
 */
private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Look up the broker's address
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// Nothing found: call tryToFindTopicPublishInfo for the queue's topic, then look the address up once more.
this.tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// NOTE(review): presumably swaps in the VIP-channel address when enabled - confirm in MixAll.brokerVIPChannel.
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// Keep the body as it was on entry so the finally block can put it back.
byte[] prevBody = msg.getBody();
SendResult var28;
try {
// Batch messages are not given a unique id here.
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
int sysFlag = 0;
// Bit 0x1 is set when tryToCompressMessage reports true.
if (this.tryToCompressMessage(msg)) {
sysFlag |= 1;
}
// Bit 0x4 is set when the TRAN_MSG property parses as true.
String tranMsg = msg.getProperty("TRAN_MSG");
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= 4;
}
if (this.hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty("TRAN_MSG");
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
// Checked after the transaction check, so a delay property overrides Trans_Msg_Half.
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty("DELAY") != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// Assemble the message request header
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// For topics starting with %RETRY%, move the reconsume counters from message properties into the header.
if (requestHeader.getTopic().startsWith("%RETRY%")) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, "RECONSUME_TIME");
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, "MAX_RECONSUME_TIMES");
}
}
SendResult sendResult = null;
switch(communicationMode) {
// Asynchronous message: uses the overload that also takes the callback, publish info and async retry count
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
break;
case ONEWAY:
// Synchronous message (ONEWAY falls through and shares this call)
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, context, this);
break;
default:
assert false;
}
// NOTE(review): context is only assigned when hasSendMessageHook() was true above; this block
// relies on the hook check giving the same answer here.
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
var28 = sendResult;
} catch (RemotingException var21) {
// Let the after-send hooks see the failure, then propagate it.
if (this.hasSendMessageHook()) {
context.setException(var21);
this.executeSendMessageHookAfter(context);
}
throw var21;
} catch (MQBrokerException var22) {
if (this.hasSendMessageHook()) {
context.setException(var22);
this.executeSendMessageHookAfter(context);
}
throw var22;
} catch (InterruptedException var23) {
if (this.hasSendMessageHook()) {
context.setException(var23);
this.executeSendMessageHookAfter(context);
}
throw var23;
} finally {
// Put back the body captured on entry (presumably because compression may have replaced it - confirm).
msg.setBody(prevBody);
}
return var28;
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
}
}
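在MQClientAPIImpl的sendMessage方法中,會先建立請求(RemotingCommand)並設定訊息體,再依通訊模式分派:ONEWAY呼叫invokeOneway,ASYNC呼叫sendMessageAsync,SYNC呼叫sendMessageSync;同步模式下最終會呼叫底層通訊模組NettyRemotingClient的invokeSync方法將訊息資訊傳送到Broker中。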
/**
 * Builds the remoting request for {@code msg} and dispatches it according to
 * {@code communicationMode}.
 *
 * <p>Request code 10 with the plain header is used unless {@code sendSmartMsg} is set or the
 * message is a {@code MessageBatch}; in those cases the header is converted with
 * {@code SendMessageRequestHeaderV2} and code 320 (batch) or 310 (otherwise) is used.
 *
 * @return the synchronous send result in SYNC mode; {@code null} in ONEWAY and ASYNC mode
 */
public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int retryTimesWhenSendFailed, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
    final boolean batch = msg instanceof MessageBatch;

    // Create the request command
    final RemotingCommand request;
    if (sendSmartMsg || batch) {
        SendMessageRequestHeaderV2 headerV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        int requestCode = batch ? 320 : 310;
        request = RemotingCommand.createRequestCommand(requestCode, headerV2);
    } else {
        request = RemotingCommand.createRequestCommand(10, requestHeader);
    }

    // Attach the message body to the request
    request.setBody(msg.getBody());

    switch (communicationMode) {
        case SYNC:
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
        case ASYNC: {
            // Fresh counter handed to the async path alongside retryTimesWhenSendFailed.
            AtomicInteger attempts = new AtomicInteger();
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, attempts, context, producer);
            return null;
        }
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        default:
            assert false;
            return null;
    }
}
在NettyRemotingClient的invokeSync方法中,會根據Broker地址資訊取得或建立Channel,然後透過該Channel送出request請求,並獲取返回值RemotingCommand。
/**
 * Sends {@code request} to {@code addr} over a channel obtained from
 * {@code getAndCreateChannel} and returns the response.
 *
 * <p>When an {@code rpcHook} is set, it is invoked before the request and after the response.
 *
 * @param addr the remote address
 * @param request the command to send
 * @param timeoutMillis passed to {@code invokeSyncImpl}
 * @return the response command
 * @throws RemotingConnectException if no active channel is available for {@code addr}
 * @throws RemotingSendRequestException rethrown after the channel has been closed
 * @throws RemotingTimeoutException rethrown; the channel is closed first only when
 *         {@code isClientCloseSocketIfTimeout()} is true
 */
public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);

    // Guard: without an active channel, close whatever was returned and fail.
    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        if (this.rpcHook != null) {
            this.rpcHook.doBeforeRequest(addr, request);
        }
        RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
        if (this.rpcHook != null) {
            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
            this.rpcHook.doAfterResponse(remoteAddr, request, response);
        }
        return response;
    } catch (RemotingSendRequestException sendFailure) {
        log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw sendFailure;
    } catch (RemotingTimeoutException timedOut) {
        if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
            this.closeChannel(addr, channel);
            log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
        }
        log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
        throw timedOut;
    }
}
總結:
(1)生產者根據Topic名稱從Name Server中獲取相關Broker資訊,根據Broker的地址資訊將訊息傳送到對應Broker地址中
(2)對於同一個Topic,一個Broker對應多個MessageQueue(Topic可以在多個Broker中),生產者預設通過取模輪詢方式將訊息傳送到對應的MessageQueue中。
(3)生產者提供了介面send(Message msg, MessageQueueSelector selector, Object arg),我們可以通過實現MessageQueueSelector介面來自訂選取MessageQueue的方式;例如要實現有序訊息,只需將需要保持順序的訊息傳送到同一個MessageQueue即可。