1. 程式人生 > >RocketMQ的開發使用

RocketMQ的開發使用

簡介

       RocketMQ是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型,支援拉(pull)和推(push)兩種訊息模式等特性。

安裝與服務

1. 下載安裝包(如:rocketmq-all-4.3.1-bin-release.zip)

2. 解壓

     >unzip rocketmq-all-4.3.1-bin-release.zip 

3. 啟動nameserver

     >cd rocketmq-all-4.3.1-bin-release

     >nohup sh bin/mqnamesrv &

4. 啟動broker

     >nohup sh bin/broker -n localhost:9876 &

5. 停止服務

    >sh bin/mqshutdown broker

訊息生產者

       生產者的作用就是將訊息傳送到 MQ,生產者本身既可以產生訊息,如讀取文字資訊等。也可以對外提供介面,由外部應用來呼叫介面,再由生產者將收到的訊息傳送到 MQ。

在編寫程式的時候需要注意以下幾個引數:

Producer Group : 生產者組就是多個傳送同一類訊息的生產者稱之為一個生產者組

Topic : 訊息邏輯分類,比如訂單類、庫存類的訊息

Name Server :  路由資訊,通訊的地址及埠號

Tag : 標籤是topic的進一步細化

Key : 訊息的唯一鍵值,業務上可以通過key查詢訊息

Message : 訊息載體,主要用於儲存實際資料

訊息消費者

    消費 MQ 上的訊息的應用程式就是消費者,至於訊息是否進行邏輯處理,還是直接儲存到資料庫等取決於業務需要。

Consumer Group : 消費者組,和生產者類似,消費同一類訊息的多個 consumer 例項組成一個消費者組。

生產者、消費者Demo

maven

<dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.3.0</version>
</dependency>

RocketMQProducerHelper.java

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class RocketMQProducerHelper {

    private static Logger LOG = Logger.getLogger(RocketMQProducerHelper.class);

    private static String topic;
    private static RocketMQProducerHelper instance = null;
    private DefaultMQProducer producer;

    private RocketMQProducerHelper(String nameServer, String topicName, String groupID) {
        try {
            if (nameServer != null && topicName != null && groupID != null) {
                topic = topicName;
                producer = new DefaultMQProducer(groupID);
                producer.setRetryTimesWhenSendFailed(4);
                producer.setRetryAnotherBrokerWhenNotStoreOK(true);
                producer.setNamesrvAddr(nameServer);
                producer.start();
                LOG.info("producer started...");
            } else {
                LOG.error("parameter init error");
                throw new Exception("parameter init error");
            }
        } catch (Exception e) {
            LOG.error("producer init error...");
            throw new RuntimeException(e);
        }
    }

    public static RocketMQProducerHelper getInstance(String nameServer, String topic, String groupID) {
        if(instance == null) {
            synchronized (RocketMQProducerHelper.class) {
                if (instance == null) {
                    instance = new RocketMQProducerHelper(nameServer, topic, groupID);
                }
            }
        }
        return instance;
    }

    public SendResult send(byte[] data) {
        return send(topic, null, null, data, null);
    }

    public SendResult send(String tag, byte[] data) {
        return send(topic, tag, null, data, null);
    }

    public SendResult send(String tag, String key, byte[] data) {
        return send(topic, tag, key, data, null);
    }

    public SendResult send(String topic, String tag, String key, byte[] data, final MessageQueueSelector selector) {
        SendResult sendResult = null;
        try {
            Message msg;
            if (tag == null || tag.length() == 0) {
                msg = new Message(topic, data);
            } else if (key == null || key.length() == 0) {
                msg = new Message(topic, tag, data);
            } else {
                msg = new Message(topic, tag, key, data);
            }
            if (selector != null) {
                sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        return selector.select(mqs, msg, arg);
                    }
                }, key);
            } else {
                sendResult = producer.send(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error("Send message error");
        }
        return sendResult;
    }
}

1.  生產者Producer

public class Producer {
    public static void main(String[] args) {

        String nameServer = "localhost:9876";
        String topic = "test";
        String groupID = "groupid";
        String tag = "TAG";
        RocketMQProducerHelper producer = RocketMQProducerHelper.getInstance(nameServer, topic, groupID);
        for (int i=0; i<10; i++) {
            String key = "1000"+i;
            String value = "value="+i;
            producer.send(tag, key, value.getBytes());
        }
    }
}

2. 消費者Consumer

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {

        String nameServer = "localhost:9876";
        String groupID = "groupid";
        String topic = "test";

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupID);
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe(topic, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for(MessageExt msg : msgs) {
                    String result = Bytes.toString(msg.getBody());
                    System.out.println("--------------------msg="+result);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}