RocketMQ的順序消費和事務消費
一、三種消費 :1.普通消費 2. 順序消費 3.事務消費
1.1 順序消費:在網購的時候,我們需要下單,那麼下單需要假如有三個順序,第一、建立訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費,他的實現是生產者(一個生產者可以對多個主題去傳送訊息)將這個三個訊息放在topic(一個topic預設有4個佇列)的一個佇列裡面,單機支援上萬個持久化佇列,消費端去消費的時候也是隻能有一個Consumer去取得這個佇列裡面的資料,然後順序消費。
單個節點(Producer端1個、Consumer端1個)
Producer端
-
package order;
-
import java.util.List;
-
import com.alibaba.rocketmq.client.exception.MQBrokerException;
-
import com.alibaba.rocketmq.client.exception.MQClientException;
-
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-
import com.alibaba.rocketmq.client.producer.SendResult;
-
import com.alibaba.rocketmq.common.message.Message;
-
import com.alibaba.rocketmq.common.message.MessageQueue;
-
import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
/**
-
* Producer,傳送順序訊息
-
*/
-
public class Producer {
-
public static void main(String[] args) {
-
try {
-
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
-
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
producer.start();
-
// String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
-
// "TagE" };
-
for (int i = 1; i <= 5; i++) {
-
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
-
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
Integer id = (Integer) arg;
-
int index = id % mqs.size();
-
return mqs.get(index);
-
}
-
}, 0);
-
System.out.println(sendResult);
-
}
-
producer.shutdown();
-
} catch (MQClientException e) {
-
e.printStackTrace();
-
} catch (RemotingException e) {
-
e.printStackTrace();
-
} catch (MQBrokerException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
Consumer端程式碼
-
package order;
-
import java.util.List;
-
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicLong;
-
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-
import com.alibaba.rocketmq.client.exception.MQClientException;
-
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-
import com.alibaba.rocketmq.common.message.MessageExt;
-
/**
-
* 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)
-
*/
-
public class Consumer1 {
-
public static void main(String[] args) throws MQClientException {
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
-
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
/**
-
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
-
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
-
*/
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
consumer.subscribe("TopicOrderTest", "*");
-
consumer.registerMessageListener(new MessageListenerOrderly() {
-
AtomicLong consumeTimes = new AtomicLong(0);
-
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-
// 設定自動提交
-
context.setAutoCommit(true);
-
for (MessageExt msg : msgs) {
-
System.out.println(msg + ",內容:" + new String(msg.getBody()));
-
}
-
try {
-
TimeUnit.SECONDS.sleep(5L);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
;
-
return ConsumeOrderlyStatus.SUCCESS;
-
}
-
});
-
consumer.start();
-
System.out.println("Consumer1 Started.");
-
}
-
}
結果如下圖所示:
這個五條資料被順序消費了
多個節點(Producer端1個、Consumer端2個)
Producer端程式碼:
-
package order;
-
import java.util.List;
-
import com.alibaba.rocketmq.client.exception.MQBrokerException;
-
import com.alibaba.rocketmq.client.exception.MQClientException;
-
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-
import com.alibaba.rocketmq.client.producer.SendResult;
-
import com.alibaba.rocketmq.common.message.Message;
-
import com.alibaba.rocketmq.common.message.MessageQueue;
-
import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
/**
-
* Producer,傳送順序訊息
-
*/
-
public class Producer {
-
public static void main(String[] args) {
-
try {
-
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
-
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
producer.start();
-
// String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
-
// "TagE" };
-
for (int i = 1; i <= 5; i++) {
-
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
-
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
Integer id = (Integer) arg;
-
int index = id % mqs.size();
-
return mqs.get(index);
-
}
-
}, 0);
-
System.out.println(sendResult);
-
}
-
for (int i = 1; i <= 5; i++) {
-
Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());
-
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
Integer id = (Integer) arg;
-
int index = id % mqs.size();
-
return mqs.get(index);
-
}
-
}, 1);
-
System.out.println(sendResult);
-
}
-
for (int i = 1; i <= 5; i++) {
-
Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());
-
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
Integer id = (Integer) arg;
-
int index = id % mqs.size();
-
return mqs.get(index);
-
}
-
}, 2);
-
System.out.println(sendResult);
-
}
-
producer.shutdown();
-
} catch (MQClientException e) {
-
e.printStackTrace();
-
} catch (RemotingException e) {
-
e.printStackTrace();
-
} catch (MQBrokerException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
Consumer1
-
/**
-
* 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)
-
*/
-
public class Consumer1 {
-
public static void main(String[] args) throws MQClientException {
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
-
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
/**
-
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
-
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
-
*/
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
consumer.subscribe("TopicOrderTest", "*");
-
/**
-
* 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到
-
*,第二個執行緒無法訪問這個佇列
-
*/
-
consumer.registerMessageListener(new MessageListenerOrderly() {
-
AtomicLong consumeTimes = new AtomicLong(0);
-
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-
// 設定自動提交
-
context.setAutoCommit(true);
-
for (MessageExt msg : msgs) {
-
System.out.println(msg + ",內容:" + new String(msg.getBody()));
-
}
-
try {
-
TimeUnit.SECONDS.sleep(5L);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
;
-
return ConsumeOrderlyStatus.SUCCESS;
-
}
-
});
-
consumer.start();
-
System.out.println("Consumer1 Started.");
-
}
-
}
Consumer2
-
/**
-
* 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)
-
*/
-
public class Consumer2 {
-
public static void main(String[] args) throws MQClientException {
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
-
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
/**
-
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
-
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
-
*/
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
consumer.subscribe("TopicOrderTest", "*");
-
/**
-
* 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到
-
*,第二個執行緒無法訪問這個佇列
-
*/
-
consumer.registerMessageListener(new MessageListenerOrderly() {
-
AtomicLong consumeTimes = new AtomicLong(0);
-
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
-
// 設定自動提交
-
context.setAutoCommit(true);
-
for (MessageExt msg : msgs) {
-
System.out.println(msg + ",內容:" + new String(msg.getBody()));
-
}
-
try {
-
TimeUnit.SECONDS.sleep(5L);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
;
-
return ConsumeOrderlyStatus.SUCCESS;
-
}
-
});
-
consumer.start();
-
System.out.println("Consumer2 Started.");
-
}
-
}
先啟動Consumer1和Consumer2,然後啟動Producer,Producer會發送15條訊息 Consumer1消費情況如圖,都按照順序執行了
Consumer2消費情況如圖,都按照順序執行了
二、事務消費
這裡說的主要是分散式事物。下面的例子的資料庫分別安裝在不同的節點上。
事物消費需要先說說什麼是事物。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之後,我的建設銀行也必須加1000元。這樣才能保證資料的一致性。假如工商銀行轉1000元之後,建設銀行的伺服器突然宕機,那麼我扣除了1000,但是並沒有在建設銀行給我加1000,就出現了資料的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。
再比如,我們進行網購的時候,我們下單之後,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫裡面。有可能訂單提交成功之後,倉庫數量伺服器突然宕機。這樣也出現了資料不一致的問題。
使用訊息佇列來解決分散式事物:
現在我們去外面飯店吃飯,很多時候都不會直接給了錢之後直接在付款的視窗遞飯菜,而是付款之後他會給你一張小票,你拿著這個小票去出飯的視窗取飯。這裡和我們的系統類似,提高了吞吐量。即使你到第二個視窗,師傅告訴你已經沒飯了,你可以拿著這個憑證去退款,即使中途由於出了意外你無法到達視窗進行取飯,但是隻要憑證還在,可以將錢退給你。這樣就保證了資料的一致性。
如何保證憑證(訊息)有2種方法:
1、在工商銀行扣款的時候,餘額表扣除1000,同時記錄日誌,而且這2個表是在同一個資料庫例項中,可以使用本地事物解決。然後我們通知建設銀行需要加1000給該使用者,建設銀行收到之後給我返回已經加了1000給使用者的確認資訊之後,我再標記日誌表裡面的日誌為已經完成。
2、通過訊息中介軟體
原文地址:http://www.jianshu.com/p/453c6e7ff81c
RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問訊息,並修改訊息的狀態。
細心的你可能又發現問題了,如果確認訊息傳送失敗了怎麼辦?RocketMQ會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續傳送確認訊息呢?RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。
例子:
Consumer 端
-
package transaction;
-
import java.util.List;
-
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-
import com.alibaba.rocketmq.client.exception.MQClientException;
-
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-
import com.alibaba.rocketmq.common.message.MessageExt;
-
/**
-
* Consumer,訂閱訊息
-
*/
-
public class Consumer {
-
public static void main(String[] args) throws InterruptedException, MQClientException {
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");
-
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
consumer.setConsumeMessageBatchMaxSize(10);
-
/**
-
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
-
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
-
*/
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
consumer.subscribe("TopicTransactionTest", "*");
-
consumer.registerMessageListener(new MessageListenerConcurrently() {
-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-
try {
-
for (MessageExt msg : msgs) {
-
System.out.println(msg + ",內容:" + new String(msg.getBody()));
-
}
-
} catch (Exception e) {
-
e.printStackTrace();
-
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
-
}
-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
-
}
-
});
-
consumer.start();
-
System.out.println("transaction_Consumer Started.");
-
}
-
}
Producer端
-
package transaction;
-
import com.alibaba.rocketmq.client.exception.MQClientException;
-
import com.alibaba.rocketmq.client.producer.SendResult;
-
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
-
import com.alibaba.rocketmq.common.message.Message;
-
/**
-
* 傳送事務訊息例子
-
*
-
*/
-
public class Producer {
-
public static void main(String[] args) throws MQClientException, InterruptedException {
-
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
-
TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");
-
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
-
// 事務回查最小併發數
-
producer.setCheckThreadPoolMinSize(2);
-
// 事務回查最大併發數
-
producer.setCheckThreadPoolMaxSize(2);
-
// 佇列數
-
producer.setCheckRequestHoldMax(2000);
-
producer.setTransactionCheckListener(transactionCheckListener);
-
producer.start();
-
// String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"
-
// };
-
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
-
for (int i = 1; i <= 2; i++) {
-
try {
-
Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,
-
("Hello RocketMQ " + i).getBytes());
-
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
-
System.out.println(sendResult);
-
Thread.sleep(10);
-
} catch (MQClientException e) {
-
e.printStackTrace();
-
}
-
}
-
for (int i = 0; i < 100000; i++) {
-
Thread.sleep(1000);
-
}
-
producer.shutdown();
-
}
-
}
TransactionExecuterImpl --執行本地事務
-
package transaction;
-
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
-
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
-
import com.alibaba.rocketmq.common.message.Message;
-
/**
-
* 執行本地事務
-
*/
-
public class TransactionExecuterImpl implements LocalTransactionExecuter {
-
// private AtomicInteger transactionIndex = new AtomicInteger(1);
-
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
-
System.out.println("執行本地事務msg = " + new String(msg.getBody()));
-
System.out.println("執行本地事務arg = " + arg);
-
String tags = msg.getTags();
-
if (tags.equals("transaction2")) {
-
System.out.println("======我的操作============,失敗了 -進行ROLLBACK");
-
return LocalTransactionState.ROLLBACK_MESSAGE;
-
}
-
return LocalTransactionState.COMMIT_MESSAGE;
-
// return LocalTransactionState.UNKNOW;
-
}
-
}
TransactionCheckListenerImpl--未決事務,伺服器回查客戶端(目前已經被閹割啦)
-
package transaction;
-
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
-
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-
import com.alibaba.rocketmq.common.message.MessageExt;
-
/**
-
* 未決事務,伺服器回查客戶端
-
*/
-
public class TransactionCheckListenerImpl implements TransactionCheckListener {
-
// private AtomicInteger transactionIndex = new AtomicInteger(0);
-
//在這裡,我們可以根據由MQ回傳的key去資料庫查詢,這條資料到底是成功了還是失敗了。
-
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
-
System.out.println("未決事務,伺服器回查客戶端msg =" + new String(msg.getBody().toString()));
-
// return LocalTransactionState.ROLLBACK_MESSAGE;
-
return LocalTransactionState.COMMIT_MESSAGE;
-
// return LocalTransactionState.UNKNOW;
-
}
-
}
producer端:傳送資料到MQ,並且處理本地事物。這裡模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的資料。第二個資料失敗了,不會被消費。
Consumer只會接收到一個,第二個資料不會被接收到
--------------------- 本文來自 Mr_蝸牛 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/u010634288/article/details/57158374?utm_source=copy