RocketMQ API使用簡介、拉取機制
阿新 • • 發佈:2019-01-06
import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 宣告並初始化一個producer // 需要一個producer group名字作為構造方法的引數,這裡為producer1 DefaultMQProducer producer = new DefaultMQProducer("order_producer"); producer.setNamesrvAddr("localhost:9876"); //producer.createTopic(key, newTopic, queueNum); producer.start(); //String[] tags = new String[] {"TagA","TagC","TagD"}; Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 1; i <= 5; i++) { try { // 時間戳 String body = dateStr + " order_0 " + i; Message msg = new Message("TopicTest",// topic //tags[i%tags.length],// tag "order_0",// tag "KEY"+i, body.getBytes()// body ); SendResult sendResult = producer.send(msg,new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer)arg; return mqs.get(id); } // 0是佇列的下標 },0); System.out.println(sendResult + ", body:" + body); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } for (int i = 1; i <= 5; i++) { try { // 時間戳 String body = dateStr + " order_1 " + i; Message msg = new Message("TopicTest",// topic //tags[i%tags.length],// tag "order_1",// tag "KEY"+i, body.getBytes()// body ); SendResult sendResult = producer.send(msg,new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer)arg; return mqs.get(id); } // 1是佇列的下標 },1); System.out.println(sendResult + ", body:" + body); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } for (int i = 1; i <= 5; i++) { try { // 時間戳 String body = dateStr + " order_2 " + i; Message msg = new Message("TopicTest",// topic //tags[i%tags.length],// tag "order_2",// tag "KEY"+i, body.getBytes()// body ); SendResult sendResult = producer.send(msg,new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer)arg; return mqs.get(id); } // 2是佇列的下標 },2); System.out.println(sendResult + ", body:" + body); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer1 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer"); consumer.setNamesrvAddr("localhost:9876"); /** * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /**消費執行緒池最小數量:預設10**/ consumer.setConsumeThreadMin(10); /**消費執行緒池最大數量:預設120**/ consumer.setConsumeThreadMax(20); // 訂閱的主題,以及過濾的標籤內容 consumer.subscribe("TopicTest", "*"); //設定一個Listener,主要進行訊息的邏輯處理 consumer.registerMessageListener(new MessageListenerOrderly() { private Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設定自動提交 context.setAutoCommit(true); for(MessageExt msg: msgs) { System.out.println(msg + ",content:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); //呼叫start()方法啟動consumer consumer.start(); System.out.println("C1 Started."); } }
import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer"); consumer.setNamesrvAddr("localhost:9876"); /** * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /**消費執行緒池最小數量:預設10**/ consumer.setConsumeThreadMin(10); /**消費執行緒池最大數量:預設120**/ consumer.setConsumeThreadMax(20); // 訂閱的主題,以及過濾的標籤內容 consumer.subscribe("TopicTest", "*"); //設定一個Listener,主要進行訊息的邏輯處理 consumer.registerMessageListener(new MessageListenerOrderly() { private Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設定自動提交 context.setAutoCommit(true); for(MessageExt msg: msgs) { System.out.println(msg + ",content:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); //呼叫start()方法啟動consumer consumer.start(); System.out.println("C2 Started."); } }