1. 程式人生 > RocketMQ API使用簡介、拉取機制

RocketMQ API使用簡介、拉取機制

分散式開放訊息系統(RocketMQ)的原理與實踐 

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Demo producer that publishes three batches of messages to {@code TopicTest}.
 *
 * <p>Batch {@code n} (n = 0, 1, 2) uses the tag {@code order_n} and is routed through a
 * {@link MessageQueueSelector} to the queue with index {@code n}, so every message of a
 * batch is handed to the same queue. Messages are sent synchronously, one after another.
 */
public class Producer {

    /** Topic every message is published to. */
    private static final String TOPIC = "TopicTest";

    /** Number of batches; batch n uses tag "order_n" and queue index n. */
    private static final int ORDER_COUNT = 3;

    /** Number of messages sent per batch. */
    private static final int MESSAGES_PER_ORDER = 5;

    /** Pause after a failed send before moving on to the next message, in milliseconds. */
    private static final long FAILURE_PAUSE_MILLIS = 1000L;

    /**
     * Selects the queue whose index is passed as the {@code arg} of {@code send}.
     *
     * <p>The index is reduced modulo the number of available queues so that a topic with
     * fewer queues than batches does not cause an {@link IndexOutOfBoundsException}.
     * The selector is stateless, so one shared instance serves every send.
     */
    private static final MessageQueueSelector QUEUE_BY_INDEX = new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int index = (Integer) arg;
            return mqs.get(index % mqs.size());
        }
    };

    /**
     * Starts a producer in group {@code order_producer}, sends all batches and shuts the
     * producer down again.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started
     * @throws InterruptedException if the thread is interrupted while sending or while
     *         pausing after a failed send
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The constructor argument is the producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("order_producer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        try {
            // One timestamp is computed up front and shared by every message body.
            String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            for (int order = 0; order < ORDER_COUNT; order++) {
                sendOrderBatch(producer, dateStr, order);
            }
        } finally {
            // Always release the client, even when sending is aborted by an exception.
            producer.shutdown();
        }
    }

    /**
     * Sends one batch of {@link #MESSAGES_PER_ORDER} messages tagged {@code order_<queueIndex>}.
     *
     * <p>A failed send is reported on stderr, followed by a short pause; the loop then
     * continues with the next message.
     *
     * @param producer a started producer
     * @param dateStr timestamp text placed at the start of every body
     * @param queueIndex index of the target queue, also used as the tag suffix
     * @throws InterruptedException if the thread is interrupted during a send or the pause
     */
    private static void sendOrderBatch(DefaultMQProducer producer, String dateStr, int queueIndex)
            throws InterruptedException {
        String tag = "order_" + queueIndex;
        for (int i = 1; i <= MESSAGES_PER_ORDER; i++) {
            String body = dateStr + " " + tag + " " + i;
            try {
                Message msg = new Message(
                        TOPIC,
                        tag,
                        "KEY" + i,
                        // Explicit charset: do not depend on the platform default.
                        body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(msg, QUEUE_BY_INDEX, queueIndex);
                System.out.println(sendResult + ", body:" + body);
            } catch (InterruptedException e) {
                // Interruption is a request to stop; do not swallow it with the generic handler.
                throw e;
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(FAILURE_PAUSE_MILLIS);
            }
        }
    }
}
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Demo push consumer "C1" in group {@code order_consumer}.
 *
 * <p>Subscribes to every tag of {@code TopicTest}, registers a
 * {@link MessageListenerOrderly} that prints each received message, and then sleeps for a
 * random 0-4 seconds to simulate processing time.
 */
public class Consumer1 {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility with existing callers
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
        consumer.setNamesrvAddr("localhost:9876");

        // Controls whether the very first start of this consumer begins at the head or the
        // tail of the queue; on later starts consumption continues from the last position.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Lower and upper bound of the consume thread pool.
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);
        // Topic to subscribe to and the tag filter expression ("*" matches every tag).
        consumer.subscribe("TopicTest", "*");

        // The listener holds the actual message-handling logic.
        consumer.registerMessageListener(new MessageListenerOrderly() {

            private final Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // Enable auto commit for this batch.
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default.
                    String content = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(msg + ",content:" + content);
                }

                try {
                    // Simulated processing time: 0 to 4 seconds.
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so the owning thread pool can observe it.
                    Thread.currentThread().interrupt();
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // start() begins message delivery to the listener.
        consumer.start();
        System.out.println("C1 Started.");
    }
}
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Demo push consumer "C2" in group {@code order_consumer}.
 *
 * <p>Subscribes to every tag of {@code TopicTest}, registers a
 * {@link MessageListenerOrderly} that prints each received message, and then sleeps for a
 * random 0-4 seconds to simulate processing time.
 */
public class Consumer2 {

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility with existing callers
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
        consumer.setNamesrvAddr("localhost:9876");

        // Controls whether the very first start of this consumer begins at the head or the
        // tail of the queue; on later starts consumption continues from the last position.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Lower and upper bound of the consume thread pool.
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);
        // Topic to subscribe to and the tag filter expression ("*" matches every tag).
        consumer.subscribe("TopicTest", "*");

        // The listener holds the actual message-handling logic.
        consumer.registerMessageListener(new MessageListenerOrderly() {

            private final Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // Enable auto commit for this batch.
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Decode with an explicit charset instead of the platform default.
                    String content = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(msg + ",content:" + content);
                }

                try {
                    // Simulated processing time: 0 to 4 seconds.
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so the owning thread pool can observe it.
                    Thread.currentThread().interrupt();
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // start() begins message delivery to the listener.
        consumer.start();
        System.out.println("C2 Started.");
    }
}