跟我學RocketMQ之訊息消費原始碼解析(1)
本文我們接著分析一下RocektMQ實現訊息消費的原始碼細節,這部分的內容較多,因此拆分為幾個章節分別進行講解。
本章節重點講解DefaultMQPushConsumer的程式碼邏輯。
DefaultMQPushConsumer使用樣例
按照慣例還是先看一下DefaultMQPushConsumer的使用樣例。
@PostConstruct
public void init() {
defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 從頭開始消費
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消費模式:叢集模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 註冊監聽器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 訂閱所有訊息
try {
defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC","*");
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("[訂單結果通知訊息消費者]--NotifySendConsumer載入異常!",e);
}
LOGGER.info("[訂單結果通知訊息消費者]--NotifySendConsumer載入完成!");
}複製程式碼
初始化過程中需要呼叫registerMessageListener將具體的消費實現Listener注入。
@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {複製程式碼
...省略部分程式碼...複製程式碼
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {複製程式碼
try {
for (MessageExt msg : msgs) {
// 訊息解碼
String message = new String(msg.getBody());
// 消費次數
int reconsumeTimes = msg.getReconsumeTimes();
String msgId = msg.getMsgId();
String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;複製程式碼
LOGGER.info("[通知傳送訊息消費者]-OrderNotifySendProducer-接收到訊息,message={},{}",message,logSuffix);
// 請求組裝
OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
protocol.decode(message);
// 引數加簽,獲取使用者privatekey
String privateKey = protocol.getPrivateKey();
String notifyUrl = protocol.getMerchantNotifyUrl();
String purseId = protocol.getPurseId();
ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
.setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
.setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
.setPlat_orderid(protocol.getOrderId())
.setSign(chargeNotifyRequest.sign(privateKey));
LOGGER.info("[通知傳送訊息消費者]-OrderNotifySendProducer-訂單結果通知入參:{},chargeNotifyRequest.toString(),logSuffix);
// 通知傳送
return sendNotifyByPost(reconsumeTimes,logSuffix,protocol,notifyUrl,purseId,chargeNotifyRequest);
}
} catch (Exception e) {
LOGGER.error("[通知傳送訊息消費者]消費異常,e={}",LogExceptionWapper.getStackTrace(e));
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}複製程式碼
上面就是一個較為標準的在spring框架中使用RocektMQ的DefaultMQPushConsumer進行消費的主流程。
接下來我們重點分析一下原始碼實現。
初始化DefaultMQPushConsumer
首先看一下DefaultMQPushConsumer的初始化過程。
進入DefaultMQPushConsumer.java類,檢視構造方法:
public DefaultMQPushConsumer(final String consumerGroup) {
this(null,consumerGroup,null,new AllocateMessageQueueAveragely());
}複製程式碼
呼叫了它的同名構造,採用AllocateMessageQueueAveragely策略(平均雜湊佇列演演算法)
public DefaultMQPushConsumer(final String namespace,final String consumerGroup,RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,rpcHook);
}複製程式碼
可以看到實際初始化是通過DefaultMQPushConsumerImpl實現的,DefaultMQPushConsumer持有一個defaultMQPushConsumerImpl的引用。
[DefaultMQPushConsumerImpl.java]
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer,RPCHook rpcHook) {
// 初始化DefaultMQPushConsumerImpl,將defaultMQPushConsumer的實際引用傳入
this.defaultMQPushConsumer = defaultMQPushConsumer;
// 傳入rpcHook並指向本類的引用
this.rpcHook = rpcHook;
}
複製程式碼
註冊消費監聽MessageListener
我們接著看一下注冊消費監聽器的流程。
消費監聽介面MessageListener有兩個具體的實現,分別為
MessageListenerConcurrently -- 並行消費監聽
MessageListenerOrderly -- 順序消費監聽複製程式碼
本文以MessageListenerConcurrently為主要講解的物件。
檢視MessageListenerConcurrently的註冊過程。
@Override
public void registerMessageListener(
MessageListenerConcurrently messageListener) {
// 將實現指向本類引用
this.messageListener = messageListener;
// 進行真實註冊
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}複製程式碼
接著看defaultMQPushConsumerImpl.registerMessageListener
DefaultMQPushConsumerImpl.java
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}複製程式碼
可以看到DefaultMQPushConsumerImpl將真實的messageListener實現指向它本類的messageListener引用。
訂閱topic
接著看一下訂閱topic的主流程。
topic訂閱主要通過方法subscribe實現,首先看一下DefaultMQPushConsumer的subscribe實現
@Override
public void subscribe(String topic,String subExpression)
throws MQClientException {
this.defaultMQPushConsumerImpl
.subscribe(withNamespace(topic),subExpression);
}複製程式碼
可以看到是呼叫了DefaultMQPushConsumerImpl的subscribe方法。
public void subscribe(String topic,String subExpression) throws MQClientException {
try {
// 構建主題的訂閱資料,預設為叢集消費
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic,subExpression);
// 將topic的訂閱資料進行儲存
this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData);
if (this.mQClientFactory != null) {
// 如果MQClientInstance不為空,則向所有的broker傳送心跳包,加鎖
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception",e);
}
}複製程式碼
看一下buildSubscriptionData程式碼邏輯
[FilterAPI.java]
public static SubscriptionData buildSubscriptionData(final String consumerGroup,String topic,String subString) throws Exception {
// 構造一個SubscriptionData實體,設定topic、表示式(tag)
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);複製程式碼
// 如果tag為空或者為"*",統一設定為"*",即訂閱所有訊息
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// tag不為空,則先按照‘|’進行分割
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
// 遍歷tag表示式陣列
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// 將每個tag的值設定到tagSet中
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
// tag解析異常
throw new Exception("subString split error");
}
}
return subscriptionData;
}複製程式碼
看一下sendHeartbeatToAllBrokerWithLock程式碼邏輯
[MQClientInstance.java]
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 傳送心跳包
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception",e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat,but failed.");
}
}複製程式碼
可以看到,同步傳送心跳包給所有的broker,而該過程是通過RemotingClient統一實現的,通過呼叫RemotingClient.invokeSync實現心跳包的傳送,底層是通過Netty實現的。具體細節本文不進行展開。
啟動消費客戶端
上述初始化流程執行完畢之後,通過start()方法啟動消費客戶端。
@Override
public void start() throws MQClientException {
// 設定消費者組
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(),this.consumerGroup));
// 啟動消費客戶端
this.defaultMQPushConsumerImpl.start();
// trace處理邏輯
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(),this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ",e);
}
}
}複製程式碼
關於trace的處理邏輯,本文不再展開,感興趣的同學可以移步 跟我學RocketMQ之訊息軌跡實戰與原始碼分析
接著看defaultMQPushConsumerImpl.start()方法邏輯
[DefaultMQPushConsumerImpl.java]
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={},isUnitMode={}",this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(),this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;複製程式碼
首次啟動後,執行配置檢查,該方法為前置校驗方法,主要進行消費屬性校驗。
this.checkConfig();複製程式碼
將訂閱關係配置資訊進行復制
this.copySubscription();複製程式碼
如果當前為叢集消費模式,修改例項名為pid
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}複製程式碼
建立一個新的MQClientInstance例項,如果已經存在直接使用該存在的MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);複製程式碼
為消費者負載均衡實現rebalanceImpl設定屬性
// 設定消費者組
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 設定消費模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 設定佇列分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 設定當前的MQClientInstance例項
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製程式碼
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,isUnitMode());
// 註冊訊息過濾鉤子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);複製程式碼
處理offset儲存方式
// offsetStore不為空則使用當前的offsetStore方式
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 否則根據消費方式選擇具體的offsetStore方式儲存offset
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是廣播方式,則使用本地儲存方式
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
break;
// 如果是叢集方式,則使用遠端broker儲存方式儲存offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 載入當前的offset
this.offsetStore.load();複製程式碼
根據MessageListener的具體實現方式選取具體的訊息拉取執行緒實現。
// 如果是MessageListenerOrderly順序消費介面實現
// 訊息消費服務選擇:ConsumeMessageOrderlyService(順序訊息消費服務)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,(MessageListenerOrderly) this.getMessageListenerInner());
}
// 如果是MessageListenerConcurrently並行訊息消費介面實現
// 訊息消費服務選擇:ConsumeMessageConcurrentlyService(並行訊息消費服務)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently) this.getMessageListenerInner());
}複製程式碼
選擇並初始化完成具體的訊息消費服務之後,啟動訊息消費服務。consumeMessageService主要負責對訊息進行消費,它的內部維護了一個執行緒池。
// 啟動訊息消費服務
this.consumeMessageService.start();複製程式碼
接著向MQClientInstance註冊消費者,並啟動MQClientInstance。這裡再次強調
一個JVM中所有消費者、生產者持有同一個MQClientInstance,且MQClientInstance只會啟動一次
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}複製程式碼
mQClientFactory.start();
log.info("the consumer [{}] start OK.",this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;複製程式碼
如果MQClientInstance已經啟動,或者已經關閉,或者啟動失敗,重複呼叫start會報錯。這裡也能直觀的反映出:MQClientInstance的啟動只有一次
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK,maybe started once,"
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);
default:
break;
}複製程式碼
啟動完成執行後續收尾工作
// 訂閱關係改變,更新Nameserver的訂閱關係表
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 檢查客戶端狀態
this.mQClientFactory.checkClientInBroker();
// 傳送心跳包
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 喚醒執行消費者負載均衡
this.mQClientFactory.rebalanceImmediately();
}
複製程式碼
copySubscription(),訊息重試topic處理邏輯
消費者啟動流程較為重要,我們接著對其中的重點方法展開講解。這部分內容可以暫時跳過,不影響對主流程的把控。
我們研究一下copySubscription方法的實現細節。
[DefaultMQPushConsumerImpl.java]
private void copySubscription() throws MQClientException {
try {複製程式碼
// 首先獲取訂閱資訊
Map<String,String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String,String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),subString);
this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData);
}
}複製程式碼
// 為defaultMQPushConsumer設定具體的MessageListener實現
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}複製程式碼
根據消費型別選擇是否進行重試topic訂閱
switch (this.defaultMQPushConsumer.getMessageModel()) {複製程式碼
// 如果是廣播消費模式,則不進行任何處理,即無重試
case BROADCASTING:
break;複製程式碼
// 如果是叢集消費模式,訂閱重試主題訊息
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic,SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception",e);
}
}複製程式碼
如果是叢集消費模式,會訂閱重試主題訊息
獲取重試topic,規則為 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:"%RETRY%"+消費組名 ;
為重試topic設定訂閱關係,訂閱所有的訊息;
消費者啟動的時候會自動訂閱該重試主題,並參與該topic的訊息佇列負載過程。
小結
到此,我們就DefaultMQPushConsumer的初始化、啟動、校驗以及topic訂閱、重試等程式碼實現細節進行了較為詳細的講解。
下一章節,我將帶領讀者對訊息消費執行緒 consumeMessageService 的實現進行分析,我們下篇文章見。
版權宣告:
原創不易,洗文可恥。除非註明,本博文章均為原創,轉載請以連結形式標明本文地址。