2-rocketmq-訊息傳送和接收
quick start
新增依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
生產者
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 生產者組,簡單來說就是多個傳送同一類訊息的生產者稱之為一個生產者組rocketmq支援事務訊息,在傳送事務訊息時,如果事務訊息異常(producer掛了),broker端會來回查事務的狀態,這個時候會根據group名稱來查詢對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用 */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // namesrv地址 多個地址用 ; 隔開 從namesrv上拉取broker資訊 producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { /** * 建立訊息例項,指定topic,tag,訊息內容。tag */ Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 傳送訊息並獲取傳送結果 同步傳送 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
SendResult中,有一個sendStatus狀態,表示訊息的傳送狀態。一共有四種狀態
- FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設定成
SYNC_FLUSH 才會報這個錯誤) 。 - FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設定成SYNC_MASTER 方式,沒有
在設定時間內完成主從同步。 - SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方
式下,並且Broker 被設定成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。 - SEND OK :表示傳送成功,傳送成功的具體含義,比如訊息是否已經被儲存到磁碟?訊息是否被
同步到了Slave 上?訊息在Slave 上是否被寫入磁碟?需要結合所配置的刷盤策略、主從策略來
定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK
消費者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //groupName 將多個consumer分組,提高併發處理能力。需要和MessageModel配合 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // 多個地址 ;分開 獲取broker地址 並定時向broker傳送心跳 可以從master/slave獲取訂閱 consumer.setNamesrvAddr("localhost:9876"); // 兩種訊息模式 BROADCASTING CLUSTERING consumer.setMessageModel(MessageModel.BROADCASTING); //設定consumer第一次啟動從佇列頭部還是尾部開始消費 //如果非第一次啟動,那麼按上一次消費的位置繼續消費(取決於本地的offeset資料) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // topic 可通過tag過濾訊息 * 或 null 代表全部 consumer.subscribe("TopicTest", "*"); /**註冊訊息處理回撥 * MessageListenerConcurrently 普通監聽 * MessageListenerOrderly 順序監聽 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // todo 訊息處理邏輯 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消費狀態 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動consumer consumer.start(); System.out.printf("Consumer Started.%n"); } }
消費狀態
ConsumeConcurrentlyStatus {
// 消費成功
CONSUME_SUCCESS,
// 使用失敗,稍後嘗試使用
RECONSUME_LATER;
訊息傳送及消費的基本原理
叢集部署,一個master可以有多個slave,一個slave只能有一個master.consumer可以從master獲者slave中訂閱訊息
2m-2s示例:
rocketMQ 沒有實現master選舉(通過配置檔案來指定主從)
當master掛了後 消費者依然能正常消費訊息(slave提供讀服務)
通過groupName實現分割槽,提高消費者的處理能力
消費者
兩種消費者型別
- DefaultMQPushConsumer 由系統控制讀取操作
DefaultMQPushConsumer
自動儲存offset,自動做負載均衡
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//groupName 將多個consumer分組,提高併發處理能力。需要和MessageModel配合
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 多個地址 ;分開
consumer.setNamesrvAddr("localhost:9876");
// 兩種訊息模式 BROADCASTING CLUSTERING
consumer.setMessageModel(MessageModel.BROADCASTING);
//第一次啟動從 offset頭開始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// topic 可通過tag過濾訊息 * 或 null 代表全部
consumer.subscribe("TopicTest", "*");
//註冊訊息處理回撥
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// todo 訊息處理邏輯
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
兩種訊息模式 BROADCASTING CLUSTERING:
1、在 Clustering 模式下,同一個 ConsumerGroup ( GroupName 相同 ) 裡的每個 Consumer 只消費所訂閱訊息 的一部分 內 容, 同一個 ConsumerGroup裡所有的 Consumer 消 費 的內 容合起來才是所訂閱 Topic 內 容 的 整體 ,從而達到負載均衡的目的 (也就是叢集消費)
2、在 Broadcasting 模式下,同一個 ConsumerGroup 裡的每個 Consumer 都能消費到所訂閱 Topic 的全部訊息,也就是一個訊息會被多次分發,被多個Consumer 消費 。(也就是廣播模式)
通過長輪詢的方式獲取訊息
Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新訊息到達就利用現有的連線立刻返回訊息給Consumer。主動權在Consumer
好處是客戶端能充分利用資源,不至於處理不過來
流量控制
DefaultMQPullConsumer
需要自己維護offset,需要通過遍歷MessageQueue獲取訊息
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 獲取分片
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
// 獲取到訊息
case FOUND:
break;
case NO_MATCHED_MSG:
break;
// 沒有新訊息
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
Consumer的啟動、關閉
DefaultMQPushConsumer啟動時不會檢查nameServer地址的正確或者可用性
// 從指定topic中拉取所有訊息佇列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
可以通過上面的方法主動拉取訊息佇列來判斷nameServer的可用性
關閉時呼叫shutdown()即可
DefaultMQPullConsumer關閉或者異常退出時需要將offset儲存起來
才能保證下次啟動時拉取訊息的正確性
consumerGroup:位於同一個consumerGroup中的consumer例項和producerGroup中的各個produer例項承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的併發消費能力以及容災
和kafka一樣,多個consumer會對訊息做負載均衡,意味著同一個topic下的不messageQueue會分發給同一個group中的不同consumer
消費端的負載均衡
和kafka一樣,消費端也會針對Message Queue做負載均衡,使得每個消費者能夠合理的消費多個分割槽的訊息。
消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載
-
消費端遍歷自己的所有topic,依次調rebalanceByTopic
-
根據topic獲取此topic下的所有queue
-
選擇一臺broker獲取基於group的所有消費端(有心跳向所有broker註冊客戶端資訊)
-
選擇佇列分配策略例項AllocateMessageQueueStrategy執行分配演算法
什麼時候觸發負載均衡
- 消費者啟動之後
- 消費者數量發生變更
- 每10秒會觸發檢查一次rebalance
分配演算法
RocketMQ提供了6中分割槽的分配演算法
- (AllocateMessageQueueAveragely)平均分配演算法(預設)
- (AllocateMessageQueueAveragelyByCircle)環狀分配訊息佇列
- (AllocateMessageQueueByConfig)按照配置來分配佇列: 根據使用者指定的配置來進行負載
- (AllocateMessageQueueByMachineRoom)按照指定機房來配置佇列
- (AllocateMachineRoomNearby)按照就近機房來配置佇列:
- (AllocateMessageQueueConsistentHash)一致性hash,根據消費者的cid進行
生產者
DefaultMQProducer 預設生產者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// producerGroupName
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// namesrv地址 多個地址用 ; 隔開
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 返回
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
訊息返回狀態:SendResult.sendStatus
1、FLUSH DISK TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策略設定成 SYNC FLUSH 才會報這個錯誤) 。
2、 FLUSH SLAVE TIMEOUT :表示在主備方式下,並且 Broker 被設定成 SYNC MASTER 方式,沒有在設定時間內完成主從同步 。
3、SLAVE NOT AVAILABLE : 這個狀態產生的場景和 FLUSH SLAVETIMEOUT 類似, 表示在主備 方式下,並且 Broker 被設定成 SYNCMASTER ,但是沒有找到被配置成 S lave 的 Broker 。
4、SEND OK :表示傳送成功,傳送成功的具體含義,比如訊息是否已經被儲存到融盤?訊息是否被同步到了 S lave 上?訊息在 S lave 上是否被寫人磁碟?需要結合所配置的刷盤策略、主從策略來定 。 這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是 SEND OK
延遲訊息
通過Message.setDelayTimeLevel ( int level ) 方法設定延遲時間,只支援預設值(1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/7m/8m/9m/1 Om/20m/30m/1 h/2h )。 比如setDelayTimeLevel(3)表示延遲 10s 。
自定義訊息傳送規則
實現MessageQueueSelector介面
三種預設實現:
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom
自定義訊息傳送可以將訊息傳送到指定的MessageQueue裡
對事物的支援
new TransactionMQProducer("groupName");
設定生產者group,當一個producer掛掉了,訊息會分發到其它producer保證訊息一定會被回查確定
訊息的可靠性原則
只有消費者返回CONSUME_SUCCESS消費成功的才會認為消費成功
返回ConsumeConcurrentlyStatus.RECONSUME_LATER消費失敗會被重試
訊息衰減重試
為了保證訊息肯定至少被消費一次,RocketMQ會把這批訊息重新發回到broker,在延遲的某個時間點
(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續
失敗到一定次數(預設16次),就會投遞到DLQ死信佇列。應用可以監控死信佇列來做人工干預
可以修改broker-a.conf檔案
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重試訊息的處理機制
一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費效能,理論上當嘗試重複次數
達到我們想要的結果時如果還是消費失敗,那麼我們需要將對應的訊息進行記錄,並且結束重複嘗試
consumer.registerMessageListener((MessageListenerConcurrently) (list,
consumeOrderlyContext) -> {
for (MessageExt messageExt : list) {
if(messageExt.getReconsumeTimes()==3) {
//可以將對應的資料儲存到資料庫,以便人工干預
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} r
eturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
死信佇列
RocketMQ會為每個消費組都設定一個Topic命名為“%DLQ%+consumerGroup"的死信佇列。