RocketMQ——順序消費(程式碼)
本部落格主要是以程式碼示例來了解順序消費的相關內容,建議在此之前先了解下順序消費的原理。
注:RocketMQ可以嚴格的保證訊息有序,但這個順序,不是全域性順序,只是分割槽(queue)順序,如果想要全域性順序,那麼需要保證只有一個分割槽。
順序消費簡介
1.普通順序消費
順序消費的一種,正常情況下可以保證完全的順序訊息,但是一旦發生通訊異常,Broker重啟,由於佇列總數法還是能變化,雜湊取模後定位的佇列會變化,產生短暫的訊息順序不一致。
2.嚴格順序訊息
順序訊息的一種,無論正常異常情況都能保證順序,但是犧牲了分散式failover特性,即broker叢集中要有一臺機器不可用,則整個叢集都不可用,服務可用性大大降低。如果伺服器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鐘的服務不可用。
目前已知的應用只有資料庫binlog同步強依賴嚴格順序訊息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序訊息。
1.producer
此處需要注意,producer.send(msg, new MessageQueueSelector()),如果需要全域性有序,只需要使new MessageQueueSelector().select(List<MessageQueue> mqs, Message msg, Object arg)方法返回值唯一且不變,例如:package com.gwd.rocketmq; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * @FileName Producer.java * @Description: * @author gu.weidong * @version V1.0 * @createtime 2018年7月3日 上午9:59:38 * 修改歷史: * 時間 作者 版本 描述 *==================================================== * */ /** * Producer,傳送順序訊息 */ public class Producer { public static void main(String[] args) throws IOException { try { DefaultMQProducer producer = new DefaultMQProducer("sequence_producer"); producer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876"); producer.start(); String[] tags = new String[] { "TagA", "TagC", "TagD" }; // 訂單列表 List<OrderDO> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加個時間字尾 String body = dateStr + " Hello RocketMQ " + orderList.get(i).getOrderId()+orderList.get(i).getDesc(); Message msg = new Message("SequenceTopicTest", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = Long.valueOf((String)arg); long index = id % mqs.size(); return mqs.get((int)index); } }, orderList.get(i).getOrderId());//通過訂單id來獲取對應的messagequeue System.out.println(sendResult + ", body:" + body); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.in.read(); } /** * 生成模擬訂單資料 */ private List<OrderDO> buildOrders() { List<OrderDO> orderList = new ArrayList<OrderDO>(); OrderDO OrderDO = new OrderDO(); OrderDO.setOrderId("15103111039"); OrderDO.setDesc("建立"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111065"); OrderDO.setDesc("建立"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111039"); OrderDO.setDesc("付款"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103117235"); OrderDO.setDesc("建立"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111065"); OrderDO.setDesc("付款"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103117235"); OrderDO.setDesc("付款"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111065"); OrderDO.setDesc("完成"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111039"); OrderDO.setDesc("推送"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103117235"); OrderDO.setDesc("完成"); orderList.add(OrderDO); OrderDO = new OrderDO(); OrderDO.setOrderId("15103111039"); OrderDO.setDesc("完成"); orderList.add(OrderDO); return orderList; } }
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = Long.valueOf((String)arg); long index = id % mqs.size(); return mqs.get((int)index); } }, orderList.get(0).getOrderId());//通過訂單id來獲取對應的messagequeue
這邊獲取到的queue永遠都是唯一的且確定的(此處只是舉個簡單的例子,orderList.get(i).getOrderId()改為0亦可)
2.錯誤的Consumer
package com.gwd.rocketmq;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* @FileName WrongConsumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年7月3日 下午3:13:16
* 修改歷史:
* 時間 作者 版本 描述
*====================================================
*
*/
public class WrongConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
Random random = new Random();
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模擬業務邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
注意:要想要有順序,那麼這邊吃監聽器就不能是MessageListenerConcurrently了,其顯示效果如下:
3.正確的Consumer
package com.gwd.rocketmq;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年7月3日 上午10:05:26
* 修改歷史:
* 時間 作者 版本 描述
*====================================================
*
*/
/**
* 順序訊息消費,帶事務方式(應用可控制Offset什麼時候提交)
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模擬業務邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
這邊的Consumer和上面的最明顯的區別在於對應的監聽器是MessageListenerOrderly,MessageListenerOrderly是能夠保證順序消費的。
顯示結果:
4.多個消費者
那如果有多個消費者呢?因為訊息傳送時被分配到多個佇列,這些佇列又會被分別傳送給消費者唯一消費,現在啟動兩個消費者,其消費情況如下圖:
結論:多個消費者時,各個消費者的訊息依舊是順序消費,且不會重複消費
原文轉自:https://blog.csdn.net/earthhour/article/details/78323026 ,在此基礎上部分程式碼略作修改