1. 程式人生 > 實用技巧 >RocketMQ(四)producer生產訊息原始碼剖析

RocketMQ(四)producer生產訊息原始碼剖析

一、Demo

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Description:
 *
 * @author TongWei.Chen 2020-06-21 11:32:58
 */
public class ProducerDemo {
    public static void main(String[] args) throws
Exception { DefaultMQProducer producer = new DefaultMQProducer("my-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); Message msg = new Message("myTopic001", "hello world".getBytes()); SendResult result = producer.send(msg); System.out.println(
"傳送訊息成功!result is : " + result); } }

二、原始碼剖析

1、準備工作

1.1、new DefaultMQProducer()

public DefaultMQProducer(final String producerGroup) {
    this(null, producerGroup, null);
}

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    // null
    this.namespace = namespace;
    
// my-producer this.producerGroup = producerGroup; // new DefaultMQProducerImpl(this, null); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); }
  • 給producerGroup賦值
  • new DefaultMQProducerImpl()

1.2、setNamesrvAddr()

/**
 * {@link org.apache.rocketmq.client.ClientConfig}
 */
public void setNamesrvAddr(String namesrvAddr) {
    this.namesrvAddr = namesrvAddr;
}
  • 給namesrv賦值

2、啟動

2.1、start()

@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

2.1.1、defaultMQProducerImpl.start()

private ServiceState serviceState = ServiceState.CREATE_JUST;

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 預設為CREATE_JUST狀態
        case CREATE_JUST:
            //  先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING
            this.serviceState = ServiceState.START_FAILED;
   /**
    * 檢查配置,比如group有沒有寫,是不是預設的那個名字,長度是不是超出限制了,等等一系列驗證。
    */
            this.checkConfig();
   /*
             * 單例模式,獲取MQClientInstance物件,客戶端例項。也就是Producer所部署的機器例項物件,負責操作的主要物件。
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
   /**
    * 註冊producer,其實就是往producerTable map裡仍key-value
    * private final ConcurrentMap<String, MQProducerInner> producerTable = 
    * new ConcurrentHashMap<String, MQProducerInner>();
    * producerTable.putIfAbsent("my-producer", DefaultMQProducerImpl);
    */
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }
   // 將topic資訊存到topicPublishInfoTable這個map裡
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
   
            if (startFactory) {
                // 真正的啟動核心類
                mQClientFactory.start();
            }
   // 都啟動完成,沒報錯的話,就將狀態改為執行中
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // 每隔1s掃描過期的請求
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

2.1.2、mQClientFactory.start()

public void start() throws MQClientException {
    synchronized (this) {
        // 預設為CREATE_JUST狀態
        switch (this.serviceState) {
            case CREATE_JUST:
                // 先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING
                this.serviceState = ServiceState.START_FAILED;
                
                // 啟動請求響應通道,核心netty
                this.mQClientAPIImpl.start();
                /**
                 * 啟動各種定時任務
                 * 1.每隔2分鐘去檢測namesrv的變化
                 * 2.每隔30s從nameserver獲取topic的路由資訊有沒有發生變化,或者說有沒有新的topic路由資訊
                 * 3.每隔30s清除下線的broker
                 * 4.每隔5s持久化所有的消費進度
                 * 5.每隔1分鐘檢測執行緒池大小是否需要調整
                 */
                this.startScheduledTask();
                // 啟動拉取訊息服務
                this.pullMessageService.start();
                // 啟動Rebalance負載均衡服務
                this.rebalanceService.start();
                /**
                 * 這裡再次呼叫了DefaultMQProducerImpl().start()方法,這TM不死迴圈了嗎?
                 * 不會的,因為他傳遞了false,false再DefaultMQProducerImpl().start()方法裡不會再次呼叫mQClientFactory.start();
                 * 但是這也重複執行了兩次DefaultMQProducerImpl().start()方法裡的其他邏輯,不知道為啥這麼搞,沒看懂。
                 */
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                // 都啟動完成,沒報錯的話,就將狀態改為執行中
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

2.2、總結

  • 啟動例項MQClientAPIImpl,這裡封裝了客戶端與Broker進行通訊的方法。
  • 啟動各種定時任務,與Broker之間的心跳等等。
  • 啟動訊息拉取服務。
  • 啟動負載均衡服務。
  • 啟動預設的Producer服務(重複啟動了,因為客戶端一開始就啟動了這個)。

3、傳送訊息

3.1、new Message

public Message(String topic, byte[] body) {
    this(topic, "", "", 0, body, true);
}

public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
    this.topic = topic;
    this.flag = flag;
    this.body = body;
    if (tags != null && tags.length() > 0)
        this.setTags(tags);
    if (keys != null && keys.length() > 0)
        this.setKeys(keys);
    this.setWaitStoreMsgOK(waitStoreMsgOK);
}
  • 拼湊訊息體:訊息內容、tag、keys、topic等。

3.2、producer.send(msg)

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 引數校驗
    Validators.checkMessage(msg, this);
    // 設定topic
    msg.setTopic(withNamespace(msg.getTopic()));
    // 傳送訊息
    return this.defaultMQProducerImpl.send(msg);
}

3.2.1、sendDefaultImpl

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 檢查Producer上是否是RUNNING狀態
    this.makeSureStateOK();
    // 訊息格式的校驗
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 嘗試獲取topic的路由資訊
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        // 選擇訊息要傳送的佇列
        MessageQueue mq = null;
        // 傳送結果
        SendResult sendResult = null;
        // 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()預設為2,如果是同步傳送,預設重試3,否則重試1次
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        for (; times < timesTotal; times++) {
            // 選擇topic的一個queue,然後往這個queue裡發訊息。
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                try {
                    // 真正的發訊息方法
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
      // 同步的,將返回的結果返回,如果返回結果狀態不是成功的,則continue,進入下一次迴圈進行重試。    
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    continue;
                } catch (MQClientException e) {
                    continue;
                } catch (...) {...}
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }
    }
}

