1. 程式人生 > 程式設計 >跟我學RocketMQ之訊息傳送原始碼解析

跟我學RocketMQ之訊息傳送原始碼解析

本文我將帶領讀者朋友對RocketMQ生產者如何傳送訊息這一流程進行原始碼層面的解析。內容偏幹,請自備白開水。

生產者初始化

進行訊息傳送的前提是先對生產者進行初始化,一段較為常規的生產者初始化示例程式碼如下

@Value("${rocketmq.nameServer}")
String nameSrvAddr;

@PostConstruct
public void init() {

    DefaultMQProducer defaultMQProducer =
            new DefaultMQProducer("PRODUCER_GROUP",true);
    defaultMQProducer.setNamesrvAddr(nameSrvAddr);
    // 傳送失敗重試次數
    defaultMQProducer.setRetryTimesWhenSendFailed(3);
    try {
        defaultMQProducer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("Producer載入異常!",e);
    }
}
複製程式碼

我們對初始化流程稍作分析。

首先初始化一個DefaultMQProducer例項,呼叫構造方法

public DefaultMQProducer(final String producerGroup,boolean enableMsgTrace) {
    this(null,producerGroup,null,enableMsgTrace,null);
}
複製程式碼

第二個引數為是否開啟訊息軌跡支援,關於訊息軌跡的原始碼解析可以移步 《跟我學RocketMQ之訊息軌跡實戰與原始碼分析》

通過setNamesrvAddr(String namesrvAddr)設定nameserver地址;通過setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)設定重發次數,預設為2。

[DefaultMQProducer.java]
private int retryTimesWhenSendFailed = 2;
複製程式碼

接著呼叫start()方法啟動defaultMQProducer

[DefaultMQProducer.java]
@Override
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 啟動producer例項
    this.defaultMQProducerImpl.start();
    ...省略traceDispatcher相關邏輯...
}
複製程式碼

可以看到是呼叫的defaultMQProducerImpl的start()

[DefaultMQProducerImpl.java]

public void start() throws MQClientException {
    this.start(true);
}
複製程式碼

實際呼叫了start的過載方法,startFactory==true

// MQClientInstance引用
private MQClientInstance mQClientFactory;

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 如果當前服務狀態為CREATE_JUST【剛建立】
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();
複製程式碼

注意這句程式碼

            // 判斷當前生產者組是否符合要求
            // 改變生產者的例項id為程式id
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
複製程式碼

這裡檢查生產者組是否符合要求,符合則改變生產者的instanceName為程式id,具體邏輯為

private String instanceName = System.getProperty("rocketmq.client.name","DEFAULT");

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
複製程式碼

例項名為配置檔案配置得到的,預設為DEFAULT,我們接著看start的過載方法

            // 初始化一個MQ客戶端工廠,同一個clientId只有一個MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);
複製程式碼

這裡初始化了MQ客戶端工廠,對於同一個clientId只有一個MQClientInstance。看一下getAndCreateMQClientInstance方法。

[MQClientManager.java]
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig,RPCHook rpcHook) {
    // 構建MQClientId
    String clientId = clientConfig.buildMQClientId();
    
    // 從clientId與MQClientInstance對映表factoryTable中獲取當前clientId對應的MQClientInstance
    MQClientInstance instance = this.factoryTable.get(clientId);

    // 如果MQClientInstance不存在則建立一個新的並放入對映表factoryTable中
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(),clientId,rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId,instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]",clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]",clientId);
        }
    }
    return instance;
}
複製程式碼

我們接著看下clientId是如何生成的

/**
 * 構建MQ客戶端id
 * clientId=客戶端ip+@+例項名+unitName(可選)
 * @return
 */
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}
複製程式碼

可以看到,clientId的構造規則為:

clientId=客戶端ip+@+例項名+unitName(可選),對於同一個JVM中的不同消費者和不同生產者在啟動時候獲取到的MQClientInstance是同一個。MQClientInstance是封裝了網路呼叫相關的邏輯。

我們接著回到start方法中

            // 註冊生產者,將當前生產者加入到MQClientInstance中
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(),this);
            if (!registerOK) {
                // 註冊失敗,狀態==僅建立
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
            }

            // 註冊成功則將當前生產者組對應的topic與釋出關係放入topicPublishInfoTable登入檔
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),new TopicPublishInfo());

            // 啟動MQClientFactory,如果已經啟動則不會再啟動一次
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}",this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK,maybe started once,"
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
複製程式碼

這裡向MQClientInstance進行註冊,將當前的生產者加入到MQClientInstance管理中。

