1. 程式人生 > >RocketMQ原理學習---Producer訊息傳送

RocketMQ原理學習---Producer訊息傳送

        上一篇部落格RocketMQ原理學習-- Name Server中我們介紹了Name Server提供的相關功能,這篇部落格我們來介紹一下生產者訊息傳送相關的內容。

訊息傳送示例:

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
       producer.shutdown();
    }

}

在DefaultMQProducer中呼叫send方法傳送訊息

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

最終是呼叫DefaultMQProducerImpl的send方法來發送訊息

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.send(msg, (long)this.defaultMQProducer.getSendMsgTimeout());
    }

最終會呼叫sendDefaultImpl方法來發送訊息:

(1)在傳送訊息時會根據訊息的Topic名稱從Name Server中根據Topic名稱來獲取Broker相關的地址資訊TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

(2)由於一個Broker中對於每個Topic會有多個MessageQueue,生產者預設選擇MessageQueue策略為輪詢MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);

(3)選擇了MessageQueue後,會根據MessageQueue中Broker資訊呼叫sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);傳送訊息。

 private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        long invokeID = this.random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];

            while(true) {
                String info;
                if (times < timesTotal) {
                    info = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mqSelected.getBrokerName();

                        long endTimestamp;
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch(communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    return sendResult;
                                }
                            }
                        } catch (RemotingException var24) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var24);
                            this.log.warn(msg.toString());
                            exception = var24;
                        } catch (MQClientException var25) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var25);
                            this.log.warn(msg.toString());
                            exception = var25;
                        } catch (MQBrokerException var26) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                            this.log.warn(msg.toString());
                            exception = var26;
                            switch(var26.getResponseCode()) {
                            case 1:
                            case 14:
                            case 16:
                            case 17:
                            case 204:
                            case 205:
                                break;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw var26;
                            }
                        } catch (InterruptedException var27) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                            this.log.warn(msg.toString());
                            this.log.warn("sendKernelImpl exception", var27);
                            this.log.warn(msg.toString());
                            throw var27;
                        }

                        ++times;
                        continue;
                    }
                }

                if (sendResult != null) {
                    return sendResult;
                }

                info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(10001);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(10002);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(10003);
                }

                throw mqClientException;
            }
        } else {
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null != nsList && !nsList.isEmpty()) {
                throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
            } else {
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
        }
    }

在sendKernelImpl中會獲取Broker的地址資訊,然後組裝訊息資訊,呼叫MQClientAPIImpl的sendMessage方法將訊息傳送到Broker中。

private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //獲取Broker的地址資訊
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            this.tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();

            SendResult var28;
            try {
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= 1;
                }

                String tranMsg = msg.getProperty("TRAN_MSG");
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= 4;
                }

                if (this.hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty("TRAN_MSG");
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty("DELAY") != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }

                    this.executeSendMessageHookBefore(context);
                }
                //組裝訊息請求頭資訊
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith("%RETRY%")) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, "RECONSUME_TIME");
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, "MAX_RECONSUME_TIMES");
                    }
                }

                SendResult sendResult = null;
                switch(communicationMode) {
                //非同步訊息
                case ASYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
                    break;
                case ONEWAY:
                //同步訊息
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, context, this);
                    break;
                default:
                    assert false;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                var28 = sendResult;
            } catch (RemotingException var21) {
                if (this.hasSendMessageHook()) {
                    context.setException(var21);
                    this.executeSendMessageHookAfter(context);
                }

                throw var21;
            } catch (MQBrokerException var22) {
                if (this.hasSendMessageHook()) {
                    context.setException(var22);
                    this.executeSendMessageHookAfter(context);
                }

                throw var22;
            } catch (InterruptedException var23) {
                if (this.hasSendMessageHook()) {
                    context.setException(var23);
                    this.executeSendMessageHookAfter(context);
                }

                throw var23;
            } finally {
                msg.setBody(prevBody);
            }

            return var28;
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
        }
    }

在MQClientAPIImpl中會呼叫底層通訊模組NettyRemotingClient的invokeSync方法將訊息資訊傳送到Broker中。

public SendResult sendMessage(String addr, String brokerName, Message msg, SendMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, MQClientInstance instance, int retryTimesWhenSendFailed, SendMessageContext context, DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
		//建立訊息
        if (!sendSmartMsg && !(msg instanceof MessageBatch)) {
            request = RemotingCommand.createRequestCommand(10, requestHeader);
        } else {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? 320 : 310, requestHeaderV2);
        }
		//新增訊息請求體
        request.setBody(msg.getBody());
        switch(communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            AtomicInteger times = new AtomicInteger();
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
        default:
            assert false;

            return null;
        }
}

在NettyRemotingClient中呼叫invokeSync方法根據Broker地址資訊建立Channel,然後根據request請求資訊發起訊息,並獲取返回值RemotingCommand。

public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }

                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }

                return response;
            } catch (RemotingSendRequestException var7) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw var7;
            } catch (RemotingTimeoutException var8) {
                if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }

                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw var8;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

總結:

(1)生產者根據Topic名稱從Name Server中獲取相關Broker資訊,根據Broker的地址資訊將訊息傳送到對應Broker地址中

(2)對於同一個Topic,一個Broker對應多個MessageQueue(Topic可以在多個Broker中),生產者預設通過取模輪詢方式將訊息傳送到對應的MessageQueue中。

(3)生產者提供了介面send(Message msg, MessageQueueSelector selector, Object arg),我們可以通過實現MessageQueueSelector介面可以實現選取MessageQueue的方法,例如實現有序訊息就可以將需要有序的訊息傳送到同一個MessageQueue即可。