阿里雲訊息佇列mq(消費者)如何整合springboot,並能使用services
阿新 • • 發佈:2019-01-29
建立一個訊息佇列的資料庫維護表:
-- Maintenance table for message-queue consumers: one row per consumer configuration.
-- English summary of the per-column COMMENT clauses below:
--   topic               - topic the consumer belongs to
--   consumer_id         - the consumer's Consumer_id
--   access_key          - Aliyun identity credential
--   secre_key           - Aliyun identity credential, created in the Aliyun server management console
--   ons_address         - the configured TCP access domain
--   suspend_time_millis - wait time (ms) before retrying a failed ordered-message consumption
--   max_reconsume_times - maximum number of retries when consumption fails
--   sharding_key        - partition (section) for ordered messages
--   tag                 - type of message handled
--   investor            - class name of the consumption monitor
--   status              - consumer state (stop, running, init)
-- `version` and `state` carry no COMMENT in the DDL; their purpose is not shown here.
CREATE TABLE `consumer_local` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`version` bigint(20) NOT NULL,
`topic` varchar(30) NOT NULL COMMENT '消費者所屬的主題',
`consumer_id` varchar(50) NOT NULL COMMENT '消費者Consumer_id',
`access_key` varchar(60) NOT NULL COMMENT '阿里雲身份驗證',
`secre_key` varchar(300) NOT NULL COMMENT '阿里雲身份驗證,在阿里雲伺服器管理控制檯建立',
`ons_address` varchar(300) NOT NULL COMMENT '設定的TCP接入域名',
`suspend_time_millis` varchar(10) NOT NULL COMMENT '順序訊息消費失敗進行重試前的等待時間(毫秒)',
`max_reconsume_times` varchar(10) NOT NULL COMMENT '訊息消費失敗時的最大重試次數',
`sharding_key` varchar(30) NOT NULL COMMENT '順序訊息的區間' ,
`tag` varchar(30) NOT NULL COMMENT '處理訊息的型別',
`investor` varchar(50) NOT NULL COMMENT '消費監控的類名',
`status` varchar(30) NOT NULL COMMENT '消費者的狀態(stop,running,init)',
`state` int(1) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
然後在 Spring Boot 的配置檔案中新增要使用的資料庫記錄 id(此處為 1):
# Settings bound by @ConfigurationProperties(prefix="aliyun").
aliyun:
  # id of the consumer_local row to load at startup
  consumerId: 1
然後建立使用springboot配置的配置檔案類:
package com.ryhui.properties;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.OrderConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.ryhui.model.ConsumerLocal;
import com.ryhui.service.modelService.ConsumerLocalService;
import com.ryhui.utils.MessageListenerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Created by 10351 on 2018/7/4.
*/
@Component
@ConfigurationProperties(prefix="aliyun")
@Slf4j
public class AliMQConfig {
@Autowired
private MessageListenerUtil messageListenerUtil;
@Autowired
private ConsumerLocalService consumerLocalService;
private String consumerId;
public String getConsumerId() {
return consumerId;
}
public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public OrderConsumerBean getConsumer() {
ConsumerLocal consumerLocal = consumerLocalService.selectById(Long.valueOf(consumerId));
if(consumerLocal!=null&&consumerLocal.getStatus().equals("stop")){
OrderConsumerBean consumerBean = new OrderConsumerBean();
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerLocal.getConsumerId());
// AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey, consumerLocal.getAccessKey());
// SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, consumerLocal.getSecreKey());
//訊息處理失敗後多久重新發送訊息
properties.put(PropertyKeyConst.SuspendTimeMillis, consumerLocal.getSuspendTimeMillis());
//重發的次數
properties.put(PropertyKeyConst.MaxReconsumeTimes, consumerLocal.getMaxReconsumeTimes());
//消費者的執行緒數
properties.put(PropertyKeyConst.ConsumeThreadNums,"1");
//消費者的介入地址
properties.put(PropertyKeyConst.ONSAddr, consumerLocal.getOnsAddress());
consumerBean.setProperties(properties);
Subscription subscription = new Subscription();
subscription.setTopic(consumerLocal.getTopic());
subscription.setExpression(consumerLocal.getTag());
Map<Subscription, MessageOrderListener> map = new HashMap();
map.put(subscription, new MessageOrderListener(){
@Override
public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
log.info("【{}】有訊息進入,開始處理",message.getMsgID());
messageListenerUtil.sendMessage(message);
log.info("【{}】訊息處理結束",message.getMsgID());
return OrderAction.Success;
}
});
consumerBean.setSubscriptionTable(map);
consumerLocal.setStatus("running");
consumerLocalService.update(consumerLocal);
return consumerBean;
}else{
return new OrderConsumerBean();
}
}
}
監聽器使用匿名內部類的形式,因此能夠直接使用在配置類中注入的 Spring 管理類。該 Spring 管理類如下:
package com.ryhui.utils;
import com.aliyun.openservices.ons.api.Message;
import com.ryhui.model.ConsumerLocal;
import com.ryhui.model.ConsumerMessage;
import com.ryhui.service.modelService.ConsumerLocalService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Created by 10351 on 2018/7/4.
 *
 * <p>Spring component that records an incoming MQ message through
 * {@link ConsumerLocalService}.
 */
@Component
public class MessageListenerUtil {

    @Resource
    private ConsumerLocalService consumerLocalService;

    /**
     * Prints the message, records it against the consumer registered for its topic and
     * tag (if any), prints the message again with an {@code "end"} suffix, and returns.
     *
     * @param message the received MQ message
     * @return always the literal {@code "hello"}
     */
    public String sendMessage(Message message) {
        System.out.println(message.toString());
        recordMessage(message);
        System.out.println(message.toString() + "end");
        return "hello";
    }

    /**
     * Looks up the consumer for the message's topic and tag; when one exists, reuses the
     * stored record for this message id or creates it, then marks it {@code "success"}.
     */
    private void recordMessage(Message message) {
        ConsumerLocal owner = consumerLocalService.selectByTopicAndTag(message.getTopic(), message.getTag());
        if (owner == null) {
            return;
        }
        ConsumerMessage stored = consumerLocalService.selectByMessageUid(message.getMsgID());
        if (stored == null) {
            stored = consumerLocalService.handlerMessage(message, owner.getId());
        }
        consumerLocalService.updateMessage(stored.getId(), "success");
    }
}
專案啟動後會自動開啟消費者。如果需要多個消費者,就需要在資料庫中多配置一條資料,把該資料的 id 寫入配置檔案,並在配置類中引用,然後根據這個配置再寫一個消費者邏輯,模式同上。