通過mQClientFactory.start();啟動MQClientInstance,如果已經啟動則不會重複啟動,具體的程式碼邏輯如下:

[MQClientInstance.java]
public void start() throws MQClientException {

    // 同步當前例項
    synchronized (this) {
        switch (this.serviceState) {
            // MQClientInstance狀態為[剛建立],進行啟動操作
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service  啟動訊息拉取執行緒
                this.pullMessageService.start();
                // Start rebalance service 啟動訊息重負載執行緒
                this.rebalanceService.start();
                // Start push service 啟動生產者
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK",this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            
            // 如果當前服務的狀態為RUNNING執行中則不重複啟動
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before,and failed.",null);
            default:
                break;
        }
    }
}
複製程式碼

由於生產者和訊息者例項均使用同一個MQClientInstance,因此會在MQClientInstance中同時對生產者執行緒、消費拉取執行緒、rebalance執行緒進行啟動操作。

到此,訊息傳送的必要條件:生產者啟動過程就結束了,我們接著研究一下訊息傳送的流程。

核心傳送流程之DefaultMQProducerImpl.sendDefaultImpl方法

訊息傳送的關鍵API為 send 方法,常見的一個API宣告為

[DefaultMQProducer.java]
public SendResult send(Message msg,long timeout) throws MQClientException,RemotingException,MQBrokerException,InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    return this.defaultMQProducerImpl.send(msg,timeout);
}
複製程式碼

它呼叫的是 DefaultMQProducerImpl 中的send

