SpringBoot 整合 RocketMQ 消費資料存庫(MySQL)
阿新 • • 發佈:2019-01-17
廢話不多說,直接上程式碼
1.專案結構及環境配置
專案結構
src
main
java
com
harvey
common --------------- 公共類
dao ------------------ DAO
datasource ----------- 資料來源
model ---------------- 實體類
mq
consumer --------- rocketmq 消費者
producer --------- rocketmq 生產者
service -------------- 業務處理
util ----------------- 工具類
resources
mapper ----------------------- mybatis 配置檔案
application.yml -------------- 全域性配置檔案
banner.txt ------------------- logo
logback-spring.xml ----------- 日誌配置
MySQL 建立表 SQL 指令碼
-- Recreates the demo table that receives the consumed order messages.
-- WARNING: the drop statement removes any existing `order_test` table together with its data.
drop table if exists `order_test`;
create table `order_test` (
-- surrogate key, generated by MySQL
`id` int(11) not null auto_increment,
-- order number
`order_no` varchar(20) default null,
-- id of the user who placed the order
`user_id` varchar(20) default null,
-- order amount
-- NOTE(review): double is inexact for money; a decimal type is usually preferred - confirm before changing
`order_money` double default null,
-- payment type: 0 = cash on delivery, 1 = online payment
`pay_type` int (11) default null,
-- order creation time, stored as text
`create_time` varchar(20) default null,
primary key (`id`)
) engine=innodb default charset=utf8;
RocketMQ 建立 Topic
$ sh mqadmin updateTopic -n IP:Port -c TestCluster -t OrderQueue
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
create topic to 10.20.0.13:10911 success.
TopicConfig [topicName=OrderQueue, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
application.yml
# port
server:
  port: 8080

# log (read by logback-spring.xml through the "log.root.level" property)
log:
  root:
    level: INFO

# mysql (bound by OrderDataSourceConfig with the prefix "datasource.order")
datasource:
  order:
    url: jdbc:mysql://192.168.191.65:3306/orderdb?useUnicode=true&characterEncoding=utf8
    username: root
    # NOTE(review): this value appears to have been obfuscated when the article was published;
    # replace it with the real password before use.
    password: [email protected]
    driver-class-name: com.mysql.jdbc.Driver
    max-wait: 10000
    min-idle: 2
    type: com.alibaba.druid.pool.DruidDataSource
    maxActive: 10
    initial-size: 5
    validation-query: SELECT 1
    test-on-borrow: false
    test-while-idle: true
    time-between-eviction-runs-millis: 18800
    filters: stat

# rocketmq (bound by OrderConf with the prefix "rmq.order-queue")
rmq:
  order-queue:
    nameAddress: test-rq01-a.mq.01zhuanche.com:9876
    groupName: order_queue_group
    instanceName: order_queue_instance
    batchMaxSize: 16
    topicName: OrderQueue
    # left empty on purpose: BaseConsumer subscribes with "*" when no tag is configured
    tag:
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="false" scanPeriod="1 minutes">
    <!-- Expose the Spring property "log.root.level" to logback under the same name. -->
    <springProperty scope="context" name="log.root.level" source="log.root.level"/>

    <!-- Console appender; the pattern also prints the MDC value "traceId". -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd.HH:mm:ss.SSS} %t %p %c{0} %X{traceId} : %m%n</pattern>
        </encoder>
    </appender>

    <!-- Application packages log at INFO. -->
    <logger name="com.harvey" level="INFO"/>

    <!-- Root level comes from application.yml (log.root.level). -->
    <root level="${log.root.level}">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
