1. 程式人生 > >RocketMq訊息 demo

RocketMq訊息 demo

參考 https://blog.csdn.net/asdf08442a/article/details/54882769 整理出來的測試 demo

1、produce 生產者

 1 package com.bwdz.sp.comm.util.test;
 2 
 3 import org.apache.rocketmq.client.exception.MQBrokerException;
 4 import org.apache.rocketmq.client.exception.MQClientException;
 5 import org.apache.rocketmq.client.producer.DefaultMQProducer;
6 import org.apache.rocketmq.client.producer.SendResult; 7 import org.apache.rocketmq.client.producer.SendStatus; 8 import org.apache.rocketmq.common.message.Message; 9 import org.apache.rocketmq.remoting.exception.RemotingException; 10 11 import java.util.UUID; 12 13 /** 14 * Created by xy on 2018/11/16.
15 */ 16 public class SyncProducer { 17 private static DefaultMQProducer producer = null; 18 19 public static void main(String[] args) { 20 System.out.print("[----------]Start\n"); 21 int pro_count = 1; 22 if (args.length > 0) { 23 pro_count = Integer.parseInt(args[0]);
24 } 25 boolean result = false; 26 try { 27 ProducerStart(); 28 for (int i = 1; i < pro_count; i++) { 29 String msg = "hello rocketmq "+ i+"".toString(); 30 SendMessage("qch_20170706", //topic 31 "Tag"+i, //tag 32 "Key"+i, //key 33 msg); //body 34 System.out.print(msg + "\n"); 35 } 36 }finally { 37 producer.shutdown(); 38 } 39 System.out.print("[----------]Succeed\n"); 40 } 41 42 private static boolean ProducerStart() { 43 producer = new DefaultMQProducer("pro_qch_test"); 44 producer.setNamesrvAddr("192.168.69.173:9876"); 45 producer.setInstanceName(UUID.randomUUID().toString()); 46 try { 47 producer.start(); 48 } catch(MQClientException e) { 49 e.printStackTrace(); 50 return false; 51 } 52 return true; 53 } 54 55 private static boolean SendMessage(String topic,String tag,String key, String str) { 56 Message msg = new Message(topic,tag,key,str.getBytes()); 57 try { 58 SendResult result = producer.send(msg); 59 SendStatus status = result.getSendStatus(); 60 System.out.println("___________________________SendMessage: "+status.name()); 61 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { 62 e.printStackTrace(); 63 return false; 64 } 65 return true; 66 } 67 }
View Code

2、consumer 消費者

 1 package com.bwdz.sp.comm.util.test;
 2 
 3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 8 import org.apache.rocketmq.common.message.MessageExt;
 9 
10 import java.util.List;
11 import java.util.UUID;
12 
13 /**
14  * Created by xy on 2018/11/16.
15  */
16 public class ConsumerTest {
17     public static void main(String[] args) {
18         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");
19         consumer.setInstanceName(UUID.randomUUID().toString());
20         consumer.setConsumeMessageBatchMaxSize(32);
21         consumer.setNamesrvAddr("192.168.69.173:9876");
22         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
23         consumer.registerMessageListener(new MessageListenerConcurrently() {
24             @Override
25             public ConsumeConcurrentlyStatus consumeMessage(
26                     List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
27                 for(MessageExt me : list) {
28                     if("Tag1".equals(me.getTags())){
29                         System.out.println("處理 Tag1 業務");
30                         System.out.println(new String(me.getBody()) + "消費成功" + "\n");
31                     }else if("Tag2".equals(me.getTags())){
32                         System.out.println("處理 Tag2 業務");
33                         System.out.println(new String(me.getBody()) + "消費成功" + "\n");
34                     }else if("Tag3".equals(me.getTags())){
35                         System.out.println("處理 Tag3 業務");
36                         System.out.println(new String(me.getBody()) + "消費失敗" + "\n");
37                         return ConsumeConcurrentlyStatus.RECONSUME_LATER;
38                     }else{
39                         //consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");
40                         System.out.println("過濾掉的業務"+ me.getKeys());
41                     }
42                 }
43                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
44             }
45         });
46         try {
47             consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");
48             consumer.start();
49         } catch (Exception e) {
50             e.printStackTrace();
51         }
52     }
53 }
View Code

 

先執行produce,控制檯輸出結果:

[----------]Start
___________________________SendMessage: SEND_OK
hello rocketmq 1
___________________________SendMessage: SEND_OK
hello rocketmq 2
___________________________SendMessage: SEND_OK
hello rocketmq 3
___________________________SendMessage: SEND_OK
hello rocketmq 4
[----------]Succeed

再執行consumer,控制檯輸出結果:

注:訊息 ”hello rocketmq 4“ 被consumer裡47行程式碼過濾掉了,所以不會被消費;訊息 “hello rocket 3” 在消費的時候被指定失敗ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消費失敗,如果被指定失敗,表明此訊息下次還可以繼續傳送到consumer被繼續消費處理,其他訊息則不會被再一次消費

處理 Tag2 業務
hello rocketmq 2消費成功

處理 Tag3 業務
hello rocketmq 3消費失敗

處理 Tag1 業務
hello rocketmq 1消費成功

consumer再次執行,控制檯輸出結果(直到被指定成功ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker服務才不會繼續傳送訊息):

處理 Tag3 業務
hello rocketmq 3消費失敗