[DefaultMQProducerImpl.java]
public SendResult send(Message msg,InterruptedException {
    return this.sendDefaultImpl(msg,CommunicationMode.SYNC,timeout);
}
複製程式碼

呼叫了 sendDefaultImpl 方法,方法宣告及引數解釋如下

[DefaultMQProducerImpl.java]
private SendResult sendDefaultImpl(
    Message msg,// 訊息傳送實體
    final CommunicationMode communicationMode,// 傳送類別,列舉型別
    final SendCallback sendCallback,// 如果是非同步傳送方式,則需要實現SendCallback回撥
    final long timeout                          // 超時時間
) throws MQClientException,InterruptedException {
複製程式碼

CommunicationMode 表示傳送類別

public enum CommunicationMode {
    SYNC,// 同步傳送
    ASYNC,// 非同步傳送
    ONEWAY,// 直接傳送,不關心傳送結果
}
複製程式碼

我們詳細分析一下sendDefaultImpl方法邏輯:

    [DefaultMQProducerImpl.java]
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
複製程式碼

從當前的MQClientInstance中獲取broker地址,如果broker地址為空,則向NameServer查詢該Topic路由資訊,我們看一下findBrokerAddressInPublish方法

    [DefaultMQProducerImpl.java]
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // 從快取的topic路由表中獲取topic路由
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

        // 不存在則向NameServer發起查詢
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        // 路由表中存在路由資訊
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            // 返回路由資訊
            return topicPublishInfo;
        } else {
            // 從NameServer中獲取最新的路由資訊,更新路由表
            // 返回當前路由資訊
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
複製程式碼

獲取到路由表資訊後,開始進行傳送前的校驗等邏輯,預先定義一些變數供後續使用

if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
複製程式碼

獲取傳送總次數,傳送次數timesTotal是根據傳送類別決定的。

如果是同步傳送[CommunicationMode.SYNC],則傳送總次數== 1+重試次數(retryTimesWhenSendFailed);

如果是非同步傳送[CommunicationMode.ASYNC],則傳送總次數== 1;

        // 獲取傳送總次數
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
複製程式碼

選擇根據topic路由表及broker名稱,獲取一個messageQueue,本次傳送的佇列就是選取的佇列,關於選取佇列的方法selectOneMessageQueue,我們馬上就展開看下細節

            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo,lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    // 計算一下傳送消耗的時間
                    long costTime = beginTimestampPrev - beginTimestampFirst;
複製程式碼

我們看一下selectOneMessageQueue方法是如何進行佇列的選擇的:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerName) {
    
    // 如果啟用了broker故障延遲機制
    if (this.sendLatencyFaultEnable) {
        try {
            // 本次需要傳送的佇列的index就是SendWhichQueue自增得到的
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index與當前路由表中的對列總個數取模
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // 獲取到當前對應的待傳送佇列
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            // 至少選擇一個broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // 獲取broker中的可寫佇列數
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // 如果可寫佇列數>0,則選取一個佇列
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // 可寫佇列數 <= 0 移除該broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue",e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
複製程式碼

這段程式碼的核心就是進行佇列的選取,選取的過程中伴隨著故障檢測,對於故障broker能夠做到儘可能規避。

我們回到訊息傳送邏輯sendDefaultImpl中,在每一次傳送過程中,計算本次傳送的實際消耗時間,並與傳送端設定的傳送超時時間做比較。

如果設定的超時時間timeout小於實際消耗的時間,說明傳送超時,程式碼如下

                    if (timeout < costTime) {
                        callTimeout = true;
                        // 傳送超時結束本次迴圈
                        break;
                    }
複製程式碼

進行真正的訊息傳送流程,呼叫sendKernelImpl方法,程式碼如下。關於sendKernelImpl邏輯在後文會展開論述。

                    sendResult = this.sendKernelImpl(msg,mq,communicationMode,sendCallback,topicPublishInfo,timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(),endTimestamp - beginTimestampPrev,false);
複製程式碼

根據傳送型別進行邏輯執行

                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
複製程式碼

這段程式碼較好理解,如果是非同步方式,直接返回sendResult為null,真實的傳送結果是在回撥SendCallback中獲取的;如果是ONEWAY方式,則根本不關心傳送結果;

如果是同步方式,判斷髮送結果是否為 SendStatus.SEND_OK,執行邏輯isRetryAnotherBrokerWhenNotStoreOK,這裡是訊息傳送失敗的重試邏輯:

如果訊息未持久化重試下一個broker成功,則跳出本次迴圈,繼續下次重試。

此處省略異常處理邏輯,感興趣的可以自行檢視原始碼。

        if (sendResult != null) {
            return sendResult;
        }
複製程式碼

如果獲取到傳送結果sendResult不為空,則返回該傳送結果供業務側進行處理。

核心傳送流程之DefaultMQProducerImpl.sendKernelImpl方法

我們重點來研究一下sendKernelImpl方法,它是訊息傳送的出口,也是真正發起訊息傳送呼叫的邏輯。

方法宣告如下:

private SendResult sendKernelImpl(
                    // 待傳送的訊息
                    final Message msg,// 訊息待傳送的佇列,該佇列是通過selectOneMessageQueue選擇的
                    final MessageQueue mq,// 訊息傳送模式
                    final CommunicationMode communicationMode,// 如果是非同步傳送,則需要實現SendCallback
                    final SendCallback sendCallback,// topic對應的路由資訊表
                    final TopicPublishInfo topicPublishInfo,// 傳送超時時間,由客戶端指定
                    final long timeout) throws MQClientException,InterruptedException {
複製程式碼

獲取傳送真實開始時間以及brokerAddr,這裡的邏輯與sendDefaultImpl的完全一致不再贅述,之所以再呼叫一次的原因,應當是為了準確性,時間就不說了;可用的brokerAddr列表是的動態拉取的,應當獲取當前最新的brokerAddr。

    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
複製程式碼

根據broker地址計算得到VIP通道地址,計算方法為ip+(預設埠號-2)

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(),brokerAddr);     
        // 獲取訊息體byte陣列
        byte[] prevBody = msg.getBody();
複製程式碼

接著對訊息進行前置處理,為訊息分配全域性唯一Id,對於批量訊息,它的全域性唯一id是單獨生成的,後面細說。

        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
複製程式碼

判斷是否為事務訊息

    // 獲取訊息屬性,key=PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    // 判斷是否為事務訊息
    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

    // 如果是事務訊息,通過sysFlag與TRANSACTION_PREPARED_TYPE按位或,計算最新的sysFlag
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
複製程式碼

如果傳送時註冊了傳送鉤子方法,則先執行該傳送鉤子邏輯進行前置增強,這種方式類似於切面的邏輯。

            if (this.hasSendMessageHook()) {

                // 設定訊息傳送上下文
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());

                // 如果是事務訊息,則上下文中標記訊息型別為事務半訊息Trans_Msg_Half
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
複製程式碼

關於事務訊息的傳送後續會單獨發文進行分析,此處不展開

                // 如果是延時訊息,則標記訊息型別為延時訊息Delay_Msg
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }

                // 執行傳送前置鉤子方法
                this.executeSendMessageHookBefore(context);
            }
複製程式碼

執行完傳送前置的鉤子方法之後,開始正式執行傳送邏輯,首先對訊息傳送請求頭進行例項化。

            // 宣告並初始化訊息傳送請求頭
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

            // 設定請求頭引數:傳送者組
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            // 設定請求頭引數:topic
            requestHeader.setTopic(msg.getTopic());
            // 設定預設topic,其實就是MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC=TBW102,如果開啟了自動建立topic,則會建立該topic
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            // 預設topic對應的訊息佇列數量
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            // 當前要傳送的訊息對應的佇列id
            requestHeader.setQueueId(mq.getQueueId());
            // 系統標識,前面邏輯計算得到
            requestHeader.setSysFlag(sysFlag);
            // 訊息誕生時間,系統當前時間
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            // 訊息flag
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 由於是傳送訊息,所以設定為0
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            // 是否為批量訊息
            requestHeader.setBatch(msg instanceof MessageBatch);
複製程式碼

如果當前訊息的topic以MixAll.RETRY_GROUP_TOPIC_PREFIX開頭,

RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

表明當前topic實際上是topic對應的重試topic,則執行訊息重試傳送相關的邏輯

            // 如果當前訊息topic為重試topic
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                // 獲取重試次數
                // 重試次數不為null則清除重試次數
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg,MessageConst.PROPERTY_RECONSUME_TIME);
                }

                // 獲取最大重試次數
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg,MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
複製程式碼

