springboot中阿里雲MQ(ONS/RocketMQ)的配置和使用【進階一】
阿新 • • 發佈:2019-03-06
1、yml配置
alimq:
  ProducerId: PRODUCER(mq中定義)
  ConsumerId: CONSUMER(mq中定義)
  AccessKey:
  SecretKey:
  ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  SendMsgTimeoutMillis: 3000
  topic: TOPIC
  #mq開關 0-不啟動消費 1-啟動消費
  mqflag: 1
  tag: ZC_xxx(mq中定義)
2、ali生產者和消費者配置
package common.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Holds the {@code alimq.*} values injected from the application configuration and
 * assembles the {@link Properties} objects used to create the ONS consumer and producer.
 *
 * @author liuyue
 */
@Configuration
@Data
public class AliMQConfig {

    @Value("${alimq.topic}")
    private String topic;

    @Value("${alimq.ProducerId}")
    private String producerId;

    @Value("${alimq.ConsumerId}")
    private String consumerId;

    @Value("${alimq.AccessKey}")
    private String accesskey;

    @Value("${alimq.SecretKey}")
    private String secretkey;

    @Value("${alimq.ONSAddr}")
    private String onsaddr;

    /** Subscription expression, read from {@code alimq.tag}. */
    @Value("${alimq.tag}")
    private String subExpression;

    /**
     * Builds the consumer configuration.
     *
     * @return a new {@link Properties} with the consumer id, access key, secret key and ONS address
     */
    public Properties getConsumerProperties() {
        return clientProperties(PropertyKeyConst.ConsumerId, consumerId);
    }

    /**
     * Builds the producer configuration.
     *
     * @return a new {@link Properties} with the producer id, access key, secret key and ONS address
     */
    public Properties getProducerProperties() {
        return clientProperties(PropertyKeyConst.ProducerId, producerId);
    }

