RocketMQ(四)producer生產訊息原始碼剖析
一、Demo
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * Description: * * @author TongWei.Chen 2020-06-21 11:32:58 */ public class ProducerDemo { public static void main(String[] args) throwsException { DefaultMQProducer producer = new DefaultMQProducer("my-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); Message msg = new Message("myTopic001", "hello world".getBytes()); SendResult result = producer.send(msg); System.out.println("傳送訊息成功!result is : " + result); } }
二、原始碼剖析
1、準備工作
1.1、new DefaultMQProducer()
public DefaultMQProducer(final String producerGroup) { this(null, producerGroup, null); } public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { // null this.namespace = namespace;// my-producer this.producerGroup = producerGroup; // new DefaultMQProducerImpl(this, null); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); }
- 給producerGroup賦值
- new DefaultMQProducerImpl()
1.2、setNamesrvAddr()
/** * {@link org.apache.rocketmq.client.ClientConfig} */ public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; }
- 給namesrv賦值
2、啟動
2.1、start()
@Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); }
2.1.1、defaultMQProducerImpl.start()
private ServiceState serviceState = ServiceState.CREATE_JUST; public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { // 預設為CREATE_JUST狀態 case CREATE_JUST: // 先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING this.serviceState = ServiceState.START_FAILED; /** * 檢查配置,比如group有沒有寫,是不是預設的那個名字,長度是不是超出限制了,等等一系列驗證。 */ this.checkConfig(); /* * 單例模式,獲取MQClientInstance物件,客戶端例項。也就是Producer所部署的機器例項物件,負責操作的主要物件。 */ this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); /** * 註冊producer,其實就是往producerTable map裡仍key-value * private final ConcurrentMap<String, MQProducerInner> producerTable = * new ConcurrentHashMap<String, MQProducerInner>(); * producerTable.putIfAbsent("my-producer", DefaultMQProducerImpl); */ boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 將topic資訊存到topicPublishInfoTable這個map裡 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { // 真正的啟動核心類 mQClientFactory.start(); } // 都啟動完成,沒報錯的話,就將狀態改為執行中 this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 每隔1s掃描過期的請求 RequestFutureTable.scanExpiredRequest(); } catch (Throwable e) { log.error("scan RequestFutureTable exception", e); } } }, 1000 * 3, 1000); }
2.1.2、mQClientFactory.start()
public void start() throws MQClientException { synchronized (this) { // 預設為CREATE_JUST狀態 switch (this.serviceState) { case CREATE_JUST: // 先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING this.serviceState = ServiceState.START_FAILED; // 啟動請求響應通道,核心netty this.mQClientAPIImpl.start(); /** * 啟動各種定時任務 * 1.每隔2分鐘去檢測namesrv的變化 * 2.每隔30s從nameserver獲取topic的路由資訊有沒有發生變化,或者說有沒有新的topic路由資訊 * 3.每隔30s清除下線的broker * 4.每隔5s持久化所有的消費進度 * 5.每隔1分鐘檢測執行緒池大小是否需要調整 */ this.startScheduledTask(); // 啟動拉取訊息服務 this.pullMessageService.start(); // 啟動Rebalance負載均衡服務 this.rebalanceService.start(); /** * 這裡再次呼叫了DefaultMQProducerImpl().start()方法,這TM不死迴圈了嗎? * 不會的,因為他傳遞了false,false再DefaultMQProducerImpl().start()方法裡不會再次呼叫mQClientFactory.start(); * 但是這也重複執行了兩次DefaultMQProducerImpl().start()方法裡的其他邏輯,不知道為啥這麼搞,沒看懂。 */ this.defaultMQProducer.getDefaultMQProducerImpl().start(false); // 都啟動完成,沒報錯的話,就將狀態改為執行中 this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
2.2、總結
- 啟動例項MQClientAPIImpl,這裡封裝了客戶端與Broker進行通訊的方法。
- 啟動各種定時任務,與Broker之間的心跳等等。
- 啟動訊息拉取服務。
- 啟動負載均衡服務。
- 啟動預設的Producer服務(重複啟動了,因為客戶端一開始就啟動了這個)。
3、傳送訊息
3.1、new Message
public Message(String topic, byte[] body) { this(topic, "", "", 0, body, true); } public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { this.topic = topic; this.flag = flag; this.body = body; if (tags != null && tags.length() > 0) this.setTags(tags); if (keys != null && keys.length() > 0) this.setKeys(keys); this.setWaitStoreMsgOK(waitStoreMsgOK); }
- 拼湊訊息體:訊息內容、tag、keys、topic等。
3.2、producer.send(msg)
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 引數校驗 Validators.checkMessage(msg, this); // 設定topic msg.setTopic(withNamespace(msg.getTopic())); // 傳送訊息 return this.defaultMQProducerImpl.send(msg); }
3.2.1、sendDefaultImpl
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 檢查Producer上是否是RUNNING狀態 this.makeSureStateOK(); // 訊息格式的校驗 Validators.checkMessage(msg, this.defaultMQProducer); // 嘗試獲取topic的路由資訊 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { // 選擇訊息要傳送的佇列 MessageQueue mq = null; // 傳送結果 SendResult sendResult = null; // 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()預設為2,如果是同步傳送,預設重試3,否則重試1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; for (; times < timesTotal; times++) { // 選擇topic的一個queue,然後往這個queue裡發訊息。 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; try { // 真正的發訊息方法 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; // 同步的,將返回的結果返回,如果返回結果狀態不是成功的,則continue,進入下一次迴圈進行重試。 case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { continue; } catch (MQClientException e) { continue; } catch (...) {...} } else { break; } } if (sendResult != null) { return sendResult; } } }
4、面試題
面試官:RocketMQ是如何發訊息的?
回答者:首先需要配置好生產者組名、namesrv地址和topic以及要傳送的訊息內容,然後啟動Producer的start()方法,啟動完成後呼叫send()方法進行傳送。
start()方法內部會進行檢查namesrv、生產者組名等引數驗證,然後內部會獲取一個mQClientFactory物件,此物件內包含了所有與Broker進行通訊的api,然後通過mQClientFactory啟動請求響應通道,主要是netty,接下來啟動一些定時任務,比如與broker的心跳等,還會啟動負載均衡服務等,最後都啟動成功的話將服務的狀態標記為RUNNING。
啟動完成後呼叫send()方法發訊息,有三種傳送方式,同步、非同步、oneWay,都大同小異,唯一的區別的就是非同步的多個執行緒池去非同步呼叫傳送請求,而同步則是當前請求執行緒直接同步呼叫的,核心流程都是:
先選擇一個合適的queue來儲存訊息,選擇完後拼湊一個header引數物件,通過netty的形式傳送給broker。
這裡值得注意的是:如果傳送失敗的話他會自動重試,默認同步傳送的次數是3次,也就是失敗後會自動重試2次。
5、設計模式
5.1、單例模式
2.1.1、defaultMQProducerImpl.start()
這部分已經提到了,如下程式碼
/* * 單例模式,獲取MQClientInstance物件,客戶端例項。也就是Producer所部署的機器例項物件,負責操作的主要物件。 */ this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
深究下看看是哪種單例:
public class MQClientManager { // 直接new private static MQClientManager instance = new MQClientManager(); // 私有構造器 private MQClientManager() { } // getInstance public static MQClientManager getInstance() { return instance; } }
嗯...好吧,是個簡單粗暴的餓漢式。
5.2、狀態模式
2.1.1、defaultMQProducerImpl.start()
還是這個,這裡面大量的switch..case
,其實這是個不規範的狀態模式。先看下狀態模式的定義:
狀態模式允許一個物件在其內部狀態改變時改變它的行為,物件看起來就像是改變了它的類。
再去分析上段程式碼,不就是使用一個成員變數 serviceState 來記錄和管理自身的服務狀態嗎?只是與標準的狀態模式不同的是它沒有使用狀態子類,而是使用switch-case
來實現不同狀態下的不同行為的。
5.3、門面模式
先來看下門面模式定義:
門面模式主要的作用是給客戶端提供了一個可以訪問系統的介面,隱藏系統內部的複雜性。
再來看RocketMQ的Producer,典型的是這種:提供了一個可以訪問系統的介面,隱藏系統內部的複雜性的特性。
程式碼層面的話很明顯,我們new的是DefaultMQProducer
物件,但內部實際操作的確都是DefaultMQProducerImpl
物件。比如原始碼中的start和send方法都是
/** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer} */ public void start() throws MQClientException { // DefaultMQProducerImpl的start this.defaultMQProducerImpl.start(); } /** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer} */ public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // DefaultMQProducerImpl的send return this.defaultMQProducerImpl.send(msg); }
5.4、總結
吸取大牛寫程式碼的經驗,這種模式再開發需求中都能用到。學原始碼為了什麼?
1:面試吹牛逼
2:學習優秀的程式碼設計思想
6、時序圖
publicvoidstart()throwsMQClientException{
synchronized(this){
//預設為CREATE_JUST狀態
switch(this.serviceState){
caseCREATE_JUST:
//先預設成啟動失敗,等最後完全啟動成功的時候再置為ServiceState.RUNNING
this.serviceState=ServiceState.START_FAILED;
//啟動請求響應通道,核心netty
this.mQClientAPIImpl.start();
/**
*啟動各種定時任務
*1.每隔2分鐘去檢測namesrv的變化
*2.每隔30s從nameserver獲取topic的路由資訊有沒有發生變化,或者說有沒有新的topic路由資訊
*3.每隔30s清除下線的broker
*4.每隔5s持久化所有的消費進度
*5.每隔1分鐘檢測執行緒池大小是否需要調整
*/
this.startScheduledTask();
//啟動拉取訊息服務
this.pullMessageService.start();
//啟動Rebalance負載均衡服務
this.rebalanceService.start();
/**
*這裡再次呼叫了DefaultMQProducerImpl().start()方法,這TM不死迴圈了嗎?
*不會的,因為他傳遞了false,false再DefaultMQProducerImpl().start()方法裡不會再次呼叫mQClientFactory.start();
*但是這也重複執行了兩次DefaultMQProducerImpl().start()方法裡的其他邏輯,不知道為啥這麼搞,沒看懂。
*/
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
//都啟動完成,沒報錯的話,就將狀態改為執行中
this.serviceState=ServiceState.RUNNING;
break;
caseSTART_FAILED:
thrownewMQClientException("TheFactoryobject["+this.getClientId()+"]hasbeencreatedbefore,andfailed.",null);
default:
break;
}
}
}