根據真實的傳送型別選擇對應的訊息傳送方式:

首先來看一下傳送方式為:ASYNC(非同步傳送方式)的傳送邏輯

             switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    // 如果訊息body是壓縮的,則使用prevBody,prevBody就是真實的msgBody對應的byte[]
                    if (msgBodyCompressed) {
                        //If msg body was compressed,msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        // 將壓縮的訊息體恢復為原訊息體
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),this.defaultMQProducer.getNamespace()));
                    }


                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
複製程式碼

呼叫MQClientInstance的getMQClientAPIImpl.sendMessage方法進行網路通訊,並獲取傳送結果

                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);
                    break;
複製程式碼

MQClientAPIImpl.sendMessage

我們直接看一下MQClientAPIImpl.sendMessage邏輯是如何處理非同步訊息傳送的

public SendResult sendMessage(
    final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer
) throws RemotingException,InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;

    // 如果訊息是sendSmartMsg(org.apache.rocketmq.client.sendSmartMsg==true)
    // 或者是批量訊息
    if (sendSmartMsg || msg instanceof MessageBatch) {

        // 更換髮送請求頭
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2,requestHeaderV2);
    } else {

        // 如果訊息是非批量傳送
        // 設定訊息傳送命令為RequestCode.SEND_MESSAGE
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE,requestHeader);
    }

    // 設定傳送請求body為訊息的msgBody
    request.setBody(msg.getBody());

    switch (communicationMode) {
        // 如果是ONEWAY方式,發出去不關心結果
        case ONEWAY:
            this.remotingClient.invokeOneway(addr,request,timeoutMillis);
            return null;

        // 如果是非同步方式,判斷是否傳送超時  
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // 呼叫非同步訊息傳送方法
            this.sendMessageAsync(addr,brokerName,msg,timeoutMillis - costTimeAsync,instance,retryTimesWhenSendFailed,times,producer);
            return null;

        // 如果是同步傳送,呼叫同步方法方法    
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr,timeoutMillis - costTimeSync,request);
        default:
            assert false;
            break;
    }
    return null;
}
複製程式碼

好像還沒結束,那麼我們就分別看一下非同步方式和同步方式對應的傳送方法。

非同步傳送方法 sendMessageAsync

    ...方法宣告省略,實在是太長了...
    // 非同步方式呼叫傳送邏輯
    this.remotingClient.invokeAsync(addr,timeoutMillis,// 傳送回撥的真實邏輯
    new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();

            // 如果業務傳送方沒有實現sendCallback,但是有介面呼叫返回值response
            if (null == sendCallback && response != null) {

                try {

                    // 傳送返回值sendResult為processSendResponse處理得到的
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName,response);

                    // 重新整理訊息傳送上下文,執行傳送後鉤子方法
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                // 更新故障broker
                producer.updateFaultItem(brokerName,System.currentTimeMillis() - responseFuture.getBeginTimestamp(),false);
                return;
            }

            // 對於實現了sendCallback的傳送端
            if (response != null) {
                try {

                    // 獲取sendResult
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName,response);
                    assert sendResult != null;

                    // 執行傳送後鉤子方法
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    // 回撥傳送成功回撥方法onSuccess
                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName,false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName,true);

                    // 對於處理異常的情況,傳入sendCallback,回撥其傳送
                    // 失敗回撥方法onException(e)
                    onExceptionImpl(brokerName,0L,e,false,producer);
                }
                ...省略其他異常流程,大同小異...
        }
    });
}
複製程式碼

