RocketMQ系列(二)——應用篇
技術標籤:RocketMQ
本篇以應用角度講解RocketMQ的本地安裝、啟動與簡單配置,以及各主要類的功能及使用方式。更多其它瞭解請參考:
RocketMQ系列(一)——基礎篇
RocketMQ系列(三)——原理篇
一、本地安裝與部署
以windows平臺為例,Linux、Mac OS類似
1、安裝包下載地址
https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
解壓後即可使用,記得配置環境變數 ROCKETMQ_HOME=${yourPath}\rocketmq-all-4.2.0-bin-release以及加入到CLASSPATH
2、啟動
cd %ROCKETMQ_HOME%\bin
start mqnamesrv.cmd # 首先需要啟動NameServer
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true # 啟動broker並設定可以自動建立topic
3、配置檔案
官方預設提供了4中broker的部署方式,分別是單節點、雙主從同步雙寫(2m-2s-sync)、雙主雙從非同步複製(2m-2s-async)和雙主模式(2m-noslave),啟動各個broker時可以在 mqbroker.cmd -c 引數後面跟配置檔案指定部署模式。
每個預設配置項的含義:
配置項 | 含義 |
---|---|
brokerClusterName=DefaultCluster | 叢集名稱,相同的brokerClusterName組成叢集 |
brokerId=0 | 節點id,0表示master,其它表示slave |
brokerName=broker-a | 節點名稱,相同節點名的多個節點為主從關係 |
brokerRole=ASYNC_MASTER | 節點角色,SYNC_MASTER:同步雙寫的master,ASYNC_MASTER:非同步複製的master,SLAVE:從節點 |
deleteWhen=04 | 在每天的幾點刪除已經超過檔案保留時間的commit log |
fileReservedTime=48 | 訊息儲存的時間,小時為單位 |
flushDiskType=ASYNC_FLUSH | 持久化方式,SYNC_FLUSH:同步刷盤,ASYNC_FLUSH:非同步刷盤 |
二、主要類介紹
1、生產者相關
DefaultMQProducer
預設生產者,應用程式通過各種過載的send和sendOneway方法傳送訊息,此外還可以建立topic;一個DefaultMQProducer例項必須且只能屬於某一個producer group,可以向多個topic傳送訊息,一個producer group在同一套配置中也只能有一個producer例項,具體原因在系列三的原理篇會講到。
TransactionMQProducer
事務訊息生產者,繼承自DefaultMQProducer,除了DefaultMQProducer提供的方法外,還可以通過sendMessageInTransaction方法傳送事務訊息,這要求註冊一個事務監聽介面(TransactionListener)用於完成本地事務的執行和執行狀態判斷。
SendCallBack
非同步傳送回撥介面,定義了非同步傳送訊息完成後的執行操作。
public interface SendCallback {
// 傳送完成(注意有別於傳送成功)回撥方法
void onSuccess(final SendResult sendResult);
// 傳送異常回調方法
void onException(final Throwable e);
}
RPCHook
顧名思義,rpc呼叫鉤子,可以在客戶端向服務端(這裡指broker,下同)發起呼叫前(doBeforeRequest)後(doAfterResponse)執行特定操作,比如列印日誌等;同樣也適用於消費者
MessageQueueSelector
訊息佇列選擇器,選擇訊息需要傳送的佇列,可以用於負載均衡和訊息區域性有序場景,如輪詢佇列或根據訂單號取模選擇傳送佇列;不傳預設使用MQFaultFallbackStrategy選擇佇列
public interface MessageQueueSelector {
// 從訊息列表mqs中選擇一個佇列傳送msg訊息
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
TransactionListener
事務監聽介面,用於事務訊息傳送時服務端回撥客戶端以執行本地事務或回查本地事務狀態(LocalTransactionState )
public interface TransactionListener {
// 服務端通知客戶端執行本地事務
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
// 服務端在呼叫executeLocalTransaction超時後,回查本地事務執行狀態
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
MessageQueue
訊息佇列實體類,包含所屬topic,所在的broker name和佇列id
Message
訊息實體類,主要包含所屬topic,應用程式需要傳送的訊息體body,以及Map型別的屬性集合properties
在properties中儲存的屬性大致可以分為三類:
TAGS、KEYS、DELAY等預定義業務屬性,使用者可以通過介面訪問
RETRY_TOPIC、REAL_TOPIC、TRAN_MSG等預定義系統屬性,用於特定功能的實現
使用者自定義屬性,如設定age=8,amount=1000等,用於訊息過濾,見系列一基礎篇的SQL表示式過濾模式
狀態相關
SendResult
同步傳送訊息時會立即返回傳送結果,包含傳送狀態物件SendStatus
SendStatus
服務端返回的訊息投遞結果,對於事務訊息,後三種情況下客戶端會要求服務端回滾訊息
列舉取值 | 解釋 | 說明 |
---|---|---|
SEND_OK | 傳送成功 | 服務端事務訊息提交 |
FLUSH_DISK_TIMEOUT | 刷盤超時 | 服務端事務訊息回滾 |
FLUSH_SLAVE_TIMEOUT | 同步slave超時 | 服務端事務訊息回滾 |
SLAVE_NOT_AVAILABLE | 沒有可用的slave | 服務端事務訊息回滾 |
LocalTransactionState
本地事務狀態,事務監聽介面返回該狀態
列舉取值 | 解釋 |
---|---|
COMMIT_MESSAGE | 服務端提交訊息 |
ROLLBACK_MESSAGE | 服務端回滾訊息 |
UNKNOW | 未決狀態,返回該狀態後服務端會再次回查 |
2、消費者相關
DefaultMQPushConsumer
預設的push方式消費者,通過註冊MessageListener實現類定義消費業務邏輯
DefaultMQPullConsumer
預設的pull方式消費者,可以獲取到某topic下的訊息佇列並主動拉取訊息進行消費
MessageFilter
訊息過濾器介面,自定義訊息過濾規則,哪些訊息可以推送到消費者
public interface MessageFilter {
boolean match(final MessageExt msg, final FilterContext context);
}
AllocateMessageQueueStrategy
訊息佇列分配策略,定義同一個topic下的訊息佇列(注意不是訊息,通常一個訊息佇列只由同一個consumer消費)列表在不同consumer之間如何分配,在建立consumer時可指定。
很明顯,該策略只在叢集模式下才會使用,廣播模式下每個消費者都需要消費所有佇列。
官方提供了6種實現,具體實現策略戳這裡,預設策略是AllocateMessageQueueAveragely,介面定義如下:
public interface AllocateMessageQueueStrategy {
// 返回當前consumer消費的佇列
List<MessageQueue> allocate(
final String consumerGroup, // 消費者組
final String currentCID, // 當前consumer id
final List<MessageQueue> mqAll, // 當前topic下的全部訊息佇列
final List<String> cidAll // 該consumer group下的所有consumer id列表
);
// 返回策略名稱
String getName();
}
MessageListener
訊息監聽介面,有MessageListenerConcurrently和MessageListenerOrderly兩種擴充套件方式
MessageListenerConcurrently
併發訊息監聽介面,可以批量獲取訊息,不保證訊息的接收和消費順序;返回併發消費狀態ConsumeConcurrentlyStatus
public interface MessageListenerConcurrently extends MessageListener {
// 併發消費訊息方法宣告
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context);
}
MessageListenerOrderly
順序訊息監聽介面,單執行緒保證訊息的消費順序;返回順序消費狀態ConsumeOrderlyStatus
public interface MessageListenerOrderly extends MessageListener {
// 順序消費訊息方法宣告
ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeOrderlyContext context);
}
型別/狀態相關
MessageModel
消費者的訊息模式
列舉取值 | 解釋 | 說明 |
---|---|---|
BROADCASTING | 廣播模式 | 訊息偏移量儲存在本地 沒有負載均衡,消費時不存在佇列選擇問題 |
CLUSTERING | 叢集模式 | 訊息偏移量儲存在服務端 push模式下有負載均衡問題,消費時可以指定訊息佇列選擇器 pull模式下可以拉取全部佇列,也可以均衡拉取佇列進行消費 |
ConsumeFromWhere
定義消費者啟動後從何處開始消費
列舉取值 | 解釋 |
---|---|
CONSUME_FROM_LAST_OFFSET | 從最新的偏移量開始消費 |
CONSUME_FROM_FIRST_OFFSET | 從佇列頭開始消費 |
CONSUME_FROM_TIMESTAMP | 從某個時刻開始消費 |
ConsumeConcurrentlyStatus
併發消費狀態
列舉取值 | 解釋 |
---|---|
CONSUME_SUCCESS | 消費成功 |
ECONSUME_LATER | 消費失敗,會重新投遞到%RETRY%+${consumergroup}佇列 |
ConsumeOrderlyStatus
順序消費狀態
列舉取值 | 解釋 |
---|---|
SUCCESS | 消費成功 |
SUSPEND_CURRENT_QUEUE_A_MOMENT | 消費失敗,稍後會重新消費 |
PullResult
拉取訊息返回結果,包含拉取狀態PullStatus
PullStatus
拉取結果
列舉取值 | 解釋 |
---|---|
FOUND | 拉取成功 |
NO_NEW_MSG | 沒有新訊息 |
NO_MATCHED_MSG | 沒有匹配的訊息 |
OFFSET_ILLEGAL | 偏移量不合法 |
三、生產者
1、DefaultMQProducer
初始化並啟動
建立並啟動DefaultMQProducer
private DefaultMQProducer createDefaultProducer() throws MQClientException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(defaultProducerGroup, new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
// TODO 向服務端發起請求前執行操作
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
// TODO 服務端發起請求返回後執行操作
}
});
defaultMQProducer.setNamesrvAddr(namesrvAddr);
// 預設佇列數量
defaultMQProducer.setDefaultTopicQueueNums(2);
// 傳送超時時間
defaultMQProducer.setSendMsgTimeout(100);
// 傳送失敗最大重試次數
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.start();
return defaultMQProducer;
}
傳送訊息
// 單條訊息傳送
SendResult sendResult = defaultMQProducer.send(message);
// 批量訊息傳送
SendResult sendResult = defaultMQProducer.send(Lists.newArrayList(message));
// 設定傳送超時時間
SendResult sendResult = defaultMQProducer.send(message, timeout);
// 指定訊息佇列
SendResult sendResult = defaultMQProducer.send(message, messageQueue);
// 指定訊息佇列選擇器,可以實現相關訊息的順序傳送
SendResult sendResult = defaultMQProducer.send(message, messageQueueSelector, null);
// 非同步傳送並指定回撥介面
defaultMQProducer.send(message, sendCallback);
// 單向傳送
defaultMQProducer.sendOneway(message);
// 單向傳送並指定訊息佇列
defaultMQProducer.sendOneway(message, messageQueue);
// 單向傳送並指定訊息佇列選擇器
defaultMQProducer.sendOneway(message, messageQueueSelector, key);
……
以上傳送方式的組合
2、TransactionMQProducer
初始化並啟動
建立並啟動TransactionMQProducer
private TransactionMQProducer createTransactionProducer() throws MQClientException {
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(transactionProducerGroup);
transactionMQProducer.setNamesrvAddr(namesrvAddr);
transactionMQProducer.setRetryTimesWhenSendFailed(2);
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// TODO 執行本地事務,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,未決返回UNKNOW
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// TODO 查詢本地事務狀態,返回結果同上
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionMQProducer.start();
return transactionMQProducer;
}
傳送訊息
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, key);
四、消費者
1、PushConsumer
初始化與啟動
public DefaultMQPushConsumer createConcurrentlyPushConsumer() throws Exception {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(pushConsumerGroup);
pushConsumer.setNamesrvAddr(namesrvAddr);
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 過濾方式一、TAG方式
pushConsumer.subscribe(topic1, "TAG-A || TAG-B*");
// 過濾方式二、TSQL方式
pushConsumer.subscribe(topic2, MessageSelector.bySql("amount between 500 and 1000"));
// 過濾方式三、類過濾方式
/// 注意:由於是直接將filter檔案直接傳送到broker,因此檔案中不能有非RocketMQ核心包之外的外部依賴
String filterClassSource = MixAll.file2String("D:\\workspace\\……\\rocketmq\\impl\\MessageFilterImpl.java");
String fullName = MessageFilterImpl.class.getCanonicalName();
pushConsumer.subscribe(topic3, fullName, filterClassSource);
// 叢集模式,同一ConsumerGroup中只有一個消費者能消費
pushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 消費執行緒池最小執行緒數
pushConsumer.setConsumeThreadMin(2);
// 消費執行緒池最大執行緒數
pushConsumer.setConsumeThreadMax(5);
// 批量消費,一次消費多少條訊息,也即messageListener方法中msgs的size上限
pushConsumer.setConsumeMessageBatchMaxSize(10);
// 批量拉訊息,一次最多拉多少條。問題:為什麼push模式下會有拉取訊息的操作?
pushConsumer.setPullBatchSize(10);
// 併發訊息監聽介面
pushConsumer.registerMessageListener(concurrentlyListener);
// 順序訊息監聽介面
// pushConsumer.registerMessageListener(orderlyListener);
// 設定訊息佇列分配策略
pushConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
pushConsumer.start();
return pushConsumer;
}
消費
Consumer消費訊息是通過自定義MessageListener實現的。
1、併發消費,不保證實際的消費順序
public class ConcurrentlyListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
msgs.forEach(msg -> {
// TODO 消費業務邏輯
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
2、順序消費,同一佇列的訊息只有一個執行緒消費以保證順序,需要配合producer的MessageQueueSelector使用
public class OrderlyListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
msgs.forEach(msg -> {
// TODO 消費業務邏輯
});
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
2、PullConsumer
初始化與啟動
public DefaultMQPullConsumer createPullConsumer() throws Exception {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(pullConsumerGroup);
pullConsumer.setNamesrvAddr(namesrvAddr);
pullConsumer.setMessageModel(MessageModel.CLUSTERING);
pullConsumer.start();
return pullConsumer;
}
消費
private void consume() throws Exception {
// 獲取topic下的全部佇列
Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(topic);
// 均衡獲取topic下的部分佇列
// Set<MessageQueue> queues = pullConsumer.fetchMessageQueuesInBalance(topic);
for (MessageQueue queue : queues) {
SINGLE_MQ:
for (; ; ) {
PullResult pullResult = pullConsumer.pullBlockIfNotFound(queue, "", getOffset(queue) /* 獲取佇列的消費位移 */, 32);
PullStatus pullStatus = pullResult.getPullStatus();
// 本地記錄當前queue的消費位移
setOffset(queue, pullResult.getNextBeginOffset());
switch (pullStatus) {
case FOUND:
List<MessageExt> messageExts = pullResult.getMsgFoundList();
log.info("pull success, msg size: ", messageExts.size());
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
// TODO 業務處理邏輯
}
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
}
}
}
【參考資料】
https://blog.csdn.net/yewandemty/article/details/81989695
https://www.jianshu.com/p/75badea5ac1e