RocketMq訊息 demo
阿新 • • 發佈:2018-11-20
參考 https://blog.csdn.net/asdf08442a/article/details/54882769 整理出來的測試 demo
1、produce 生產者
1 package com.bwdz.sp.comm.util.test; 2 3 import org.apache.rocketmq.client.exception.MQBrokerException; 4 import org.apache.rocketmq.client.exception.MQClientException; 5 import org.apache.rocketmq.client.producer.DefaultMQProducer;View Code6 import org.apache.rocketmq.client.producer.SendResult; 7 import org.apache.rocketmq.client.producer.SendStatus; 8 import org.apache.rocketmq.common.message.Message; 9 import org.apache.rocketmq.remoting.exception.RemotingException; 10 11 import java.util.UUID; 12 13 /** 14 * Created by xy on 2018/11/16.15 */ 16 public class SyncProducer { 17 private static DefaultMQProducer producer = null; 18 19 public static void main(String[] args) { 20 System.out.print("[----------]Start\n"); 21 int pro_count = 1; 22 if (args.length > 0) { 23 pro_count = Integer.parseInt(args[0]);24 } 25 boolean result = false; 26 try { 27 ProducerStart(); 28 for (int i = 1; i < pro_count; i++) { 29 String msg = "hello rocketmq "+ i+"".toString(); 30 SendMessage("qch_20170706", //topic 31 "Tag"+i, //tag 32 "Key"+i, //key 33 msg); //body 34 System.out.print(msg + "\n"); 35 } 36 }finally { 37 producer.shutdown(); 38 } 39 System.out.print("[----------]Succeed\n"); 40 } 41 42 private static boolean ProducerStart() { 43 producer = new DefaultMQProducer("pro_qch_test"); 44 producer.setNamesrvAddr("192.168.69.173:9876"); 45 producer.setInstanceName(UUID.randomUUID().toString()); 46 try { 47 producer.start(); 48 } catch(MQClientException e) { 49 e.printStackTrace(); 50 return false; 51 } 52 return true; 53 } 54 55 private static boolean SendMessage(String topic,String tag,String key, String str) { 56 Message msg = new Message(topic,tag,key,str.getBytes()); 57 try { 58 SendResult result = producer.send(msg); 59 SendStatus status = result.getSendStatus(); 60 System.out.println("___________________________SendMessage: "+status.name()); 61 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { 62 e.printStackTrace(); 63 return false; 64 } 65 return true; 66 } 67 }
2、consumer 消費者
1 package com.bwdz.sp.comm.util.test; 2 3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere; 8 import org.apache.rocketmq.common.message.MessageExt; 9 10 import java.util.List; 11 import java.util.UUID; 12 13 /** 14 * Created by xy on 2018/11/16. 15 */ 16 public class ConsumerTest { 17 public static void main(String[] args) { 18 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test"); 19 consumer.setInstanceName(UUID.randomUUID().toString()); 20 consumer.setConsumeMessageBatchMaxSize(32); 21 consumer.setNamesrvAddr("192.168.69.173:9876"); 22 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 23 consumer.registerMessageListener(new MessageListenerConcurrently() { 24 @Override 25 public ConsumeConcurrentlyStatus consumeMessage( 26 List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { 27 for(MessageExt me : list) { 28 if("Tag1".equals(me.getTags())){ 29 System.out.println("處理 Tag1 業務"); 30 System.out.println(new String(me.getBody()) + "消費成功" + "\n"); 31 }else if("Tag2".equals(me.getTags())){ 32 System.out.println("處理 Tag2 業務"); 33 System.out.println(new String(me.getBody()) + "消費成功" + "\n"); 34 }else if("Tag3".equals(me.getTags())){ 35 System.out.println("處理 Tag3 業務"); 36 System.out.println(new String(me.getBody()) + "消費失敗" + "\n"); 37 return ConsumeConcurrentlyStatus.RECONSUME_LATER; 38 }else{ 39 //consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3"); 40 System.out.println("過濾掉的業務"+ me.getKeys()); 41 } 42 } 43 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 44 } 45 }); 46 try { 47 consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3"); 48 consumer.start(); 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 } 53 }View Code
先執行produce,控制檯輸出結果:
[----------]Start ___________________________SendMessage: SEND_OK hello rocketmq 1 ___________________________SendMessage: SEND_OK hello rocketmq 2 ___________________________SendMessage: SEND_OK hello rocketmq 3 ___________________________SendMessage: SEND_OK hello rocketmq 4 [----------]Succeed
再執行consumer,控制檯輸出結果:
注:訊息 ”hello rocketmq 4“ 被consumer裡47行程式碼過濾掉了,所以不會被消費;訊息 “hello rocket 3” 在消費的時候被指定失敗ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消費失敗,如果被指定失敗,表明此訊息下次還可以繼續傳送到consumer被繼續消費處理,其他訊息則不會被再一次消費
處理 Tag2 業務
hello rocketmq 2消費成功
處理 Tag3 業務
hello rocketmq 3消費失敗
處理 Tag1 業務
hello rocketmq 1消費成功
consumer再次執行,控制檯輸出結果(直到被指定成功ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker服務才不會繼續傳送訊息):
處理 Tag3 業務
hello rocketmq 3消費失敗