為了方便大家理解,這裡對invokeAsync非同步處理邏輯做一個小結:

  1. 首先判斷介面引數中是否存在sendCallback;
    1. 如果有且非空,則在取得傳送結果sendResult之後回撥sendCallback的onSuccess方法,以便傳送方對傳送結果做進一步的處理
    2. 如果sendCallback不存在,則直接解析傳送結果,按照同步傳送方式進行處理
    3. 最後,如果存在傳送上下文context,則執行傳送後鉤子方法
  2. 對於存在sendCallback,但傳送異常的情況,回撥sendCallback的onException方法進行異常處理。
  3. 對於異常的broker節點進行更新操作

同步傳送方法 sendMessageSync

private SendResult sendMessageSync(
    final String addr,final RemotingCommand request
) throws RemotingException,InterruptedException {
    // 0.執行同步傳送邏輯
    RemotingCommand response = this.remotingClient.invokeSync(addr,timeoutMillis);
    // 1.校驗返回參 斷言
    assert response != null;
    // 2.處理髮送結果
    return this.processSendResponse(brokerName,response);
}
複製程式碼

關於處理髮送結果方法processSendResponse的分析我們放到批量訊息傳送的部分講解

回到DefaultMQProducerImpl.sendKernelImpl

講完了非同步傳送方式及下方的呼叫邏輯,我們回到sendKernelImpl中,繼續看其他的傳送方式。

                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,timeout - costTimeSync,this);
                    break;
                default:
                    assert false;
                    break;
            }
複製程式碼

對於ONEWAY、同步方式,處理邏輯相同,都是直接呼叫 MQClientAPIImpl.sendMessage 這個方法的邏輯,該方法我們已經在上文中解釋過,就不再贅述了,讀者可以通過 MQClientAPIImpl.sendMessage 三級標題自行去檢視。

對於同步方式執行sendMessageSync方法,該方法在上文中已經講解過;對於oneway方式執行invokeOneway方法。

invokeOneWay的真實邏輯在NettyRemotingClient.java中實現,NettyRemotingClient封裝了底層的網路互動,關於它的其他內容後續會在網路通訊部分的解析文章中展開。

@Override
public void invokeOneway(String addr,RemotingCommand request,long timeoutMillis) throws InterruptedException,RemotingConnectException,RemotingTooMuchRequestException,RemotingTimeoutException,RemotingSendRequestException {
    // 根據broker地址建立NIO的通訊channel    
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 執行傳送前置鉤子方法
            doBeforeRpcHooks(addr,request);
            // 執行真實的網路呼叫,不關心傳送結果
            this.invokeOnewayImpl(channel,timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception,so close the channel[{}]",addr);
            this.closeChannel(addr,channel);
            throw e;
        }
    } else {
        this.closeChannel(addr,channel);
        throw new RemotingConnectException(addr);
    }
}
複製程式碼

OneWay傳送方式執行完網路通訊之後不關注返回結果,因此適用於對返回值不敏感的流程中,比如日誌上報、埋點上報等業務中。

我們繼續回到DefaultMQProducerImpl.sendKernelImpl方法中.

            // 如果註冊了傳送後的鉤子函式
            // 執行該鉤子函式
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
複製程式碼

這段程式碼發生在傳送邏輯之後,不論是何種傳送型別,如果包含了傳送訊息的鉤子方法,則將傳送結果sendResult設定到傳送訊息上下文context中(對於sendOneWay方式,返回的sendResult為null)。然後執行傳送訊息後的鉤子方法sendMessageAfter,邏輯如下:

public void executeSendMessageHookAfter(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookAfter",e);
            }
        }
    }
}
複製程式碼

鉤子方法的註冊是通過 DefaultMQProducerImpl.registerSendMessageHook 方法註冊的,可以註冊多個,為一個list。因此上述executeSendMessageHookAfter方法中為對該list的遍歷,每輪遍歷中執行該SendMessageHook的sendMessageAfter方法。

小結

本文是原始碼解析的第二篇文章,也是屬於偏硬核的一類文章,如果你能堅持讀到這裡,請給自己一個鼓勵,你已經強過很多人了。

筆者對RocketMQ的研究程度尚淺,因此難免出現紕漏,筆者會再接再勵。關於批量訊息傳送、事務訊息傳送等邏輯的分析,在接下來的文章將會陸續進行展開,我們不見不散。


版權宣告:
原創不易,洗文可恥。除非註明,本博文章均為原創,轉載請以連結形式標明本文地址。