1. 程式人生 > 實用技巧 >spring boot:用rocketmq訊息訂閱實現刪除購物車商品功能(spring boot 2.3.3)

spring boot:用rocketmq訊息訂閱實現刪除購物車商品功能(spring boot 2.3.3)

一,為什麼要使用訊息佇列實現刪除購物車商品功能?

訊息佇列主要用來處理不需要立刻返回結果的業務,

常見的例子:

使用者在下單後,要清除原購物車中的商品,

這個處理過程不需要馬上實現也不需要返回結果給使用者,

所以就適合使用佇列來實現

說明:劉巨集締的架構森林是一個專注架構的部落格,地址:https://www.cnblogs.com/architectforest

對應的原始碼可以訪問這裡獲取:https://github.com/liuhongdi/

說明:作者:劉巨集締 郵箱: [email protected]

二,演示專案的相關資訊

1,專案地址

https://github.com/liuhongdi/mqcart

2,專案功能說明:

演示了用rocketmq實現訊息的傳送和接收

3, 專案結構:如圖:

三,配置檔案說明

1,send/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
<groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <!--fastjson begin--> <dependency> <groupId>com.alibaba</
groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>

2,receive/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!--fastjson begin-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

說明:兩個模組的pom.xml內容相同

3,receive/application.properties

server.port=8081

說明:兩個模組同時執行時,需要把埠區分開,
send不做修改,使用預設的8080埠
receive這裡指定使用8081埠

四,java程式碼說明

1,send/HomeController.java

@RestController
@RequestMapping("/home")
public class HomeController {
    @Autowired
    private Producer producer;
    //初始化併發送訊息
    @RequestMapping("/send")
    public String send() throws Exception {
        //要刪除的購物車的id
        List<Integer> cartList = new ArrayList<Integer>();
        cartList.add(1);
        cartList.add(2);
        cartList.add(3);
        //訊息
        CartMsg msg = new CartMsg();
        msg.setUserId(1);
        msg.setCartList(cartList);

        String msgJson = JSON.toJSONString(msg);
        //生成一個資訊,標籤在這裡手動指定
        Message message = new Message(RocketConstants.TOPIC, "carttag", msgJson.getBytes());
        //傳送資訊
        SendResult sendResult = producer.getProducer().send(message);
        System.out.println("生產者已傳送一條資訊,內容={"+sendResult+"}");

        return "success";
    }
}

2,send/CartMsg.java

//購物車訊息
public class CartMsg {
    //使用者id
    private int userId;
    public int getUserId() {
        return this.userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }

    //購物車id
    private List<Integer> cartList;
    public List<Integer> getCartList() {
        return this.cartList;
    }
    public void setCartList(List<Integer> cartList) {
        this.cartList = cartList;
    }
}

傳送的訊息體,
兩個模組中的的CartMsg.java檔案相同

3,send/RocketConstants.java

public class RocketConstants {
    //name server,有多個時用分號隔開
    public static final String NAME_SERVER = "127.0.0.1:9876";
    //topic的名字,應該從服務端先建立好,否則會報錯
    public static final String TOPIC = "laoliutest";
}

配置name server和topic,

兩個模組中的的RocketConstants.java檔案相同

4,send/Producer.java

//訊息生產者類
@Component
public class Producer {
    private String producerGroup = "cart_producer";
    private DefaultMQProducer producer;
    //構造
    public Producer(){
        //建立生產者
        producer = new DefaultMQProducer(producerGroup);
        //不開啟vip通道
        producer.setVipChannelEnabled(false);
        //設定 name server
        producer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        start();
    }

    //使producer啟動
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //返回producer
    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    //進行關閉的方法
    public void shutdown(){
        this.producer.shutdown();
    }
}

5,receive/Consumer.java

@Component
public class Consumer {

    //消費者實體物件
    private DefaultMQPushConsumer consumer;

    //消費者組
    public static final String CONSUMER_GROUP = "cart_consumer";

    //建構函式 用來例項化物件
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        //指定消費模式
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //指定訂閱主題
        //指定訂閱標籤,*代表所有標籤
        consumer.subscribe(RocketConstants.TOPIC, "*");
        //註冊一個消費訊息的Listener
        //對訊息的消費在這裡實現
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //遍歷接收到的訊息
            try {
                for (Message msg : msgs) {
                    //得到訊息的body
                    String body = new String(msg.getBody(), "utf-8");
                    //用json轉成物件
                    CartMsg msgOne = JSON.parseObject(body, CartMsg.class);
                    //列印使用者id
                    System.out.println(msgOne.getUserId());
                    //列印訊息內容
                    System.out.println("消費者已接收到訊息-topic={"+msg.getTopic()+"}, 訊息內空={"+body+"}");
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("消費者 啟動成功=======");
    }
}

這個是訊息的消費者

6,其他訊息的非關鍵程式碼可訪問github

五,演示效果

1,訪問send模組的controller:

http://127.0.0.1:8080/home/send

顯示傳送成功:

success

檢視控制檯:

生產者已傳送一條資訊,內容={SendResult [sendStatus=SEND_OK, msgId=C0A803D5113442A57993512ADA8E0000, 
offsetMsgId=7F00000100002A9F0000000000003AE8, messageQueue=MessageQueue [topic=laoliutest, brokerName=broker-a, queueId=3], queueOffset=0]}

2,檢視receive模組的控制檯:

1
消費者已接收到訊息-topic={laoliutest}, 訊息內容={{"cartList":[1,2,3],"userId":1}

收到了訊息,訊息內容可解析

六,檢視spring boot版本:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)