springboot整合阿里雲rocketMQ程式碼示例
阿新 • • 發佈:2020-12-31
技術標籤:springrocketMQ生產者消費者springboot整合
整合目標:完成生產者傳送訊息,消費者接收訊息的整個流程
整合步驟:
1、引入jar包依賴
<!--rocketMq訊息佇列--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version> </dependency>
2、初始化生產者連線
package com.gaozhen.webservicedemo.config; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Properties; @Component public class RocketMqProducerConfiguration { @Value("GID_sgcc_1") private String producerGroupName; @Value("172.16.205.55:9876") private String namesrvAddr; @Value("36Rl3QPMNNXJifNC") private String accessKey; @Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0") private String secretKey; private static Producer producer; @PostConstruct public void init() { // producer 例項配置初始化 Properties properties = new Properties(); //您在控制檯建立的Producer ID // properties.setProperty(PropertyKeyConst.ProducerId,RocketMqConfig.producerGroupName); properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName); // AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立 // properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立 //properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); //設定傳送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設定 TCP 接入域名(此處以公共雲生產環境為例),設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在左側操作欄單擊獲取接入點獲取 // properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); producer = ONSFactory.createProducer(properties); //在傳送訊息前,初始化呼叫start方法來啟動Producer,只需呼叫一次即可,當專案關閉時,自動shutdown producer.start(); } /** * 初始化生產者 * @return */ public Producer getProducer(){ return producer; } }
3、使用初始化的生產者producer傳送訊息massage
package com.gaozhen.webservicedemo.controller; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.gaozhen.webservicedemo.config.RocketMqProducerConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController public class TestController { @Autowired private RocketMqProducerConfiguration rocketMqProducerConfiguration; @GetMapping("/sendMsg") public String sendMsg(){ String toTopic = "topic_sx"; String tag = "tag1"; Message msg = new Message(toTopic, tag, "topic_sx,tag1傳送的資訊".getBytes()); try { SendResult result = rocketMqProducerConfiguration.getProducer().send(msg); if(result!=null){ System.out.println(new Date() + " Send mq message success. Topic is:"+ toTopic + " messageId is: " + result.getMessageId()); } else { //logger.warn(".sendResult is null........."); System.out.println(".sendResult is null........."); } return "傳送Mq訊息成功"; } catch (Exception e) { e.printStackTrace(); return "傳送Mq訊息失敗:"+ e.getMessage(); } } }
4、初始化消費者監聽listener
package com.gaozhen.webservicedemo.config;
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.gaozhen.webservicedemo.service.RocketMqListener;
import com.gaozhen.webservicedemo.util.UUIDUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMqConsumerConfiguration {
@Autowired
RocketMqListener rocketMqListener;
@Value("GID_sgcc_1")
private String consumerGroupName;
@Value("172.16.205.55:9876")
private String namesrvAddr;
@Value("36Rl3QPMNNXJifNC")
private String accessKey;
@Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
private String secretKey;
public static final String tag = "tag1";
private static Consumer consumer;
@PostConstruct
public void init() {
// consumer 例項配置初始化
Properties properties = new Properties();
//您在控制檯建立的consumer ID
//properties.setProperty(PropertyKeyConst.ConsumerId, RocketMqConfig.consumerGroupName);
properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
// AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
//properties.setProperty(PropertyKeyConst.AccessKey, RocketMqConfig.accessKey);
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
//properties.setProperty(PropertyKeyConst.SecretKey, RocketMqConfig.secretKey);
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//設定傳送超時時間,單位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定 TCP 接入域名(此處以公共雲生產環境為例),設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在左側操作欄單擊獲取接入點獲取
//properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, RocketMqConfig.namesrvAddr);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
properties.setProperty(PropertyKeyConst.InstanceName, UUIDUtil.getUUID32());
consumer = ONSFactory.createConsumer(properties);
//------------------------------訂閱topic-------------------------------------------------
consumer.subscribe("topic_sx",tag, rocketMqListener);//監聽第一個topic,new對應的監聽器
// 在傳送訊息前,必須呼叫start方法來啟動consumer,只需呼叫一次即可,當專案關閉時,自動shutdown
consumer.start();
System.out.println("ConsumerConfig start success.");
}
/**
* 初始化消費者
* @return
*/
public Consumer getconsumer(){
return consumer;
}
}
5、其中的rocketMqListener實現MessageListener的自定義接收訊息的監聽類
package com.gaozhen.webservicedemo.service;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class RocketMqListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
System.out.println("MessageListener.consume ok:" + message);
byte[] body = message.getBody();
String messageBody = new String(body);// 獲取到接收的訊息,由於接收到的是byte陣列,所以需要轉換成字串
System.out.println("收到傳送的資訊: " + messageBody);
} catch (Exception e) {
System.out.println("MessageListener.consume error:" + e.getMessage() );
}
System.out.println("MessageListener.Receive message");
// 如果想測試訊息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
return Action.CommitMessage;
}
}
最後,當訪問sendMsg介面,生產者講傳送一個條訊息到制定的topic和tag中去,消費者也必須用相同的topic和tag來接收,其中topic和tag可以理解為訊息的一級標題和二級標籤,如果不清楚tag可以用萬用字元“*”或者null來接收全部topic的訊息,groupid可以一致也可以不一致,具體三者的區別和用法,我將另外寫一篇文章重點介紹