banner.txt
_____ _ ____ _ _____ _ _ __ __ ____
/ ____| (_) | _ \ | | | __ \ | | | | | \/ |/ __ \
| (___ _ __ _ __ _ _ __ __ _| |_) | ___ ___ | |_ ______| |__) |___ ___| | _____| |_| \ / | | | |
\___ \| '_ \| '__| | '_ \ / _` | _ < / _ \ / _ \| __|______| _ // _ \ / __| |/ / _ \ __| |\/| | | | |
____) | |_) | | | | | | | (_| | |_) | (_) | (_) | |_ | | \ \ (_) | (__| < __/ |_| | | | |__| |
|_____/| .__/|_| |_|_| |_|\__, |____/ \___/ \___/ \__| |_| \_\___/ \___|_|\_\___|\__|_| |_|\___\_\
| | __/ |
|_| |___/
2.程式碼
1.資料來源
- BaseDataSourceConfig.java
package com.harvey.datasource;
import com.github.pagehelper.PageHelper;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
import java.util.Properties;
/**
 * Base contract for data-source configuration classes.
 *
 * <p>An implementation supplies the location pattern of its MyBatis mapper XML files and
 * inherits a helper that builds a {@link SqlSessionFactory} with the PageHelper plugin installed.
 *
 * @author harvey
 */
public interface BaseDataSourceConfig {

    /**
     * Returns the location pattern of the mapper XML files.
     *
     * @return the pattern that is resolved through
     *         {@link PathMatchingResourcePatternResolver#getResources(String)}
     */
    String getMapperLocations();

    /**
     * Creates a {@link SqlSessionFactory} that has the PageHelper plugin registered.
     *
     * @param dataSource the data source the factory operates on
     * @param dialect    the value stored under PageHelper's {@code dialect} property
     * @return the factory produced by the configured {@link SqlSessionFactoryBean}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    default SqlSessionFactory createPageSessionFactory(DataSource dataSource, String dialect) throws Exception {
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        // Resolve every mapper XML that matches the implementation's location pattern.
        PathMatchingResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        sessionFactoryBean.setMapperLocations(resourceResolver.getResources(getMapperLocations()));

        // Configure the pagination plugin and register it as the only interceptor.
        PageHelper paginationPlugin = new PageHelper();
        Properties pluginSettings = new Properties();
        pluginSettings.setProperty("dialect", dialect);
        paginationPlugin.setProperties(pluginSettings);
        Interceptor[] plugins = {paginationPlugin};
        sessionFactoryBean.setPlugins(plugins);

        return sessionFactoryBean.getObject();
    }
}
- OrderDataSourceConfig.java
package com.harvey.datasource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
 * Data-source configuration for the order database.
 *
 * <p>Additional data sources can be configured by following the structure of this class.
 * Every bean declared here is marked {@code @Primary}.
 *
 * @author harvey
 */
@Configuration
@MapperScan(basePackages = {"com.harvey.dao.order"}, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderDataSourceConfig implements BaseDataSourceConfig {

    /** Location pattern of the mapper XML files that belong to this data source. */
    static final String MAPPER_LOCATION = "classpath*:mapper/order/*.xml";

    /**
     * Declares the connection properties, bound from the {@code datasource.order} prefix.
     *
     * @return an empty {@link DataSourceProperties} instance for Spring to populate
     */
    @Bean(name = "orderProperties")
    @ConfigurationProperties(prefix = "datasource.order")
    @Primary
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Builds the order data source from the bound properties. The resulting bean is itself
     * annotated with the {@code datasource.order} prefix as well.
     *
     * @param orderProperties the {@code orderProperties} bean
     * @return the data source built by the properties' builder
     */
    @Bean(name = "orderDS")
    @ConfigurationProperties(prefix = "datasource.order")
    @Primary
    public DataSource dataSource(@Qualifier("orderProperties") DataSourceProperties orderProperties) {
        return orderProperties.initializeDataSourceBuilder().build();
    }

    /**
     * Creates the session factory for the order data source using the {@code mysql} dialect.
     *
     * @param orderDataSource the {@code orderDS} bean
     * @return the PageHelper-enabled session factory
     * @throws Exception if the factory cannot be created
     */
    @Bean(name = "orderSqlSessionFactory")
    @Primary
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("orderDS") DataSource orderDataSource) throws Exception {
        return createPageSessionFactory(orderDataSource, "mysql");
    }

    /**
     * Creates the transaction manager for the order data source.
     *
     * @param orderDataSource the {@code orderDS} bean
     * @return a transaction manager operating on that data source
     */
    @Bean(name = "orderTransactionManager")
    @Primary
    public DataSourceTransactionManager orderTransactionManager(@Qualifier("orderDS") DataSource orderDataSource) {
        return new DataSourceTransactionManager(orderDataSource);
    }

    /**
     * Creates the session template referenced by {@code @MapperScan} on this class.
     *
     * @param sessionFactory the {@code orderSqlSessionFactory} bean
     * @return a template wrapping that factory
     */
    @Bean(name = "orderSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory sessionFactory) {
        return new SqlSessionTemplate(sessionFactory);
    }

    /** {@inheritDoc} */
    @Override
    public String getMapperLocations() {
        return MAPPER_LOCATION;
    }
}
2.RocketMQ 配置
com.harvey.mq.consumer.common 包下
- BaseConsumer.java
package com.harvey.mq.consumer.common;
import com.harvey.common.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
 * Base RocketMQ push consumer.
 *
 * <p>Holds the connection settings of one consumer, subscribes to a topic, and forwards every
 * received message body to the configured {@link MQMessageListener}. The underlying
 * {@link DefaultMQPushConsumer} is shut down when Spring destroys the bean.
 *
 * <p>NOTE(review): this class is annotated with {@code @Configuration} and has a no-arg
 * constructor, so Spring may also create an instance on which {@link #registerListener()} is
 * never called. {@link #destroy()} therefore tolerates a consumer that was never created.
 *
 * @author harvey
 */
@Slf4j
@Configuration
public class BaseConsumer implements DisposableBean {
    private String nameAddress;
    private String groupName;
    private int batchMaxSize;
    private String topicName;
    private String instanceName;
    private String tag;
    private MQMessageListener mqMessageListener;
    // volatile: registerListener() may run on a different thread than destroy().
    private volatile DefaultMQPushConsumer consumer = null;

    /** Creates an unconfigured consumer; settings are supplied through the setters. */
    public BaseConsumer() { }

    /**
     * Creates a consumer with all connection settings.
     *
     * @param nameAddress  name server address handed to the RocketMQ consumer
     * @param groupName    consumer group name
     * @param batchMaxSize maximum number of messages delivered per listener call
     * @param topicName    topic to subscribe to
     * @param instanceName instance name (stored only; this class does not apply it to the consumer)
     * @param tag          tag expression to subscribe with; empty or {@code null} subscribes to {@code "*"}
     */
    public BaseConsumer(String nameAddress, String groupName, int batchMaxSize, String topicName, String instanceName, String tag) {
        this.nameAddress = nameAddress;
        this.groupName = groupName;
        this.batchMaxSize = batchMaxSize;
        this.topicName = topicName;
        this.instanceName = instanceName;
        this.tag = tag;
    }

    /**
     * Configures the push consumer, registers the orderly message listener and starts consuming.
     *
     * <p>Failures are logged and not rethrown. The success message is logged only when the
     * consumer actually started.
     */
    public void registerListener() {
        log.info("rocketmq init【topicName = {},groupName = {}】", this.topicName, this.groupName);
        try {
            DefaultMQPushConsumer pushConsumer = this.consumer;
            if (pushConsumer == null) {
                pushConsumer = new DefaultMQPushConsumer(this.groupName);
                this.consumer = pushConsumer;
            }
            pushConsumer.setNamesrvAddr(this.nameAddress);
            // Consumption strategy: start from a point in time; used together with
            // setConsumeTimestamp(), whose default is half an hour ago.
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            pushConsumer.setConsumeTimestamp(Constants.CONSUMER_FROM_TIME);
            pushConsumer.setConsumeMessageBatchMaxSize(this.batchMaxSize);
            // Without a configured tag, subscribe to every tag of the topic.
            pushConsumer.subscribe(this.topicName, StringUtils.isNotEmpty(this.tag) ? this.tag : "*");
            // Register the listener.
            pushConsumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    return dispatch(msgs, context);
                }
            });
            pushConsumer.start();
            log.info("rocketmq registerMessageListener success!【topicName = {},groupName = {}】", this.topicName, this.groupName);
        } catch (Exception e) {
            String errmsg = "rocketmq registerMessageListener fail!【topicName = " + this.topicName + ",groupName = " + this.groupName + "】";
            log.error(errmsg, e);
        }
    }

    /**
     * Hands each message of a batch to the configured listener.
     *
     * <p>Best effort by design: a failure on one message is logged and the remaining messages are
     * still processed; the batch is always acknowledged with {@code SUCCESS}, so failed messages
     * are not redelivered.
     */
    private ConsumeOrderlyStatus dispatch(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        for (MessageExt msgExt : msgs) {
            String msgId = msgExt.getMsgId();
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String msgBody = new String(msgExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            try {
                mqMessageListener.handleMessage(msgBody, msgId);
            } catch (Exception e) {
                String errmsg = String.format("consumer error: topic = %s, groupName = %s, data:%s, exception:%s", topicName, groupName, msgBody, e.getMessage());
                log.error(errmsg, e);
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }

    /**
     * Shuts the consumer down when the bean is destroyed.
     *
     * <p>Does nothing when no consumer was ever created.
     */
    @Override
    public void destroy() throws Exception {
        DefaultMQPushConsumer pushConsumer = this.consumer;
        if (pushConsumer == null) {
            return;
        }
        pushConsumer.shutdown();
        log.info("consumer shutdown...");
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getNameAddress() {
        return nameAddress;
    }

    public void setNameAddress(String nameAddress) {
        this.nameAddress = nameAddress;
    }

    public int getBatchMaxSize() {
        return batchMaxSize;
    }

    public void setBatchMaxSize(int batchMaxSize) {
        this.batchMaxSize = batchMaxSize;
    }

    public String getInstanceName() {
        return instanceName;
    }

    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public MQMessageListener getMqMessageListener() {
        return mqMessageListener;
    }

    public void setMqMessageListener(MQMessageListener mqMessageListener) {
        this.mqMessageListener = mqMessageListener;
    }
}
- MQMessageListener.java
package com.harvey.mq.consumer.common;
/**
 * Callback invoked for every message received from the message queue.
 *
 * @author huhanwei
 */
@FunctionalInterface
public interface MQMessageListener {

    /**
     * Handles a single message.
     *
     * @param message the message body as text
     * @param msgId   the id of the message
     * @throws Exception if the message cannot be processed
     */
    void handleMessage(String message, String msgId) throws Exception;
}
- MQConfig.java
package com.harvey.mq.consumer.common;
import com.harvey.mq.consumer.OrderConsumer;
import com.harvey.mq.consumer.conf.OrderConf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that creates and starts the message-queue consumers.
 *
 * @author harvey
 */
@Configuration // marks this class as a configuration class managed by Spring
public class MQConfig {

    @Autowired
    private OrderConf orderConf;

    /**
     * Creates the OrderQueue consumer bean and starts its listener registration on a new thread.
     *
     * @param orderConsumer the {@code orderConsumer} bean that processes each message
     * @return the consumer built from the {@link OrderConf} settings
     */
    @Bean
    public BaseConsumer startOrderConsumer(@Qualifier("orderConsumer") OrderConsumer orderConsumer) {
        BaseConsumer orderQueueConsumer = new BaseConsumer(
                orderConf.getNameAddress(),
                orderConf.getGroupName(),
                orderConf.getBatchMaxSize(),
                orderConf.getTopicName(),
                orderConf.getInstanceName(),
                orderConf.getTag());
        orderQueueConsumer.setMqMessageListener(orderConsumer);

        // Register and start the consumer on its own thread rather than inside this factory method.
        Thread starter = new Thread(() -> orderQueueConsumer.registerListener());
        starter.start();

        return orderQueueConsumer;
    }
}
com.harvey.mq.consumer.conf包下
- OrderConf.java
package com.harvey.mq.consumer.conf;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Settings of the OrderQueue consumer, bound from properties under {@code rmq.order-queue}.
 *
 * <p>The accessors are generated by Lombok's {@code @Getter} and {@code @Setter}, so no
 * hand-written get/set methods are needed; install the Lombok plugin so the IDE recognises them.
 *
 * @author harvey
 */
@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "rmq.order-queue")
public class OrderConf {

    /** Name server address. */
    private String nameAddress;

    /** Consumer group name. */
    private String groupName;

    /** Name of the topic to consume. */
    private String topicName;

    /** Tag to subscribe with. */
    private String tag;

    /** Client instance name. */
    private String instanceName;

    /** Maximum number of messages per consumed batch. */
    private int batchMaxSize;
}
3.實體類
package com.harvey.model;
import lombok.Getter;
import lombok.Setter;
/**
 * Order entity exchanged as JSON over the message queue and stored in the {@code order_test} table.
 * Getters and setters are generated by Lombok.
 */
@Getter
@Setter
public class Order {
private String orderNo; // order number
private String userId; // user id
private Double orderMoney; // order amount
private int payType; // 0: cash on delivery, 1: online payment
private String createTime; // order creation time
}
4.DAO
- OrderDao.java
package com.harvey.dao.order;
import com.harvey.model.Order;
/**
 * Data-access interface for orders; its statements are defined in the MyBatis mapper
 * whose namespace is {@code com.harvey.dao.order.OrderDao}.
 */
public interface OrderDao {
/**
 * Inserts the given order (mapper statement {@code saveOrder}).
 *
 * @param order the order to persist
 */
void saveOrder(Order order);
}
- OrderDao.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- Mapper for the OrderDao interface; the namespace must equal the interface's fully qualified name. -->
<mapper namespace="com.harvey.dao.order.OrderDao" >
<!-- Inserts one row into order_test from the properties of the Order parameter.
     The id column is not listed; the table script declares it auto_increment. -->
<insert id="saveOrder" parameterType="com.harvey.model.Order">
INSERT INTO
order_test(order_no, user_id, order_money, pay_type, create_time)
VALUES
(#{orderNo}, #{userId}, #{orderMoney}, #{payType}, #{createTime})
</insert>
</mapper>
5.Service
- IOrderService.java
package com.harvey.service.order;
import com.harvey.model.Order;
/**
 * Business-layer contract for order handling.
 */
public interface IOrderService {
/**
 * Saves the given order.
 *
 * @param order the order to save
 */
void saveOrder(Order order);
}
- OrderServiceImpl.java
package com.harvey.service.order.impl;
import com.harvey.dao.order.OrderDao;
import com.harvey.model.Order;
import com.harvey.service.order.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * {@link IOrderService} implementation that delegates persistence to {@link OrderDao}.
 */
@Service
public class OrderServiceImpl implements IOrderService {
@Autowired
private OrderDao orderDao;
/**
 * Saves the order by passing it straight to {@link OrderDao#saveOrder(Order)}.
 *
 * @param order the order to save
 */
@Override
public void saveOrder(Order order) {
orderDao.saveOrder(order);
}
}
6.消費者
package com.harvey.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.common.Constants;
import com.harvey.model.Order;
import com.harvey.mq.consumer.common.MQMessageListener;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.service.order.IOrderService;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Order message consumer: parses each JSON message into an {@link Order} and saves it
 * through {@link IOrderService}.
 *
 * @author harvey
 */
@Component("orderConsumer")
@Slf4j
public class OrderConsumer implements MQMessageListener {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private IOrderService orderService;

    @Autowired
    private OrderConf orderConf;

    /**
     * Processes one message.
     *
     * <p>An empty message is logged and skipped. Any exception raised while parsing or saving is
     * caught and logged here rather than propagated to the caller.
     *
     * @param message the JSON representation of an {@link Order}
     * @param msgId   the id of the message (not used by this implementation)
     */
    @Override
    public void handleMessage(String message, String msgId) throws Exception {
        try {
            if (StringUtils.isEmpty(message)) {
                log.info("訊息為空不做處理!");
                return;
            }
            log.info("資料收集-消費者【topicName = " + orderConf.getTopicName() + ",data = " + message + "】");
            // Business processing: deserialize the payload, then persist it.
            Order receivedOrder = objectMapper.readValue(message, Order.class);
            orderService.saveOrder(receivedOrder);
        } catch (Exception e) {
            log.error("RocketMQ 消費資料->入庫 異常!【topicName = " + orderConf.getTopicName() + "】", e);
        }
    }
}
7.生產者
package com.harvey.mq.producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.model.Order;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * Stand-alone order producer used to feed test messages into the OrderQueue topic.
 *
 * @author harvey
 */
@Slf4j
public class OrderProducer {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String NAME_ADDRESS = "test-rq01-a.mq.01zhuanche.com:9876";
    private static final String GROUP_NAME = "order_queue_group";
    private static final String TOPIC_NAME = "OrderQueue";
    private static final String INSTANCE_NAME = "order_queue_instance";
    /** Number of test messages sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 100;
    /** Pause between two sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000;

    /**
     * Builds the JSON payload of the i-th test order.
     *
     * @param i zero-based sequence number; used to derive the order number, user id, amount and payment type
     * @return the order serialized as JSON
     * @throws Exception if serialization fails
     */
    private static String getMessageContent(int i) throws Exception {
        Order order = new Order();
        order.setOrderNo(String.valueOf(i + 123456789));
        order.setUserId(String.valueOf(i + 1001));
        // Direct int-to-double conversion; no need to round-trip through a String.
        order.setOrderMoney((double) (100 + i));
        order.setPayType(i % 2 == 0 ? 0 : 1);
        order.setCreateTime(DateUtil.getCurrentTime());
        return objectMapper.writeValueAsString(order);
    }

    /**
     * Starts a producer, sends {@code MESSAGE_COUNT} test orders one per interval, and shuts the
     * producer down afterwards, including when sending fails or the thread is interrupted.
     *
     * @param args ignored
     * @throws Exception if the producer cannot start or a message cannot be sent
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
        producer.setNamesrvAddr(NAME_ADDRESS);
        producer.setInstanceName(INSTANCE_NAME);
        producer.start();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] body = getMessageContent(i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                Message msg = new Message(TOPIC_NAME, "tag" + i, "key" + i, body);
                SendResult sendResult = producer.send(msg);
                log.info("{}", sendResult);
                TimeUnit.MILLISECONDS.sleep(SEND_INTERVAL_MILLIS);
            }
        } finally {
            // Always release the producer's resources, even when a send throws.
            producer.shutdown();
        }
    }
}
3.測試
- 1.啟動程式
程式啟動完成後,會列印如下兩行日誌。如果看到
rocketmq registerMessageListener success!,說明啟動成功:RocketMQ 會監聽 topic,一旦有訊息,便會主動拉取並消費。
rocketmq init【topicName = OrderQueue,groupName = order_queue_group】
rocketmq registerMessageListener success!【topicName = OrderQueue,groupName = order_queue_group】
- 啟動生產者
檢視控制檯(部分日誌)
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B986B005A, offsetMsgId=0A14000D00002A9F0000000F7F74DABA, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B9C5C005B, offsetMsgId=0A14000D00002A9F0000000F7F752C21, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA050005C, offsetMsgId=0A14000D00002A9F0000000F7F753FC5, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=2], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA443005D, offsetMsgId=0A14000D00002A9F0000000F7F7559B8, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=3], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA838005E, offsetMsgId=0A14000D00002A9F0000000F7F756FAC, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=4], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BAC2D005F, offsetMsgId=0A14000D00002A9F0000000F7F757E79, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=5], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB0220060, offsetMsgId=0A14000D00002A9F0000000F7F759DF6, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=6], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB4230061, offsetMsgId=0A14000D00002A9F0000000F7F75A29B, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=7], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB81C0062, offsetMsgId=0A14000D00002A9F0000000F7F75BA28, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BBC210063, offsetMsgId=0A14000D00002A9F0000000F7F75C482, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=12]
- 觀察消費者控制檯
部分日誌
INFO OrderConsumer : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456878","userId":"1090","orderMoney":189.0,"payType":1,"createTime":"2018-08-17 16:59:59"}】
INFO OrderConsumer : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456879","userId":"1091","orderMoney":190.0,"payType":0,"createTime":"2018-08-17 17:00:00"}】
INFO OrderConsumer : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456880","userId":"1092","orderMoney":191.0,"payType":1,"createTime":"2018-08-17 17:00:01"}】
INFO OrderConsumer : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456881","userId":"1093","orderMoney":192.0,"payType":0,"createTime":"2018-08-17 17:00:02"}】
INFO OrderConsumer : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456882","userId":"1094",