6-rocketmq-springboot整合
官方手冊
https://github.com/apache/rocketmq-spring/wiki/使用者手冊
引包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group
注意:
請將上述示例配置中的
127.0.0.1:9876
替換成真實RocketMQ的NameServer地址與埠
編寫程式碼
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { //send message synchronously rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); //send spring message rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); //send messgae asynchronously rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() { @Override public void onSuccess(SendResult var1) { System.out.printf("async onSucess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { System.out.printf("async onException Throwable=%s %n", var1); } }); //Send messages orderly rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey") //rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate } @Data @AllArgsConstructor public class OrderPaidEvent implements Serializable{ private String orderId; private BigDecimal paidMoney; } }
接收訊息
rongtong edited this page on 25 Dec 2019 · 1 revision
修改application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
注意:
請將上述示例配置中的
127.0.0.1:9876
替換成真實RocketMQ的NameServer地址與埠
編寫程式碼
@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){ SpringApplication.run(ConsumerApplication.class, args); } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>{ public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{ public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } }
事務訊息
rongtong edited this page on 25 May · 2 revisions
修改application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
注意:
請將上述示例配置中的
127.0.0.1:9876
替換成真實RocketMQ的NameServer地址與埠
編寫程式碼
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args){
SpringApplication.run(ProducerApplication.class, args);
}
public void run(String... args) throws Exception {
try {
// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field 'transName'
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
} catch (MQClientException e) {
e.printStackTrace(System.out);
}
}
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, commit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, commit or unknown
return RocketMQLocalTransactionState.COMMIT;
}
}
}
訊息軌跡
rongtong edited this page on 25 Dec 2019 · 1 revision
Producer 端要想使用訊息軌跡,需要多配置兩個配置項:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端訊息軌跡的功能需要在 @RocketMQMessageListener
中進行配置對應的屬性:
@Service
@RocketMQMessageListener(
topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
enableMsgTrace = true,
customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
...
}
注意:
預設情況下 Producer 和 Consumer 的訊息軌跡功能是開啟的且 trace-topic 為 RMQ_SYS_TRACE_TOPIC Consumer 端的訊息軌跡 trace-topic 可以在配置檔案中配置
rocketmq.consumer.customized-trace-topic
配置項,不需要為在每個@RocketMQMessageListener
配置。
ACL功能
rongtong edited this page on 25 Dec 2019 · 1 revision
Producer 端要想使用 ACL 功能,需要多配置兩個配置項:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener
中進行配置
@Service
@RocketMQMessageListener(
topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
accessKey = "AK",
secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
...
}
注意:
可以不用為每個
@RocketMQMessageListener
註解配置 AK/SK,在配置檔案中配置rocketmq.consumer.access-key
和rocketmq.consumer.secret-key
配置項,這兩個配置項的值就是預設值
請求 應答語義支援
rongtong edited this page on 21 Feb · 2 revisions
RocketMQ-Spring 提供 請求/應答 語義支援。
- Producer端
傳送Request訊息使用SendAndReceive方法
注意
同步傳送需要在方法的引數中指明返回值型別
非同步傳送需要在回撥的介面中指明返回值型別
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args){
SpringApplication.run(ProducerApplication.class, args);
}
public void run(String... args) throws Exception {
// 同步傳送request並且等待String型別的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);
// 非同步傳送request並且等待User型別的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
System.out.printf("send user object and receive %s %n", message.toString());
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
}
@Data
@AllArgsConstructor
public class User implements Serializable{
private String userName;
private Byte userAge;
}
}
- Consumer端
需要實現RocketMQReplyListener<T, R> 介面,其中T表示接收值的型別,R表示返回值的型別。
@SpringBootApplication
public class ConsumerApplication{
public static void main(String[] args){
SpringApplication.run(ConsumerApplication.class, args);
}
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
@Service
@RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
public void onMessage(User user) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
User replyUser = new User("replyUserName",(byte) 10);
return replyUser;
}
}
@Data
@AllArgsConstructor
public class User implements Serializable{
private String userName;
private Byte userAge;
}
}
常見問題
rongtong edited this page on 25 Dec 2019 · 1 revision
-
生產環境有多個
nameserver
該如何連線?rocketmq.name-server
支援配置多個nameserver
地址,採用;
分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876
-
rocketMQTemplate
在什麼時候被銷燬?開發者在專案中使用
rocketMQTemplate
傳送訊息時,不需要手動執行rocketMQTemplate.destroy()
方法,rocketMQTemplate
會在spring容器銷燬時自動銷燬。 -
啟動報錯:
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please
RocketMQ在設計時就不希望一個消費者同時處理多個型別的訊息,因此同一個
consumerGroup
下的consumer職責應該是一樣的,不要幹不同的事情(即消費多個topic)。建議consumerGroup
與topic
一一對應。 -
傳送的訊息內容體是如何被序列化與反序列化的?
RocketMQ的訊息體都是以
byte[]
方式儲存。當業務系統的訊息內容體如果是java.lang.String
型別時,統一按照utf-8
編碼轉成byte[]
;如果業務系統的訊息內容為非java.lang.String
型別,則採用jackson-databind序列化成JSON
格式的字串之後,再統一按照utf-8
編碼轉成byte[]
。 -
如何指定topic的
tags
?RocketMQ的最佳實踐中推薦:一個應用盡可能用一個Topic,訊息子型別用
tags
來標識,tags
可以由應用自由設定。 在使用rocketMQTemplate
傳送訊息時,通過設定傳送方法的destination
引數來設定訊息的目的地,destination
的格式為topicName:tagName
,:
前面表示topic的名稱,後面表示tags
名稱。注意:
tags
從命名來看像是一個複數,但傳送訊息時,目的地只能指定一個topic下的一個tag
,不能指定多個。 -
傳送訊息時如何設定訊息的
key
?可以通過過載的
xxxSend(String destination, Message<?> msg, ...)
方法來發送訊息,指定msg
的headers
來完成。示例:Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);
同理還可以根據上面的方式來設定訊息的
FLAG
、WAIT_STORE_MSG_OK
以及一些使用者自定義的其它頭資訊。注意:
在將Spring的Message轉化為RocketMQ的Message時,為防止
header
資訊與RocketMQ的系統屬性衝突,在所有header
的名稱前面都統一添加了字首USERS_
。因此在消費時如果想獲取自定義的訊息頭資訊,請遍歷頭資訊中以USERS_
開頭的key即可。 -
消費訊息時,除了獲取訊息
payload
外,還想獲取RocketMQ訊息的其它系統屬性,需要怎麼做?消費者在實現
RocketMQListener
介面時,只需要起泛型為MessageExt
即可,這樣在onMessage
方法將接收到RocketMQ原生的MessageExt
訊息。@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener<MessageExt>{ public void onMessage(MessageExt messageExt) { log.info("received messageExt: {}", messageExt); } }
-
如何指定消費者從哪開始消費訊息,或開始消費的位置?
消費者預設開始消費的位置請參考:RocketMQ FAQ。 若想自定義消費者開始的消費位置,只需在消費者類新增一個
RocketMQPushConsumerLifecycleListener
介面的實現即可。 示例如下:@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
同理,任何關於
DefaultMQPushConsumer
的更多其它其它配置,都可以採用上述方式來完成。 -
如何傳送事務訊息?
在客戶端,首先使用者需要實現RocketMQLocalTransactionListener介面,並在介面類上註解宣告@RocketMQTransactionListener,實現確認和回查方法;然後再使用資源模板RocketMQTemplate, 呼叫方法sendMessageInTransaction()來進行訊息的釋出。 注意:從RocketMQ-Spring 2.1.0版本之後,註解@RocketMQTransactionListener不能設定txProducerGroup、ak、sk,這些值均與對應的RocketMQTemplate保持一致。
-
如何宣告不同name-server或者其他特定的屬性來定義非標的RocketMQTemplate?
第一步: 定義非標的RocketMQTemplate使用你需要的屬性,可以定義與標準的RocketMQTemplate不同的nameserver、groupname等。如果不定義,它們取全域性的配置屬性值或預設值。
// 這個RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 與所定義的類名相同(但首字母小寫) @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876" , ... // 定義其他屬性,如果有必要。 ) public class ExtRocketMQTemplate extends RocketMQTemplate { //類裡面不需要做任何修改 }
第二步: 使用這個非標RocketMQTemplate
@Resource(name = "extRocketMQTemplate") // 這裡必須定義name屬性來指向上述具體的Spring Bean. private RocketMQTemplate extRocketMQTemplate;
接下來就可以正常使用這個extRocketMQTemplate了。
-
如何使用非標的RocketMQTemplate傳送事務訊息?
首先使用者需要實現RocketMQLocalTransactionListener介面,並在介面類上註解宣告@RocketMQTransactionListener,註解欄位的rocketMQTemplateBeanName指明為非標的RocketMQTemplate的Bean name(若不設定則預設為標準的RocketMQTemplate),比如非標的RocketMQTemplate Bean name為“extRocketMQTemplate",則程式碼如下:
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } }
然後使用extRocketMQTemplate呼叫sendMessageInTransaction()來發送事務訊息。
-
MessageListener消費端,是否可以指定不同的name-server而不是使用全域性定義的'rocketmq.name-server'屬性值 ?
@Service @RocketMQMessageListener( nameServer = "NEW-NAMESERVER-LIST", // 可以使用這個optional屬性來指定不同的name-server topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyNameServerConsumer implements RocketMQListener<String> { ... }