RocketMQ的開發使用
簡介
RocketMQ是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型,支援拉(pull)和推(push)兩種訊息模式等特性。
安裝與服務
1. 下載安裝包(如:rocketmq-all-4.3.1-bin-release.zip)
2. 解壓
>unzip rocketmq-all-4.3.1-bin-release.zip
3. 啟動nameserver
>cd rocketmq-all-4.3.1-bin-release
>nohup sh bin/mqnamesrv &
4. 啟動broker
>nohup sh bin/broker -n localhost:9876 &
5. 停止服務
>sh bin/mqshutdown broker
訊息生產者
生產者的作用就是將訊息傳送到 MQ,生產者本身既可以產生訊息,如讀取文字資訊等。也可以對外提供介面,由外部應用來呼叫介面,再由生產者將收到的訊息傳送到 MQ。
在編寫程式的時候需要注意以下幾個引數:
Producer Group : 生產者組就是多個傳送同一類訊息的生產者稱之為一個生產者組
Topic : 訊息邏輯分類,比如訂單類、庫存類的訊息
Name Server : 路由資訊,通訊的地址及埠號
Tag : 標籤是topic的進一步細化
Key : 訊息的唯一鍵值,業務上可以通過key查詢訊息
Message : 訊息載體,主要用於儲存實際資料
訊息消費者
消費 MQ 上的訊息的應用程式就是消費者,至於訊息是否進行邏輯處理,還是直接儲存到資料庫等取決於業務需要。
Consumer Group : 消費者組,和生產者類似,消費同一類訊息的多個 consumer 例項組成一個消費者組。
生產者、消費者Demo
maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
RocketMQProducerHelper.java
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQProducerHelper {
private static Logger LOG = Logger.getLogger(RocketMQProducerHelper.class);
private static String topic;
private static RocketMQProducerHelper instance = null;
private DefaultMQProducer producer;
private RocketMQProducerHelper(String nameServer, String topicName, String groupID) {
try {
if (nameServer != null && topicName != null && groupID != null) {
topic = topicName;
producer = new DefaultMQProducer(groupID);
producer.setRetryTimesWhenSendFailed(4);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.setNamesrvAddr(nameServer);
producer.start();
LOG.info("producer started...");
} else {
LOG.error("parameter init error");
throw new Exception("parameter init error");
}
} catch (Exception e) {
LOG.error("producer init error...");
throw new RuntimeException(e);
}
}
public static RocketMQProducerHelper getInstance(String nameServer, String topic, String groupID) {
if(instance == null) {
synchronized (RocketMQProducerHelper.class) {
if (instance == null) {
instance = new RocketMQProducerHelper(nameServer, topic, groupID);
}
}
}
return instance;
}
public SendResult send(byte[] data) {
return send(topic, null, null, data, null);
}
public SendResult send(String tag, byte[] data) {
return send(topic, tag, null, data, null);
}
public SendResult send(String tag, String key, byte[] data) {
return send(topic, tag, key, data, null);
}
public SendResult send(String topic, String tag, String key, byte[] data, final MessageQueueSelector selector) {
SendResult sendResult = null;
try {
Message msg;
if (tag == null || tag.length() == 0) {
msg = new Message(topic, data);
} else if (key == null || key.length() == 0) {
msg = new Message(topic, tag, data);
} else {
msg = new Message(topic, tag, key, data);
}
if (selector != null) {
sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return selector.select(mqs, msg, arg);
}
}, key);
} else {
sendResult = producer.send(msg);
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Send message error");
}
return sendResult;
}
}
1. 生產者Producer
public class Producer {
public static void main(String[] args) {
String nameServer = "localhost:9876";
String topic = "test";
String groupID = "groupid";
String tag = "TAG";
RocketMQProducerHelper producer = RocketMQProducerHelper.getInstance(nameServer, topic, groupID);
for (int i=0; i<10; i++) {
String key = "1000"+i;
String value = "value="+i;
producer.send(tag, key, value.getBytes());
}
}
}
2. 消費者Consumer
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
String nameServer = "localhost:9876";
String groupID = "groupid";
String topic = "test";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupID);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for(MessageExt msg : msgs) {
String result = Bytes.toString(msg.getBody());
System.out.println("--------------------msg="+result);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}