spring boot:用rocketmq訊息訂閱實現刪除購物車商品功能(spring boot 2.3.3)
阿新 • • 發佈:2020-09-16
一,為什麼要使用訊息佇列實現刪除購物車商品功能?
訊息佇列主要用來處理不需要立刻返回結果的業務,
常見的例子:
使用者在下單後,要清除原購物車中的商品,
這個處理過程不需要馬上實現也不需要返回結果給使用者,
所以就適合使用佇列來實現
說明:劉巨集締的架構森林是一個專注架構的部落格,地址:https://www.cnblogs.com/architectforest
對應的原始碼可以訪問這裡獲取:https://github.com/liuhongdi/
說明:作者:劉巨集締 郵箱: [email protected]
二,演示專案的相關資訊
1,專案地址
https://github.com/liuhongdi/mqcart
2,專案功能說明:
演示了用rocketmq實現訊息的傳送和接收
3, 專案結構:如圖:
三,配置檔案說明
1,send/pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <!--fastjson begin--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>
2,receive/pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <!--fastjson begin--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>
說明:兩個模組的pom.xml內容相同
3,receive/application.properties
server.port=8081
說明:兩個模組同時執行時,需要把埠區分開,
send不做修改,使用預設的8080埠
receive這裡指定使用8081埠
四,java程式碼說明
1,send/HomeController.java
@RestController @RequestMapping("/home") public class HomeController { @Autowired private Producer producer; //初始化併發送訊息 @RequestMapping("/send") public String send() throws Exception { //要刪除的購物車的id List<Integer> cartList = new ArrayList<Integer>(); cartList.add(1); cartList.add(2); cartList.add(3); //訊息 CartMsg msg = new CartMsg(); msg.setUserId(1); msg.setCartList(cartList); String msgJson = JSON.toJSONString(msg); //生成一個資訊,標籤在這裡手動指定 Message message = new Message(RocketConstants.TOPIC, "carttag", msgJson.getBytes()); //傳送資訊 SendResult sendResult = producer.getProducer().send(message); System.out.println("生產者已傳送一條資訊,內容={"+sendResult+"}"); return "success"; } }
2,send/CartMsg.java
//購物車訊息 public class CartMsg { //使用者id private int userId; public int getUserId() { return this.userId; } public void setUserId(int userId) { this.userId = userId; } //購物車id private List<Integer> cartList; public List<Integer> getCartList() { return this.cartList; } public void setCartList(List<Integer> cartList) { this.cartList = cartList; } }
傳送的訊息體,
兩個模組中的的CartMsg.java檔案相同
3,send/RocketConstants.java
public class RocketConstants { //name server,有多個時用分號隔開 public static final String NAME_SERVER = "127.0.0.1:9876"; //topic的名字,應該從服務端先建立好,否則會報錯 public static final String TOPIC = "laoliutest"; }
配置name server和topic,
兩個模組中的的RocketConstants.java檔案相同
4,send/Producer.java
//訊息生產者類 @Component public class Producer { private String producerGroup = "cart_producer"; private DefaultMQProducer producer; //構造 public Producer(){ //建立生產者 producer = new DefaultMQProducer(producerGroup); //不開啟vip通道 producer.setVipChannelEnabled(false); //設定 name server producer.setNamesrvAddr(RocketConstants.NAME_SERVER); start(); } //使producer啟動 public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } //返回producer public DefaultMQProducer getProducer(){ return this.producer; } //進行關閉的方法 public void shutdown(){ this.producer.shutdown(); } }
5,receive/Consumer.java
@Component public class Consumer { //消費者實體物件 private DefaultMQPushConsumer consumer; //消費者組 public static final String CONSUMER_GROUP = "cart_consumer"; //建構函式 用來例項化物件 public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(RocketConstants.NAME_SERVER); //指定消費模式 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //指定訂閱主題 //指定訂閱標籤,*代表所有標籤 consumer.subscribe(RocketConstants.TOPIC, "*"); //註冊一個消費訊息的Listener //對訊息的消費在這裡實現 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { //遍歷接收到的訊息 try { for (Message msg : msgs) { //得到訊息的body String body = new String(msg.getBody(), "utf-8"); //用json轉成物件 CartMsg msgOne = JSON.parseObject(body, CartMsg.class); //列印使用者id System.out.println(msgOne.getUserId()); //列印訊息內容 System.out.println("消費者已接收到訊息-topic={"+msg.getTopic()+"}, 訊息內空={"+body+"}"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消費者 啟動成功======="); } }
這個是訊息的消費者
6,其他訊息的非關鍵程式碼可訪問github
五,演示效果
1,訪問send模組的controller:
http://127.0.0.1:8080/home/send
顯示傳送成功:
success
檢視控制檯:
生產者已傳送一條資訊,內容={SendResult [sendStatus=SEND_OK, msgId=C0A803D5113442A57993512ADA8E0000,
offsetMsgId=7F00000100002A9F0000000000003AE8, messageQueue=MessageQueue [topic=laoliutest, brokerName=broker-a, queueId=3], queueOffset=0]}
2,檢視receive模組的控制檯:
1
消費者已接收到訊息-topic={laoliutest}, 訊息內容={{"cartList":[1,2,3],"userId":1}
收到了訊息,訊息內容可解析
六,檢視spring boot版本:
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE)