Springboot2整合RocketMQ4.x
Springboot2.x整合RocketMQ4.x實戰,加入相關依賴,開發生產者程式碼
啟動nameser和broker
微信支付架構圖:
微信支付呼叫相應的API進行支付,支付完成後回撥,把相應的支付資訊封裝成物件發給生產者。生產者把訊息傳送給訊息佇列 broker,通過NameServer獲取相應的路由地址。訊息訂閱者從broker獲取相應的訊息,生成相應的支付憑證,增加積分等
1、加入相關依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>
2、application.properties加入配置檔案
# 消費者的組名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生產者的組名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
3、開發MsgProducer
/**
* 生產者的組名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer ;
public DefaultMQProducer getProducer(){
return this.producer;
}
@PostConstruct
public void defaultMQProducer() {
//生產者的組名
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多個地址以 ; 隔開
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
/**
* Producer物件在使用之前必須要呼叫start初始化,只能初始化一次
*/
producer.start();
} catch (Exception e) {
e.printStackTrace();
}
// producer.shutdown(); 一般在應用上下文,關閉的時候進行關閉,用上下文監聽器
}
模擬微型支付回撥的controller
@RestController
@RequestMapping("/api/v1")
public class OrderController {
@Autowired
private MsgProducer msgProducer;
/**
* 功能描述:微信支付回撥介面
* @param msg 支付資訊
* @param tag 訊息二級分類
* @return
*/
@GetMapping("order")
public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{
/**
* 建立一個訊息例項,包含 topic、tag 和 訊息體
*/
Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = msgProducer.getProducer().send(message);
System.out.println("傳送響應:MsgId:" + result.getMsgId() + ",傳送狀態:" + result.getSendStatus());
return JsonData.buildSuccess();
}
5.建立消費者
@Component
public class MsgConsumer {
/**
* 消費者的組名
*/
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消費者的組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr(namesrvAddr);
try {
//設定consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("testTopic", "*");
//CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,跳過歷史訊息
//CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//MessageListenerOrderly 這個是有序的
//MessageListenerConcurrently 這個是無序的,並行的方式處理,效率高很多
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);//輸出訊息內容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出訊息內容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}