    /**
     * Creates a fresh property set containing the given client id entry followed by the
     * credentials and address shared by both client kinds.
     */
    private Properties clientProperties(String idKey, String idValue) {
        Properties props = new Properties();
        props.setProperty(idKey, idValue);
        props.setProperty(PropertyKeyConst.AccessKey, accesskey);
        props.setProperty(PropertyKeyConst.SecretKey, secretkey);
        props.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
        return props;
    }
}
3、消費者監聽器
package common.config; import config.alimq.MQMsgConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Description: * @Auther: liuyue * @Date: 2018/11/14 19:31 */ @Component @Slf4j public class ListenerConfig implements CommandLineRunner { @Resource MQMsgConsumer mqConsumer; @Value("${alimq.mqflag}") private String mqflag; @Override public void run(String... strings) throws Exception { if("0".equals(mqflag)){ log.info("alimq沒有開啟消費"); }else{ log.info("=======alimq開始消費========="); mqConsumer.start(); mqConsumer.onMessage(); } } }
4、消費者類
package config.alimq;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import config.SpringContextHolder;
import config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
 * Spring-managed wrapper around an ONS {@link Consumer}.
 *
 * <p>The consumer is created once the bean's properties are set, but it is neither started
 * nor subscribed until {@link #start()} and {@link #onMessage()} are called. It is shut down
 * when the bean is destroyed.
 */
@Slf4j
@Component
public class MQMsgConsumer implements InitializingBean, DisposableBean {

    /** Tag of the messages that trigger the project-plan status update. */
    private static final String PLAN_STATUS_TAG = "ZC_xxx";

    @Autowired
    AliMQConfig busMqConfig;

    private Consumer busConsumer;

    @Autowired
    IPanoramaProService panoramaProServiceImpl;

    /** Creates the consumer from the configured properties; starting is left to {@link #start()}. */
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("消費者初始化");
        busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());
        log.info("消費者初始化完成");
    }

    /** Starts the underlying consumer. */
    public void start() {
        busConsumer.start();
    }

    /**
     * Subscribes to the configured topic and sub-expression; every received message is
     * logged and then handed to {@link #consumer(Message, ConsumeContext)}.
     */
    public void onMessage() {
        busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // The body is logged (decoded as UTF-8, null-safe) inside synchroProjectPlanStatus,
                // so it is not decoded here; decoding a null body would throw.
                log.info("Receive: {}", message);
                return consumer(message, context);
            }
        });
    }

    /** Shuts the underlying consumer down. */
    @Override
    public void destroy() throws Exception {
        busConsumer.shutdown();
        log.info("消費停止");
    }

    /**
     * Processes one message and decides how it is acknowledged.
     *
     * @param message the received message
     * @param context the consume context supplied by the client
     * @return {@link Action#CommitMessage} when the message carries the plan-status tag and the
     *         update succeeded; {@link Action#ReconsumeLater} otherwise (failed update, empty
     *         body, or any other tag)
     */
    public Action consumer(Message message, ConsumeContext context) {
        // Update the audit time; acknowledge only after the update actually succeeded.
        if (PLAN_STATUS_TAG.equals(message.getTag())
                && synchroProjectPlanStatus(message, context)) {
            return Action.CommitMessage;
        }
        return Action.ReconsumeLater;
    }

    /**
     * Updates the audit time from the JSON payload of the message.
     *
     * @author liu
     * @since 2018-11-02
     * @param message the message whose UTF-8 JSON body is parsed into {@code ProjectPlanParas}
     * @param context the consume context (not used)
     * @return the result reported by the service, or {@code false} when the body is missing,
     *         empty, or parses to {@code null}
     */
    private boolean synchroProjectPlanStatus(Message message, ConsumeContext context) {
        byte[] msgBody = message.getBody();
        if (msgBody == null || msgBody.length == 0) {
            return false;
        }

        String msgBodyStr = new String(msgBody, StandardCharsets.UTF_8);
        log.info(" THE MQ message body value: {}", msgBodyStr);

        // Convert the payload into an object.
        ProjectPlanParas projectPlanParas = JSON.parseObject(msgBodyStr, ProjectPlanParas.class);
        if (projectPlanParas == null) {
            // parseObject yields null for payloads such as "null" or blank text.
            log.warn(" THE MQ message body could not be converted to ProjectPlanParas");
            return false;
        }
        log.info(" THE ProjectPlanParas value: {}", projectPlanParas.getZutuanCode());
        log.info(" THE ProjectPlanParas value: {}", projectPlanParas.getFinishDate());

        // Perform the update.
        boolean updated = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas);
        log.info(" THE MQ synchroProjectPlanStatus status : {}", updated);
        return updated;
    }
}
5、生產者類
package config.alimq;
import com.aliyun.openservices.ons.api.*;
import com.config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
@Component
@Slf4j
public class MQMsgProducer implements InitializingBean, DisposableBean {
@Autowired
AliMQConfig busMqConfig;
private Producer producer;
@Override
public void afterPropertiesSet() throws Exception {
log.info("生產者初始化");
producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());
producer.start();
}
public void sentMessage(Message message) {
producer.sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(sendResult.getTopic() + "-----" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
log.error(context.getTopic() + "-----" + context.getMessageId() + ":error=" + context.getException());
}
});
}
@Override
public void destroy() throws Exception {
producer.shutdown();
}
}
6、生產者呼叫類,推送訊息,業務程式碼片段
// Producer wrapper bean used to push messages.
@Resource
MQMsgProducer mqProducer;
// Changed to update the plan time through alimq (edit by liuy, 2018-11-02).
// NOTE(review): `aliMQConfig` and `json` are not declared in this excerpt; presumably an
// injected AliMQConfig and the JSON payload string built by the caller - confirm.
Message msg = new Message(aliMQConfig.getTopic(),
"ZC_xxx", json.getBytes("UTF-8"));
mqProducer.sentMessage(msg);
至此,全部過