4、面試題

面試官:RocketMQ是如何發訊息的?

回答者:首先需要配置好生產者組名、namesrv地址和topic以及要傳送的訊息內容,然後啟動Producer的start()方法,啟動完成後呼叫send()方法進行傳送。

start()方法內部會進行檢查namesrv、生產者組名等引數驗證,然後內部會獲取一個mQClientFactory物件,此物件內包含了所有與Broker進行通訊的api,然後通過mQClientFactory啟動請求響應通道,主要是netty,接下來啟動一些定時任務,比如與broker的心跳等,還會啟動負載均衡服務等,最後都啟動成功的話將服務的狀態標記為RUNNING。

啟動完成後呼叫send()方法發訊息,有三種傳送方式,同步、非同步、oneWay,都大同小異,唯一的區別的就是非同步的多個執行緒池去非同步呼叫傳送請求,而同步則是當前請求執行緒直接同步呼叫的,核心流程都是:

先選擇一個合適的queue來儲存訊息,選擇完後拼湊一個header引數物件,通過netty的形式傳送給broker。

這裡值得注意的是:如果傳送失敗的話他會自動重試,默認同步傳送的次數是3次,也就是失敗後會自動重試2次。

5、設計模式

5.1、單例模式

2.1.1、defaultMQProducerImpl.start()這部分已經提到了,如下程式碼

/*
 * 單例模式,獲取MQClientInstance物件,客戶端例項。也就是Producer所部署的機器例項物件,負責操作的主要物件。
 */
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

深究下看看是哪種單例:

public class MQClientManager {
    // 直接new
    private static MQClientManager instance = new MQClientManager();
    // 私有構造器
    private MQClientManager() {
    }
    // getInstance
    public static MQClientManager getInstance() {
        return instance;
    }
}

嗯...好吧,是個簡單粗暴的餓漢式。

5.2、狀態模式

2.1.1、defaultMQProducerImpl.start()還是這個,這裡面大量的switch..case,其實這是個不規範的狀態模式。先看下狀態模式的定義:

狀態模式允許一個物件在其內部狀態改變時改變它的行為,物件看起來就像是改變了它的類。

再去分析上段程式碼,不就是使用一個成員變數 serviceState 來記錄和管理自身的服務狀態嗎?只是與標準的狀態模式不同的是它沒有使用狀態子類,而是使用switch-case來實現不同狀態下的不同行為的。

5.3、門面模式

先來看下門面模式定義:

門面模式主要的作用是給客戶端提供了一個可以訪問系統的介面,隱藏系統內部的複雜性。

再來看RocketMQ的Producer,典型的是這種:提供了一個可以訪問系統的介面,隱藏系統內部的複雜性的特性。

程式碼層面的話很明顯,我們new的是DefaultMQProducer物件,但內部實際操作的確都是DefaultMQProducerImpl物件。比如原始碼中的start和send方法都是

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
 */
public void start() throws MQClientException {
    // DefaultMQProducerImpl的start
    this.defaultMQProducerImpl.start();
}

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
 */
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // DefaultMQProducerImpl的send
    return this.defaultMQProducerImpl.send(msg);
}

5.4、總結

吸取大牛寫程式碼的經驗,這種模式再開發需求中都能用到。學原始碼為了什麼?

1:面試吹牛逼

2:學習優秀的程式碼設計思想

6、時序圖

publicvoidstart()throwsMQClientException{
synchronized(this){
//預設為CREATE_JUST狀態
switch(this.serviceState){
caseCREATE_JUST:
//先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING
this.serviceState=ServiceState.START_FAILED;

//啟動請求響應通道,核心netty
this.mQClientAPIImpl.start();
/**
*啟動各種定時任務
*1.每隔2分鐘去檢測namesrv的變化
*2.每隔30s從nameserver獲取topic的路由資訊有沒有發生變化,或者說有沒有新的topic路由資訊
*3.每隔30s清除下線的broker
*4.每隔5s持久化所有的消費進度
*5.每隔1分鐘檢測執行緒池大小是否需要調整
*/

this.startScheduledTask();
//啟動拉取訊息服務
this.pullMessageService.start();
//啟動Rebalance負載均衡服務
this.rebalanceService.start();
/**
*這裡再次呼叫了DefaultMQProducerImpl().start()方法,這TM不死迴圈了嗎?
*不會的,因為他傳遞了false,false再DefaultMQProducerImpl().start()方法裡不會再次呼叫mQClientFactory.start();
*但是這也重複執行了兩次DefaultMQProducerImpl().start()方法裡的其他邏輯,不知道為啥這麼搞,沒看懂。
*/

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
//都啟動完成,沒報錯的話,就將狀態改為執行中
this.serviceState=ServiceState.RUNNING;
break;
caseSTART_FAILED:
thrownewMQClientException("TheFactoryobject["+this.getClientId()+"]hasbeencreatedbefore,andfailed.",null);
default:
break;
}
}
}