1. 程式人生 > 其它 >springboot整合阿里雲rocketMQ程式碼示例

springboot整合阿里雲rocketMQ程式碼示例

技術標籤:springrocketMQ生產者消費者springboot整合

整合目標:完成生產者傳送訊息,消費者接收訊息的整個流程

整合步驟:

1、引入jar包依賴

   <!--rocketMq訊息佇列-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.4.Final</version>
        </dependency>

2、初始化生產者連線

package com.gaozhen.webservicedemo.config;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

@Component
public class RocketMqProducerConfiguration {
    @Value("GID_sgcc_1")
    private String producerGroupName;

    @Value("172.16.205.55:9876")
    private String namesrvAddr;

    @Value("36Rl3QPMNNXJifNC")
    private String accessKey;

    @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
    private String secretKey;

    private static Producer producer;

    @PostConstruct
    public void init() {
        // producer 例項配置初始化
        Properties properties = new Properties();
        //您在控制檯建立的Producer ID
       // properties.setProperty(PropertyKeyConst.ProducerId,RocketMqConfig.producerGroupName);
        properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName);
        // AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
       // properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
        //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //設定傳送超時時間,單位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設定 TCP 接入域名(此處以公共雲生產環境為例),設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在左側操作欄單擊獲取接入點獲取
       // properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        producer = ONSFactory.createProducer(properties);
        //在傳送訊息前,初始化呼叫start方法來啟動Producer,只需呼叫一次即可,當專案關閉時,自動shutdown
        producer.start();
    }

    /**
     * 初始化生產者
     * @return
     */
    public Producer getProducer(){
        return producer;
    }



}

3、使用初始化的生產者producer傳送訊息massage

package com.gaozhen.webservicedemo.controller;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.gaozhen.webservicedemo.config.RocketMqProducerConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class TestController {

    @Autowired
    private RocketMqProducerConfiguration rocketMqProducerConfiguration;

    @GetMapping("/sendMsg")
    public String sendMsg(){

        String toTopic = "topic_sx";
        String tag = "tag1";
        Message msg = new Message(toTopic, tag, "topic_sx,tag1傳送的資訊".getBytes());
        try {
            SendResult result = rocketMqProducerConfiguration.getProducer().send(msg);
            if(result!=null){
                System.out.println(new Date() + " Send mq message success. Topic is:"+ toTopic + " messageId is: " + result.getMessageId());
            } else {
                //logger.warn(".sendResult is null.........");
                System.out.println(".sendResult is null.........");
            }
            return "傳送Mq訊息成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "傳送Mq訊息失敗:"+ e.getMessage();
        }
    }

}

4、初始化消費者監聽listener

package com.gaozhen.webservicedemo.config;

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.gaozhen.webservicedemo.service.RocketMqListener;
import com.gaozhen.webservicedemo.util.UUIDUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

@Component
public class RocketMqConsumerConfiguration {
    @Autowired
    RocketMqListener rocketMqListener;

    @Value("GID_sgcc_1")
    private String consumerGroupName;

    @Value("172.16.205.55:9876")
    private String namesrvAddr;

    @Value("36Rl3QPMNNXJifNC")
    private String accessKey;

    @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
    private String secretKey;

    public static final String tag = "tag1";

    private static Consumer consumer;

    @PostConstruct
    public void init() {
        // consumer 例項配置初始化
        Properties properties = new Properties();
        //您在控制檯建立的consumer ID
        //properties.setProperty(PropertyKeyConst.ConsumerId, RocketMqConfig.consumerGroupName);
        properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
        // AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
        //properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
        //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //設定傳送超時時間,單位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 設定 TCP 接入域名(此處以公共雲生產環境為例),設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在左側操作欄單擊獲取接入點獲取
        //properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        properties.setProperty(PropertyKeyConst.InstanceName, UUIDUtil.getUUID32());
        consumer = ONSFactory.createConsumer(properties);
        //------------------------------訂閱topic-------------------------------------------------
        consumer.subscribe("topic_sx",tag, rocketMqListener);//監聽第一個topic,new對應的監聽器
        // 在傳送訊息前,必須呼叫start方法來啟動consumer,只需呼叫一次即可,當專案關閉時,自動shutdown
        consumer.start();
        System.out.println("ConsumerConfig start success.");
    }

    /**
     * 初始化消費者
     * @return
     */
    public Consumer getconsumer(){
        return consumer;
    }
}

5、其中的rocketMqListener實現MessageListener的自定義接收訊息的監聽類

package com.gaozhen.webservicedemo.service;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class RocketMqListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            System.out.println("MessageListener.consume ok:" + message);
            byte[] body = message.getBody();
            String messageBody = new String(body);// 獲取到接收的訊息,由於接收到的是byte陣列,所以需要轉換成字串
            System.out.println("收到傳送的資訊: " + messageBody);

        } catch (Exception e) {
            System.out.println("MessageListener.consume error:" + e.getMessage() );
        }
        System.out.println("MessageListener.Receive message");
        // 如果想測試訊息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
        return Action.CommitMessage;
    }

}

最後,當訪問sendMsg介面,生產者講傳送一個條訊息到制定的topic和tag中去,消費者也必須用相同的topic和tag來接收,其中topic和tag可以理解為訊息的一級標題和二級標籤,如果不清楚tag可以用萬用字元“*”或者null來接收全部topic的訊息,groupid可以一致也可以不一致,具體三者的區別和用法,我將另外寫一篇文章重點介紹