SpringBoot2.0 整合 RocketMQ ,實現請求非同步處理
阿新 • • 發佈:2019-06-25
一、RocketMQ
1、架構圖片
2、角色分類
(1)、Broker
RocketMQ 的核心,接收 Producer 發過來的訊息、處理 Consumer 的消費訊息請求、訊息的持 久化儲存、服務端過濾功能等 。
(2)、NameServer
訊息佇列中的狀態伺服器,叢集的各個元件通過它來了解全域性的資訊 。類似微服務中註冊中心的服務註冊,發現,下線,上線的概念。
熱備份: NamServer可以部署多個,相互之間獨立,其他角色同時向多個NameServer 機器上報狀態資訊。
心跳機制: NameServer 中的 Broker、 Topic等狀態資訊不會持久儲存,都是由各個角色定時上報並存儲到記憶體中,超時不上報的話, NameServer會認為某個機器出故障不可用。
(3)、Producer
訊息的生成者,最常用的producer類就是DefaultMQProducer。
(4)、Consumer
訊息的消費者,常用Consumer類 DefaultMQPushConsumer 收到訊息後自動呼叫傳入的處理方法來處理,實時性高 DefaultMQPullConsumer 使用者自主控制 ,靈活性更高。
3、通訊機制
(1)、Broker啟動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定時向NameServer更新Topic路由資訊。
(2)、Producer傳送訊息時候,需要根據訊息的Topic從本地快取的獲取路由資訊。如果沒有則更新路由資訊會從NameServer重新拉取,同時Producer會預設每隔30s向NameServer拉取一次路由資訊。
(3)、Consumer消費訊息時候,從NameServer獲取的路由資訊,並再完成客戶端的負載均衡後,監聽指定訊息佇列獲取訊息並進行消費。
二、程式碼實現案例
1、專案結構圖
版本描述
<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<rocketmq.version>4.3.0</rocketmq.version>
2、配置檔案
rocketmq: # 生產者配置 producer: isOnOff: on # 傳送同一類訊息的設定為同一個group,保證唯一 groupName: FeePlatGroup # 服務地址 namesrvAddr: 10.1.1.207:9876 # 訊息最大長度 預設1024*4(4M) maxMessageSize: 4096 # 傳送訊息超時時間,預設3000 sendMsgTimeout: 3000 # 傳送訊息失敗重試次數,預設2 retryTimesWhenSendFailed: 2 # 消費者配置 consumer: isOnOff: on # 官方建議:確保同一組中的每個消費者訂閱相同的主題。 groupName: FeePlatGroup # 服務地址 namesrvAddr: 10.1.1.207:9876 # 接收該 Topic 下所有 Tag topics: FeePlatTopic~*; consumeThreadMin: 20 consumeThreadMax: 64 # 設定一次消費訊息的條數,預設為1條 consumeMessageBatchMaxSize: 1 # 配置 Group Topic Tag fee-plat: fee-plat-group: FeePlatGroup fee-plat-topic: FeePlatTopic fee-account-tag: FeeAccountTag
3、生產者配置
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RocketMQ 生產者配置
*/
@Configuration
public class ProducerConfig {
private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize ;
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
@Bean
public DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一個jvm中不同的producer往不同的mq叢集傳送訊息,需要設定不同的instanceName
if(this.maxMessageSize!=null){
producer.setMaxMessageSize(this.maxMessageSize);
}
if(this.sendMsgTimeout!=null){
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果傳送訊息失敗,設定重試次數,預設為2次
if(this.retryTimesWhenSendFailed!=null){
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
4、消費者配置
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* RocketMQ 消費者配置
*/
@Configuration
public class ConsumerConfig {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Resource
private RocketMsgListener msgListener;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(msgListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
String[] topicTagsArr = topics.split(";");
for (String topicTags : topicTagsArr) {
String[] topicTag = topicTags.split("~");
consumer.subscribe(topicTag[0],topicTag[1]);
}
consumer.start();
}catch (MQClientException e){
e.printStackTrace();
}
return consumer;
}
}
5、訊息監聽配置
import com.rocket.queue.service.impl.ParamConfigService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* 訊息消費監聽
*/
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
@Resource
private ParamConfigService paramConfigService ;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(list)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = list.get(0);
LOG.info("接受到的訊息為:"+new String(messageExt.getBody()));
int reConsume = messageExt.getReconsumeTimes();
// 訊息已經重試了3次,如果不需要再次消費,則返回成功
if(reConsume ==3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){
String tags = messageExt.getTags() ;
switch (tags){
case "FeeAccountTag":
LOG.info("開戶 tag == >>"+tags);
break ;
default:
LOG.info("未匹配到Tag == >>"+tags);
break;
}
}
// 訊息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
6、配置引數繫結
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ParamConfigService {
@Value("${fee-plat.fee-plat-group}")
public String feePlatGroup ;
@Value("${fee-plat.fee-plat-topic}")
public String feePlatTopic ;
@Value("${fee-plat.fee-account-tag}")
public String feeAccountTag ;
}
7、訊息傳送測試
import com.rocket.queue.service.FeePlatMqService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class FeePlatMqServiceImpl implements FeePlatMqService {
@Resource
private DefaultMQProducer defaultMQProducer;
@Resource
private ParamConfigService paramConfigService ;
@Override
public SendResult openAccountMsg(String msgInfo) {
// 可以不使用Config中的Group
defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup);
SendResult sendResult = null;
try {
Message sendMsg = new Message(paramConfigService.feePlatTopic,
paramConfigService.feeAccountTag,
"fee_open_account_key", msgInfo.getBytes());
sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
e.printStackTrace();
}
return sendResul