Rocketmq-client 傳送簡單訊息、順序訊息、延遲訊息以及消費者簡單例子
1. 簡單訊息
這裡使用三種訊息的傳送方式: 同步傳送、非同步傳送、單向傳送,以及訊息的消費。
- 同步傳送
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/** * 同步傳送訊息: 可靠同步傳輸應用場景廣泛,如重要通知訊息、簡訊通知、簡訊營銷系統等 */ public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("syncProducer"); // Specify name server addresses.producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("syncTopic" /* Topic */,"TagA" /* Tag */, "keys" + i, /* Keys */ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
結果:
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224D30000, offsetMsgId=C0A80D6F00002A9F000000000010B9F4, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=0], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E00001, offsetMsgId=C0A80D6F00002A9F000000000010BABD, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=1], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E40002, offsetMsgId=C0A80D6F00002A9F000000000010BB86, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=2], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E80003, offsetMsgId=C0A80D6F00002A9F000000000010BC4F, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=50] ...
- 非同步傳送訊息
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 非同步傳送訊息: 一般用於響應時間敏感的業務場景。 */ public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("asyncProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("asyncTopic", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
結果:
0 OK 7F000001631C18B4AAC25BF3A67E0000 6 OK 7F000001631C18B4AAC25BF3A67E0002 3 OK 7F000001631C18B4AAC25BF3A67E0005 1 OK 7F000001631C18B4AAC25BF3A67E0007 4 OK 7F000001631C18B4AAC25BF3A67E0004 7 OK 7F000001631C18B4AAC25BF3A67E0001 ...
- 單向傳送
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * 單向傳送訊息: 單向傳輸用於需要中等可靠性的情況,例如日誌收集 */ public class OnewayProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("oneWayTopic" /* Topic */, "TagB" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } }
- 消費者
(1)推送模式的訊息消費者
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 定義一個push消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup"); // 指定從第一條訊息開始消費 // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定nameserver consumer.setNamesrvAddr("192.168.13.111:9876"); // 指定消費的topic與tag consumer.subscribe("syncTopic", "*"); // 指定使用 廣播模式進行消費,預設為叢集模式 // consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("%s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
(2)拉取模式的消費者程式碼
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class PullConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 1、建立DefaultLitePullConsumer物件 DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer"); // 2、設定namesrv地址 litePullConsumer.setNamesrvAddr("192.168.13.111:9876"); // 3、訂閱消費主題 litePullConsumer.subscribe("syncTopic", "*"); // 4、啟動消費物件 litePullConsumer.start(); // 可以修改預設的阻塞時間 // litePullConsumer.setPollTimeoutMillis(0); try { // 5、迴圈開始消費訊息 while (true) { // 拉取訊息,無訊息時會阻塞 (預設會阻塞5s, 沒有訊息則返回一個空集合) List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s messageExts.size(): %s %n",System.currentTimeMillis(), messageExts.size()); messageExts.stream().forEach(msg -> { // 業務邏輯 System.out.printf("%s Receive New Messages, body: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); }); // 同步消費位置。不執行該方法,應用重啟會存在重複消費。 if (messageExts.size() > 0) { litePullConsumer.commitSync(); } } } finally { litePullConsumer.shutdown(); } } }
(3)關於訊息消費模式廣播模式與叢集模式的區別
廣播消費模式下:相同ConsumerGroup 的每個Consumer例項都接收同一個Topic 的全量訊息,即每條訊息都會被髮送到ConsumerGroup 中的每個每個Consumer。消費進度儲存在Consumer端,因為廣播模式每個Consumer 都會消費,但是其進度不同。
叢集消費模式下(預設,又被稱為負載均衡模式),相同ConsumerGroup 的每個Consumer 例項平攤同一個Topic 的訊息,即訊息只會被髮送到ConsumerGroup的某個Consumer。消費進度儲存在broker 中。因為同一條訊息只被消費一次,所以需要服務端broker 進行記錄下次進行合理的投遞。
簡單理解。對於多例項的Consumer(同時啟動多個相同的Consumer),叢集模式會平攤Topic的訊息;廣播模式,每個例項都會消費所有的訊息的訊息,出現重複消費。 由於上述的特性,消費進度儲存也略有不同,叢集模式下消費進度儲存在broker中;廣播模式儲存在consumer例項中。服務端可以檢視叢集模式的消費進度:
[root@redisnode1 config]# pwd /root/store/config [root@redisnode1 config]# cat consumerOffset.json { "offsetTable":{ "syncTopic@myTestConsumerGroup":{0:124,1:128,2:124,3:124 }, "%RETRY%myTestConsumerGroup@myTestConsumerGroup":{0:0 }, "syncTopic@LitePullConsumer":{0:115,1:118,2:117,3:115 } } }
2. 順序訊息
1. 什麼是順序訊息
順序訊息是指,嚴格按照訊息的傳送順序進行消費的訊息。
預設情況下,生產者會把以 RoundRobin 輪詢方式傳送到不同的Queue 分割槽佇列; 而消費訊息時會從多個Queue 上拉取訊息,這種情況下的傳送和消費是不能保證順序的。 如果將訊息僅傳送到同一個Queue 中,消費時也就從這個Queue 上拉取訊息,就保證了訊息的順序性。
舉個例子:訂單狀態佇列(ORDER_STATUS), 其下有四個Queue 佇列。我們傳送一個訂單的狀態時:001未支付-》001已支付-》001發貨中-》001發貨成功; 這四個狀態必須嚴格按照順序進行消費,所以就引入了順序訊息。
2. 有序性分類
全域性有序\分割槽有序。 全域性有序是指該topic只有一個佇列;分割槽有序是指在有多個Queue 的情況下,在定義Producer時我們指定訊息佇列選擇器,將相關的訊息傳送到相同的佇列,來保證在同一個佇列。
3. 程式碼演示
全域性有序很簡單,只需要將生產者佇列數量設定為1即可。
分割槽有序可以在調send 的時候傳遞MessageQueueSelector 進行佇列的負載均衡, 負載均衡肯定會選擇一個key 作為路由的值:可以是msg的key, 也可以是呼叫send 的時候傳遞第三個引數。
package com.zd.bx.rocketmq.order; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("orderProducer"); producer.setNamesrvAddr("192.168.13.111:9876"); // 設定queue 的數量為1, 則為全域性有序 // producer.setDefaultTopicQueueNums(1); producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.setKeys(i + ""); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 第一種方法是根據key 來進行路由 // Integer id = Integer.valueOf(msg.getKeys()); // 第二種就是傳遞引數來進行路由,send 方法的第三個引數會傳遞到arg引數上面 Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
3. 延遲訊息
如果記得沒錯,rabbitMQ的延遲訊息需要藉助於訊息的生存時間和死信佇列實現延遲。
1. 延遲訊息的等級
延遲訊息的等級定義在服務端MessageStoreConfig.java 中,如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
比如,如果指定等級為3, 則延遲為10s, 延遲等級從1開始。如果想增加其他的配置,可以在rocketMQ安裝目錄的conf 目錄中進行配置。
2. 延遲訊息原理
如下圖:
具體實現方案是:
Producer 將訊息傳送到Broker 之後,Broker 先將訊息傳送到commitLog 檔案,然後將其分發到相應的consumerqueue。 不過,在分發之前,系統會判斷訊息中是否帶有延遲等級,沒有則直接正常分發;若有:
(1) 修改訊息的topic 為SCHEDULE_TOPIC_XXXX 目錄。
(2) 根據延時等級,在consumequeue 目錄中SCHEDULE_TOPIC_XXXX 主題下創建出相應的queueId 與consumequeue 檔案(如果沒有這些目錄與檔案)。這裡需要注意:延遲等級與queueId 的對應關係為 queueId = delayLevel -1
(3) 修改訊息索引單元內容。索引單元中的MessagetagHashCode 部分原本存放的是訊息的Tag的Hash 值。現在修改為訊息的投遞時間。投遞時間=訊息儲存時間+延遲時間, 也就是投遞時間存的是其真正需要被分發回原queue 的時間。
(4) 投遞延遲訊息:Broker 內部有一個延遲訊息服務類ScheduleMessageService,負責將訊息投遞到目標Topic(內部用Timer 定時器實現)
(5) 將訊息重新寫入commitlog,形成新的索引條目分發到相應的queue 中
3. 程式碼演示
package com.zd.bx.rocketmq.delay; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("delayProducer"); producer.setNamesrvAddr("192.168.13.111:9876"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("delayTopicTest", ("Hello delay message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); } }
如果想測試效果,可以先啟動消費者,然後啟動生產者檢視其效果。
啟動後到伺服器rocketMQ 儲存目錄檢視主題目錄其佇列資訊如下:(預設在當前使用者的store 目錄)
[root@redisnode1 consumequeue]# pwd /root/store/consumequeue [root@redisnode1 consumequeue]# ls asyncTopic delayTopicTest oneWayTopic SCHEDULE_TOPIC_XXXX syncTopic TopicTest [root@redisnode1 consumequeue]# ls -R ./delayTopicTest/ ./delayTopicTest/: 0 1 2 3 ./delayTopicTest/0: 00000000000000000000 ./delayTopicTest/1: 00000000000000000000 ./delayTopicTest/2: 00000000000000000000 ./delayTopicTest/3: 00000000000000000000 [root@redisnode1 consumequeue]# ls -R ./SCHEDULE_TOPIC_XXXX/ ./SCHEDULE_TOPIC_XXXX/: 2 ./SCHEDULE_TOPIC_XXXX/2: 00000000000000000000
可以看出第一次發出延遲訊息後會多出一個SCHEDULE_TOPIC_XXXX目錄,內部會根據訊息的延遲等級建立相應的queue 目錄(queueId = delayLevel -1 , 所以queue 的目錄ID為2)。
轉載:https://www.cnblogs.com/qlqwjy/p/16023818.html
springboot整合,使用rocketmq-spring傳送訊息可以參考github官網:https://github.com/apache/rocketmq-spring,檢視如下包中的例子即可學會使用生產者和消費者:
rocketmq-spring 使用指南:https://github.com/apache/rocketmq-spring/wiki
TRANSLATE with x English TRANSLATE with EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back 此頁面的語言為英語 翻譯為
- 中文(簡體)
- 中文(繁體)
- 丹麥語
- 烏克蘭語
- 烏爾都語
- 亞美尼亞語
- 俄語
- 保加利亞語
- 克羅埃西亞語
- 冰島語
- 加泰羅尼亞語
- 匈牙利語
- 卡納達語
- 印地語
- 印尼語
- 古吉拉特語
- 哈薩克語
- 土耳其語
- 威爾士語
- 孟加拉語
- 尼泊爾語
- 布林語(南非荷蘭語)
- 希伯來語
- 希臘語
- 庫爾德語
- 德語
- 義大利語
- 拉脫維亞語
- 挪威語
- 捷克語
- 斯洛伐克語
- 斯洛維尼亞語
- 旁遮普語
- 日語
- 普什圖語
- 毛利語
- 法語
- 波蘭語
- 波斯語
- 泰盧固語
- 泰米爾語
- 泰語
- 海地克里奧爾語
- 愛沙尼亞語
- 瑞典語
- 立陶宛語
- 緬甸語
- 羅馬尼亞語
- 寮國語
- 芬蘭語
- 英語
- 荷蘭語
- 薩摩亞語
- 葡萄牙語
- 西班牙語
- 越南語
- 亞塞拜然語
- 阿姆哈拉語
- 阿爾巴尼亞語
- 阿拉伯語
- 韓語
- 馬爾加什語
- 馬拉地語
- 馬拉雅拉姆語
- 馬來語
- 馬耳他語
- 高棉語