RocketMq——順序消費和事務
RocketMQ不遵循JMS規範,自己有一套自定義的機制,使用訂閱主題的方式傳送和接收任務,支援廣播和叢集兩種消費模式。
叢集模式:設定消費端物件屬性:MessageModel.CLUSTERING,這種方式就可以達到ActiveMQ水平擴充套件負載均衡消費訊息的實現。比較特殊的是這種行為可以支援先發送資料(生產端先發送到MQ),消費端訂閱主題發生在生產端之後也可以收到資料,比較靈活。
廣播模式:設定消費端物件屬性:MessageModel.BROADCASTING,相當於生產端傳送資料到MQ,多個消費端都可以獲得資料。
在RocketMQ裡有個很重要的概念,就是GroupName,無論是生產端還是消費端,都必須指定一個GroupName,這個組名稱是維護在應用系統級別上。
比如生產端指定一個ProduccerGroupName,這個名稱需要由應用系統來保證唯一性,一類Producer集合的名稱,這類Producer通常傳送一類訊息,且傳送邏輯一致。消費端同理。
Topic主題,每個主題表示一個邏輯上的儲存概念,而在MQ上,會有著與之對應的多個Queue佇列,這個是物理儲存的概念。
RocketMQ提供了三種不同的producer:
1.NomalProducer 普通
2.OrderProducer 順序
3.TransactionProducer 事務
1.普通模式:使用傳統的send傳送訊息,不能保證訊息的順序一致性。
2.順序模式:可以嚴格的保證訊息的順序執行。遵循全域性順序的時候使用一個queue,區域性順序使用多個queue並行消費。
3.事務模式:支援事務方式對訊息進行提交處理,在rocket裡事務分兩個階段。
第一個階段把訊息傳給MQ,只不過消費端不可見,但資料其實已經在Broker上了。
第二個階段為本地訊息回撥處理,如果都成功返回COMMIT_MESSAGE,則在broker上的資料對消費端可見,失敗則為ROLLBACK_MESSAGE,消費端不可見。
順序消費
//如果使用順序消費,則必須自己實現MessageQueueSelector,保證訊息進入同一個佇列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> msgs, Message msg, Object arg) {
Integer id = (Integer)arg;
return msgs.get(id);
}
}, 0);
broker只保證訊息是順序傳送到消費端,但若消費端是多執行緒的,可能收到的第二個訊息會比第一個訊息處理得更快。
//messageListenerOrderly 保證順序消費,消費端接收的是同一個佇列的訊息,避免多執行緒時順序錯亂
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> arg0, ConsumeOrderlyContext arg1) {
return null;
}
});
consumer開啟多執行緒,只需設定consumer的執行緒數。
consumer.setConsumeThreadMax(10);
consumer.setConsumeThreadMin(10);
事務
生產端先將憑證訊息傳送到broker伺服器上,憑證訊息對消費端不可見;再回調執行本地事務,若執行成功則返回COMMIT,broker再將憑證訊息對消費端可見,若失敗返回ROLLBACK。
Producer:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
String group_name = "transaction_producer";
final TransactionMQProducer producer = new TransactionMQProducer(group_name);
producer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
producer.setCheckRequestHoldMax(200);
producer.setCheckThreadPoolMaxSize(20);
producer.setCheckThreadPoolMinSize(5);
producer.start();
//伺服器回撥producer,檢查本地事務分支成功還是失敗
producer.setTransactionCheckListener(new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("state --" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for(int i=0; i<2 ; i++) {
try {
Message msg = new Message("TopicTransaction","transaction" + i,"key",("hello " + i).getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, "tq");
System.out.println(sendResult);
}catch(Exception e) {
e.printStackTrace();
}
}
Thread.sleep(3000);
producer.shutdown();
}
}
TransactionExecuterImpl:
/*
* 執行本地事務,由客戶端回撥
*/
public class TransactionExecuterImpl implements LocalTransactionExecuter {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("msg :" + new String(msg.getBody()));
System.out.println("arg :" + arg);
String tag = msg.getTags();
if(tag.equals("transaction1")) {
//這裡有一個分階段提交任務概念
System.out.println("這裡處理業務邏輯,如操作資料庫,失敗情況下進行ROLLBACK");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
//return LocalTransactionState.UNKNOW;
}
}
Consumer:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
//設定consumer第一次啟動是從佇列頭部開始還是尾部開始消費,若非第一次啟動,那麼按照上次消費的位置繼續消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTransaction", "*");
//批量消費,一次消費多少條訊息,預設為1條,最大情況能拿多少條不代表每次能拿這麼多條
//consumer.setConsumeMessageBatchMaxSize(3);
//messageListenerOrderly 保證順序消費,消費端接收的是同一個佇列的訊息,避免多執行緒時順序錯亂
/*consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> arg0, ConsumeOrderlyContext arg1) {
return null;
}
});*/
consumer.setConsumeThreadMax(10);
consumer.setConsumeThreadMin(10);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//System.out.println(Thread.currentThread().getName() + "Receive: " + msgs);
//獲取一次性消費多少條訊息
//System.out.println("訊息條數 : " + msgs.size());
MessageExt msg1 = null;
try {
for(MessageExt msg : msgs) {
msg1 = msg;
String topic = msg.getTopic();
String msgbody = new String(msg.getBody(),"utf-8");
String tag = msg.getTags();
System.out.println("收到訊息: " + "topic:" + topic + " tags:" + tag + " msg:" + msgbody);
}
} catch (Exception e) {
e.printStackTrace();
//若已經重試了5次則不再重試
if(msg1.getReconsumeTimes() == 5) {
//此處記錄日誌操作。。。
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
consumer.start();
System.out.println("Consumer started...");
}
}
Producer console:
msg :hello 0
arg :tq
SendResult [sendStatus=SEND_OK, msgId=C0A8000200002A9F0000000000003B1C, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=0], queueOffset=0]
msg :hello 1
arg :tq
這裡處理業務邏輯,如操作資料庫,失敗情況下進行ROLLBACK
SendResult [sendStatus=SEND_OK, msgId=C0A8000200002A9F0000000000003C9E, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=1], queueOffset=0]
Consumer console :
收到訊息: topic:TopicTransaction tags:transaction0 msg:hello 0