RocketMQ原理學習--訊息型別
一、叢集訊息與廣播訊息
叢集消費:當使用叢集消費模式時,MQ 認為任意一條訊息只需要被叢集內的任意一個消費者處理即可。
廣播消費:當使用廣播消費模式時,MQ 會將每條訊息推送給叢集內所有註冊過的客戶端,保證訊息至少被每臺機器消費一次。
叢集消費模式:
適用場景&注意事項
- 消費端叢集化部署,每條訊息只需要被處理一次。
- 由於消費進度在服務端維護,可靠性更高。
- 叢集消費模式下,每一條訊息都只會被分發到一臺機器上處理,如果需要被叢集下的每一臺機器都處理,請使用廣播模式。
- 叢集消費模式下,不保證訊息的每一次失敗重投等邏輯都能路由到同一臺機器上,因此處理訊息時不應該做任何確定性假設。
廣播消費模式:
適用場景&注意事項
- 順序訊息暫不支援廣播消費模式。
- 每條訊息都需要被相同邏輯的多臺機器處理。
- 消費進度在客戶端維護,出現重複的概率稍大於叢集模式。
- 廣播模式下,MQ 保證每條訊息至少被每臺客戶端消費一次,但是並不會對消費失敗的訊息進行失敗重投,因此業務方需要關注消費失敗的情況。
- 廣播模式下,第一次啟動時預設從最新訊息消費,客戶端的消費進度是被持久化在客戶端本地的隱藏檔案中,因此不建議刪除該隱藏檔案,否則會丟失部分訊息。
- 廣播模式下,每條訊息都會被大量的客戶端重複處理,因此推薦儘可能使用叢集模式。
- 廣播模式下服務端不維護消費進度,所以 MQ 控制檯不支援訊息堆積查詢和堆積報警功能。
程式碼示例:
設定叢集訊息:consumer.setMessageModel(MessageModel.CLUSTERING);
設定廣播訊息:consumer.setMessageModel(MessageModel.BROADCASTING);
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("localhost:9876"); //叢集消費者 //consumer.setMessageModel(MessageModel.CLUSTERING); //廣播消費者 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
二、普通訊息、事物訊息、順序訊息、延時訊息
RocketMQ 針對不同的業務場景還提供了普通訊息、事物訊息、順序訊息和延時訊息等幾種訊息型別。
1、普通訊息
普通訊息也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒有保證,可能先發送的訊息先消費,也可能先發送的訊息後消費。
舉個簡單例子,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通訊息。
因為不需要保證訊息的順序,所以訊息可以大規模併發地傳送和消費,吞吐量很高,適合大部分場景。
示例:
生產者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart 11" + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消費者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("localhost:9876");
//consumer.setInstanceName("rmq-instance2");
consumer.subscribe("TopicA-test", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
2、事物訊息
MQ 的事務訊息互動流程如下圖所示:
採用2PC提交:
第一階段是:步驟1,2,3。
第二階段是:步驟4,5。
生產者:
public class TransactionProducer {
public static void main(String [] args) throws Exception{
final TransactionMQProducer producer = new TransactionMQProducer("rmq-transaction");
producer.setNamesrvAddr("localhost:9876");
//事務回查最小併發數
producer.setCheckThreadPoolMinSize(5);
//事務回查最大併發數
producer.setCheckThreadPoolMaxSize(20);
//佇列數
producer.setCheckRequestHoldMax(2000);
producer.start();
//伺服器回撥producer,檢查本地事務分支成功還是失敗
producer.setTransactionCheckListener(new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
System.out.println("state --" + new String(messageExt.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 2; i++) {
Message msg = new Message("TopicTransaction",
"Transaction" + i,
("Hello RocketMq" + i).getBytes()
);
SendResult sendResult = producer.sendMessageInTransaction(msg, transactionExecuter, "tq");
System.out.println(sendResult);
TimeUnit.MICROSECONDS.sleep(1000);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
}
}));
System.exit(0);
}
}
執行本地事物:
public class TransactionExecuterImpl implements LocalTransactionExecuter {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("msg=" + new String(msg.getBody()));
System.out.println("arg = "+arg);
String tag = msg.getTags();
if (tag.equals("Transaction1")){
//這裡有一個分階段提交的概念
System.out.println("這裡是處理業務邏輯,失敗情況下進行ROLLBACK");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
//return LocalTransactionState.UNKNOW;
}
}
消費者:
public class TransactionConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-transaction");
consumer.setNamesrvAddr("localhost:9876");
//consumer.setInstanceName("rmq-instance2");
consumer.subscribe("TopicTransaction", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
3、順序訊息
有序訊息就是按照一定的先後順序的訊息型別。
舉個例子來說,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序也就是 1、2、3 ,而不會出現普通訊息那樣的 2、1、3 等情況。
那麼有序訊息是如何保證的呢?我們都知道訊息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那麼要保證訊息的有序,勢必這兩步都是要保證有序的,即要保證訊息是按有序傳送到 broker,broker 也是有序將訊息投遞給 consumer,兩個條件必須同時滿足,缺一不可。
進一步還可以將有序訊息分成
- 全域性有序訊息
- 區域性有序訊息
實現原理:由於生產者預設是輪詢獲取MessageQueue佇列(每個Topic預設初始化4個MessageQueue),然後將訊息輪詢傳送到不同的MessageQueue中,訊息者從MessageQueue中獲取資料時很可能是無序的。
區域性有序訊息:將相同順序的訊息傳送到同一個MessageQueue佇列,這樣消費者從佇列中獲取資料肯定是相對有序的。
全域性有序訊息:將所有的訊息傳送到一個MessageQueue佇列,消費者從單個佇列中拉取訊息,訊息有序。
生產者:實現MessageQueueSelector介面,相同順序的訊息獲取同一個MessageQueue
public class OrderProducer {
public static void main(String[] args) throws Exception {
try {
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 0);
System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 1);
System.out.println(sendResult);
}
for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 2);
System.out.println(sendResult);
}
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消費者:設定訊息監聽器為順序訊息監聽器MessageListenerOrderly
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("localhost:9876");
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
/**
* 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到
*,第二個執行緒無法訪問這個佇列
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 設定自動提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",內容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
;
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}
4、延時訊息
延時訊息,簡單來說就是當 producer 將訊息傳送到 broker 後,會延時一定時間後才投遞給 consumer 進行消費。
RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。
這種訊息一般適用於訊息生產和消費之間有時間視窗要求的場景。比如說我們網購時,下單之後是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那麼在訂單建立的時候就會就需要傳送一條延時訊息(延時15分鐘)後投遞給 consumer,consumer 接收訊息後再對訂單的支付狀態進行判斷是否關閉訂單。
設定延時非常簡單,只需要在Message設定對應的延時級別即可
生產者:
public class DelayProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart 11" + i)
.getBytes()// body
);
//1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
// level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
參考部落格:
https://my.oschina.net/xinxingegeya/blog/1577410
https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.575.42512de7YsiiZ5