RocketMQ簡單使用(一)簡單訊息、順序訊息、延遲訊息
研究下其簡單使用。
1. 簡單訊息
這裡使用三種訊息的傳送方式: 同步傳送、非同步傳送、單向傳送,以及訊息的消費。
- 同步傳送
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * 同步傳送訊息: 可靠同步傳輸應用場景廣泛,如重要通知訊息、簡訊通知、簡訊營銷系統等 */ public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("syncProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("syncTopic" /* Topic */, "TagA" /* Tag */, "keys" + i, /* Keys */ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
結果:
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224D30000, offsetMsgId=C0A80D6F00002A9F000000000010B9F4, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=0], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E00001, offsetMsgId=C0A80D6F00002A9F000000000010BABD, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=1], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E40002, offsetMsgId=C0A80D6F00002A9F000000000010BB86, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=2], queueOffset=50] SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E80003, offsetMsgId=C0A80D6F00002A9F000000000010BC4F, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=50] ...
- 非同步傳送訊息
package com.zd.bx.rocketmq.simple; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 非同步傳送訊息: 一般用於響應時間敏感的業務場景。 */ public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("asyncProducer"); // Specify name server addresses. producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("asyncTopic", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
結果:
0 OK 7F000001631C18B4AAC25BF3A67E0000
6 OK 7F000001631C18B4AAC25BF3A67E0002
3 OK 7F000001631C18B4AAC25BF3A67E0005
1 OK 7F000001631C18B4AAC25BF3A67E0007
4 OK 7F000001631C18B4AAC25BF3A67E0004
7 OK 7F000001631C18B4AAC25BF3A67E0001
...
- 單向傳送
package com.zd.bx.rocketmq.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 單向傳送訊息: 單向傳輸用於需要中等可靠性的情況,例如日誌收集
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.13.111:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("oneWayTopic" /* Topic */,
"TagB" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
- 消費者
package com.zd.bx.rocketmq.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 定義一個pull 消費者
// DefaultLitePullConsumer consumer2 = new DefaultLitePullConsumer("myTestConsumerGroup");
// 定義一個push消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
// 指定從第一條訊息開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定nameserver
consumer.setNamesrvAddr("192.168.13.111:9876");
// 指定消費的topic與tag
consumer.subscribe("syncTopic", "*");
// 指定使用 廣播模式進行消費,預設為叢集模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2. 順序訊息
1. 什麼是順序訊息
順序訊息是指,嚴格按照訊息的傳送順序進行消費的訊息。
預設情況下,生產者會把以 RoundRobin 輪詢方式傳送到不同的Queue 分割槽佇列; 而消費訊息時會從多個Queue 上拉取訊息,這種情況下的傳送和消費是不能保證順序的。 如果將訊息僅傳送到同一個Queue 中,消費時也就從這個Queue 上拉取訊息,就保證了訊息的順序性。
舉個例子:訂單狀態佇列(ORDER_STATUS), 其下有四個Queue 佇列。我們傳送一個訂單的狀態時:001未支付-》001已支付-》001發貨中-》001發貨成功; 這四個狀態必須嚴格按照順序進行消費,所以就引入了順序訊息。
2. 有序性分類
全域性有序\分割槽有序。 全域性有序是指該topic只有一個佇列;分割槽有序是指在有多個Queue 的情況下,在定義Producer時我們指定訊息佇列選擇器,將相關的訊息傳送到相同的佇列,來保證在同一個佇列。
3. 程式碼演示
全域性有序很簡單,只需要將生產者佇列數量設定為1即可。
分割槽有序可以在調send 的時候傳遞MessageQueueSelector 進行佇列的負載均衡, 負載均衡肯定會選擇一個key 作為路由的值:可以是msg的ke, 也可以是呼叫send 的時候傳遞第三個引數。
package com.zd.bx.rocketmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderProducer");
producer.setNamesrvAddr("192.168.13.111:9876");
// 設定queue 的數量為1, 則為全域性有序
// producer.setDefaultTopicQueueNums(1);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setKeys(i + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 第一種方法是根據key 來進行路由
// Integer id = Integer.valueOf(msg.getKeys());
// 第二種就是傳遞引數來進行路由,send 方法的第三個引數會傳遞到arg引數上面
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
3. 延遲訊息
如果記得沒錯,rabbitMQ的延遲訊息需要藉助於訊息的生存時間和死信佇列實現延遲。
1. 延遲訊息的等級
延遲訊息的等級定義在服務端MessageStoreConfig.java 中,如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
比如,如果指定等級為3, 則延遲為10s, 延遲等級從1開始。如果想增加其他的配置,可以在rocketMQ安裝目錄的conf 目錄中進行配置。
2. 延遲訊息原理
如下圖:
具體實現方案是:
Producer 將訊息傳送到Broker 之後,Broker 先將訊息傳送到commitLog 檔案,然後將其分發到相應的consumerqueue。 不過,在分發之前,系統會判斷訊息中是否帶有延遲等級,沒有則直接正常分發;若有:
(1) 修改訊息的topic 為SCHEDULE_TOPIC_XXXX 目錄。
(2) 根據延時等級,在consumequeue 目錄中SCHEDULE_TOPIC_XXXX 主題下創建出相應的queueId 與consumequeue 檔案(如果沒有這些目錄與檔案)。這裡需要注意:延遲等級與queueId 的對應關係為 queueId = delayLevel -1
(3) 修改訊息索引單元內容。索引單元中的MessagetagHashCode 部分原本存放的是訊息的Tag的Hash 值。現在修改為訊息的投遞時間。投遞時間=訊息儲存時間+延遲時間, 也就是投遞時間存的是其真正需要被分發回原queue 的時間。
(4) 投遞延遲訊息:Broker 內部有一個延遲訊息服務類ScheduleMessageService,負責將訊息投遞到目標Topic(內部用Timer 定時器實現)
(5) 將訊息重新寫入commitlog,形成新的索引條目分發到相應的queue 中
3. 程式碼演示
package com.zd.bx.rocketmq.delay;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("delayProducer");
producer.setNamesrvAddr("192.168.13.111:9876");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("delayTopicTest", ("Hello delay message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}
如果想測試效果,可以先啟動消費者,然後啟動生產者檢視其效果。
啟動後到伺服器rocketMQ 儲存目錄檢視主題目錄其佇列資訊如下:(預設在當前使用者的store 目錄)
[root@redisnode1 consumequeue]# pwd
/root/store/consumequeue
[root@redisnode1 consumequeue]# ls
asyncTopic delayTopicTest oneWayTopic SCHEDULE_TOPIC_XXXX syncTopic TopicTest
[root@redisnode1 consumequeue]# ls -R ./delayTopicTest/
./delayTopicTest/:
0 1 2 3
./delayTopicTest/0:
00000000000000000000
./delayTopicTest/1:
00000000000000000000
./delayTopicTest/2:
00000000000000000000
./delayTopicTest/3:
00000000000000000000
[root@redisnode1 consumequeue]# ls -R ./SCHEDULE_TOPIC_XXXX/
./SCHEDULE_TOPIC_XXXX/:
2
./SCHEDULE_TOPIC_XXXX/2:
00000000000000000000
可以看出第一次發出延遲訊息後會多出一個SCHEDULE_TOPIC_XXXX目錄,內部會根據訊息的延遲等級建立相應的queue 目錄(queueId = delayLevel -1 , 所以queue 的目錄ID為2)。