1. 程式人生 > >SpringBoot 整合 RocketMQ 消費資料存庫(MySQL)

SpringBoot 整合 RocketMQ 消費資料存庫(MySQL)

廢話不多了,直接上程式碼

1.專案結構及環境配置

專案結構

src
    main
        java
            com
                harve
                    common --------------- 公共類
                    dao ------------------ DAO
                    datasource ----------- 資料來源
                    model ---------------- 實體類
                    mq
                        consumer --------- rocketmq 消費者
producer --------- rocketmq 生產者 service -------------- 業務處理 util ----------------- 工具類 resource mapper ----------------------- mybatis 配置檔案 application.yml -------------- 全域性配置檔案 banner.txt ------------------- logo
logback-spring.xml ----------- 日誌配置

MySQL 建立表 SQL 指令碼

drop table if exists `order_test`;
create table `order_test` (
  `id` int(11) not null auto_increment,
  `order_no` varchar(20) default null,
  `user_id` varchar(20) default null,
  `order_money` double default null,
  `pay_type` int
(11) default null, `create_time` varchar(20) default null, primary key (`id`) ) engine=innodb default charset=utf8;

RocketMQ 建立 Topic

$ sh mqadmin updateTopic -n IP:Port -c TestCluster -t OrderQueue
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
create topic to 10.20.0.13:10911 success.
TopicConfig [topicName=OrderQueue, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

application.yml

# port
server:
  port: 8080

# log
log:
  root:
    level: INFO

# mysql
datasource:
  order:
    url: jdbc:mysql://192.168.191.65:3306/orderdb?useUnicode=true&characterEncoding=utf8
    username: root
    password: [email protected]
    driver-class-name: com.mysql.jdbc.Driver
    max-wait: 10000
    min-idle: 2
    type: com.alibaba.druid.pool.DruidDataSource
    maxActive: 10
    initial-size: 5
    validation-query: SELECT 1
    test-on-borrow: false
    test-while-idle: true
    time-between-eviction-runs-millis: 18800
    filters: stat

# rocketmq
rmq:
  order-queue:
    nameAddress: test-rq01-a.mq.01zhuanche.com:9876
    groupName: order_queue_group
    instanceName: order_queue_instance
    batchMaxSize: 16
    topicName: OrderQueue
    tag:

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>

<configuration scan="false" scanPeriod="1 minutes">
    <springProperty scope="context" name="log.root.level" source="log.root.level"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender" >
        <encoder>
           <pattern>%d{yyyy-MM-dd.HH:mm:ss.SSS} %t %p %c{0} %X{traceId} : %m%n</pattern>
        </encoder>
    </appender>

    <logger name="com.harvey" level="INFO" />

    <root level="${log.root.level}" >
         <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

banner.txt

  _____            _             ____              _          _____            _        _   __  __  ____
  / ____|          (_)           |  _ \            | |        |  __ \          | |      | | |  \/  |/ __ \
 | (___  _ __  _ __ _ _ __   __ _| |_) | ___   ___ | |_ ______| |__) |___   ___| | _____| |_| \  / | |  | |
  \___ \| '_ \| '__| | '_ \ / _` |  _ < / _ \ / _ \| __|______|  _  // _ \ / __| |/ / _ \ __| |\/| | |  | |
  ____) | |_) | |  | | | | | (_| | |_) | (_) | (_) | |_       | | \ \ (_) | (__|   <  __/ |_| |  | | |__| |
 |_____/| .__/|_|  |_|_| |_|\__, |____/ \___/ \___/ \__|      |_|  \_\___/ \___|_|\_\___|\__|_|  |_|\___\_\
        | |                  __/ |
        |_|                 |___/

2.程式碼

1.資料來源
  • BaseDataSourceConfig.java
package com.harvey.datasource;

import com.github.pagehelper.PageHelper;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;
import java.util.Properties;

/**
 * @description 資料來源配置基類
 * @auther harvey
 */
public interface BaseDataSourceConfig {
    /**
     * 獲取mapper.xml路徑
     * @return
     */
    String getMapperLocations();

    /**
     * 基於PageHelper支援的SqlSessionFactory
     * @param dataSource
     * @param dialect
     * @return
     * @throws Exception
     */
    default SqlSessionFactory createPageSessionFactory(DataSource dataSource, String dialect) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(this.getMapperLocations()));
        PageHelper pageHelper = new PageHelper();
        Properties props = new Properties();
        props.setProperty("dialect", dialect);
        pageHelper.setProperties(props); // 新增外掛
        factoryBean.setPlugins(new Interceptor[]{pageHelper});
        return factoryBean.getObject();
    }
}
  • OrderDataSourceConfig.java
package com.harvey.datasource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

/**
 * @description 資料來源配置類,多個數據源參照此類配置
 * @auther harvey
 */
@Configuration
@MapperScan(basePackages = {"com.harvey.dao.order"}, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderDataSourceConfig implements BaseDataSourceConfig{

    static final String MAPPER_LOCATION = "classpath*:mapper/order/*.xml";

    @Bean(name="orderProperties")
    @ConfigurationProperties(prefix = "datasource.order")
    @Primary
    public DataSourceProperties dataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean(name = "orderDS")
    @ConfigurationProperties(prefix = "datasource.order")
    @Primary
    public DataSource dataSource(@Qualifier("orderProperties")DataSourceProperties dataSourceProperties) {
        return dataSourceProperties.initializeDataSourceBuilder().build();
    }

    @Bean(name ="orderSqlSessionFactory")
    @Primary
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("orderDS") DataSource dataSource) throws Exception {
        return this.createPageSessionFactory(dataSource,"mysql");
    }

    @Bean(name = "orderTransactionManager")
    @Primary
    public DataSourceTransactionManager orderTransactionManager(@Qualifier("orderDS") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name="orderSqlSessionTemplate")
    @Primary
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory factory)  {
        return new SqlSessionTemplate(factory);
    }

    @Override
    public String getMapperLocations() {
        return MAPPER_LOCATION;
    }
}
2.RocketMQ 配置

com.harvey.mq.consumer.common 包下

  • BaseConsumer.java
package com.harvey.mq.consumer.common;

import com.harvey.common.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * @description RocketMQ 消費基類 BaseConsumer
 * @auther harvey
 */
@Slf4j
@Configuration
public class BaseConsumer implements DisposableBean {

    private String nameAddress;

    private String groupName;

    private int batchMaxSize;

    private String topicName;

    private String instanceName;

    private String tag;

    private MQMessageListener mqMessageListener;

    private DefaultMQPushConsumer consumer = null;

    public BaseConsumer() { }

    public BaseConsumer(String nameAddress, String groupName, int batchMaxSize, String topicName, String instanceName, String tag) {
        this.nameAddress = nameAddress;
        this.groupName = groupName;
        this.batchMaxSize = batchMaxSize;
        this.topicName = topicName;
        this.instanceName = instanceName;
        this.tag = tag;
    }

    public void registerListener() {
        try {
            log.info("rocketmq init【topicName = " + this.topicName + ",groupName = " + this.groupName + "】");
            if (consumer == null) {
                consumer = new DefaultMQPushConsumer(this.groupName);
            }
            consumer.setNamesrvAddr(this.nameAddress);
            // consumer 消費策略:從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.setConsumeTimestamp(Constants.CONSUMER_FROM_TIME);
            consumer.setConsumeMessageBatchMaxSize(this.batchMaxSize);
            if (StringUtils.isNotEmpty(this.tag)) {
                consumer.subscribe(this.topicName, this.tag);
            } else {
                consumer.subscribe(this.topicName, "*");
            }

            // 註冊監聽
            consumer.registerMessageListener(new MessageListenerOrderly() {
                String errmsg = "";
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);

                    for (int i = 0; i < msgs.size(); i++) {
                        MessageExt msgExt = msgs.get(i);
                        String msgId = msgExt.getMsgId();
                        String msgBody = new String(msgExt.getBody());
                        try {
                            mqMessageListener.handleMessage(msgBody, msgId);
                        } catch (Exception e) {
                            String errmsg = String.format("consumer error: topic = %s, groupName = %s, data:%s, exception:%s", topicName, groupName, msgBody, e.getMessage());
                            log.error(errmsg, e);
                        }
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            String errmsg = "rocketmq registerMessageListener fail!【topicName = " + this.topicName + ",groupName = " + this.groupName + "】";
            log.error(errmsg, e);
        }
        log.info("rocketmq registerMessageListener success!【topicName = " + this.topicName + ",groupName = " + this.groupName + "】");
    }

    @Override
    public void destroy() throws Exception {
        consumer.shutdown();
        log.info("consumer shutdown...");
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getNameAddress() {
        return nameAddress;
    }

    public void setNameAddress(String nameAddress) {
        this.nameAddress = nameAddress;
    }

    public int getBatchMaxSize() {
        return batchMaxSize;
    }

    public void setBatchMaxSize(int batchMaxSize) {
        this.batchMaxSize = batchMaxSize;
    }

    public String getInstanceName() {
        return instanceName;
    }

    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public MQMessageListener getMqMessageListener() {
        return mqMessageListener;
    }

    public void setMqMessageListener(MQMessageListener mqMessageListener) {
        this.mqMessageListener = mqMessageListener;
    }
}
  • MQMessageListener.java
package com.harvey.mq.consumer.common;

/**
 * @description 資料收集MQ 監聽
 * @auther huhanwei
 */
@FunctionalInterface
public interface MQMessageListener {

    public void handleMessage(String message, String msgId) throws Exception;

}
  • MQConfig.java
package com.harvey.mq.consumer.common;

import com.harvey.mq.consumer.OrderConsumer;
import com.harvey.mq.consumer.conf.OrderConf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description 資料收集MQ 啟動配置類
 * @auther harvey
 */
@Configuration // 註解作用:標明為一個配置類並交由Spring管理
public class MQConfig {

    @Autowired
    private OrderConf orderConf;

    // OrderQueue
    @Bean
    public BaseConsumer startOrderConsumer(@Qualifier("orderConsumer")OrderConsumer orderConsumer) {
        BaseConsumer consumer = new BaseConsumer(orderConf.getNameAddress(),
                orderConf.getGroupName(),
                orderConf.getBatchMaxSize(),
                orderConf.getTopicName(),
                orderConf.getInstanceName(),
                orderConf.getTag());
        consumer.setMqMessageListener(orderConsumer);
        new Thread(consumer::registerListener).start();
        return consumer;
    }
}

com.harvey.mq.consumer.conf包下

  • OrderConf.java
package com.harvey.mq.consumer.conf;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @description MQ OrderQueue 初始化配置類
 * @auther harvey
 */
@Component
@ConfigurationProperties(prefix = "rmq.order-queue")
// 使用 @Getter 和 @Setter 就不用對屬性提供get、set方法,找度娘安裝個lombok外掛
@Getter
@Setter
public class OrderConf {

    private String nameAddress;

    private String groupName;

    private int batchMaxSize;

    private String topicName;

    private String instanceName;

    private String tag;
}
3.實體類
package com.harvey.model;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Order {

    private String orderNo; // 訂單號
    private String userId; // 使用者id
    private Double orderMoney; // 訂單金額
    private int payType; // 0:貨到付款,1:線上支付
    private String createTime; // 訂單建立時間
}
4.DAO
  • OrderDao.java
package com.harvey.dao.order;

import com.harvey.model.Order;

public interface OrderDao {

    void saveOrder(Order order);
}
  • OrderDao.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.harvey.dao.order.OrderDao" >
    <insert id="saveOrder" parameterType="com.harvey.model.Order">
        INSERT INTO
          order_test(order_no, user_id, order_money, pay_type, create_time)
        VALUES
          (#{orderNo}, #{userId}, #{orderMoney}, #{payType}, #{createTime})
    </insert>
</mapper>
5.Service
  • IOrderService.java
package com.harvey.service.order;

import com.harvey.model.Order;

public interface IOrderService {

    void saveOrder(Order order);
}
  • OrderServiceImpl.java
package com.harvey.service.order.impl;

import com.harvey.dao.order.OrderDao;
import com.harvey.model.Order;
import com.harvey.service.order.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements IOrderService {

    @Autowired
    private OrderDao orderDao;

    @Override
    public void saveOrder(Order order) {
        orderDao.saveOrder(order);
    }
}
6.消費者
package com.harvey.mq.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.common.Constants;
import com.harvey.model.Order;
import com.harvey.mq.consumer.common.MQMessageListener;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.service.order.IOrderService;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @description 訂單消費者
 * @auther harvey
 */
@Component("orderConsumer")
@Slf4j
public class OrderConsumer implements MQMessageListener {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private IOrderService orderService;

    @Autowired
    private OrderConf orderConf;

    @Override
    public void handleMessage(String message, String msgId) throws Exception {
        try {
            if (StringUtils.isNotEmpty(message)) {
                log.info("資料收集-消費者【topicName = " + orderConf.getTopicName() + ",data = " + message + "】");
                // 業務處理...
                Order order = objectMapper.readValue(message, Order.class);
                orderService.saveOrder(order);
            } else {
                log.info("訊息為空不做處理!");
            }
        } catch (Exception e) {
            log.error("RocketMQ 消費資料->入庫 異常!【topicName = " + orderConf.getTopicName() + "】", e);
        }
    }
}
7.生產者
package com.harvey.mq.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.harvey.model.Order;
import com.harvey.mq.consumer.conf.OrderConf;
import com.harvey.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * @description 訂單生產者
 * @auther harvey
 */
@Slf4j
public class OrderProducer {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    private static String nameAddress = "test-rq01-a.mq.01zhuanche.com:9876";

    private static String groupName = "order_queue_group";

    private static String topicName = "OrderQueue";

    private static String instanceName = "order_queue_instance";

    private static String getMessageContent(int i) throws Exception {
        Order order = new Order();
        order.setOrderNo(String.valueOf(i + 123456789));
        order.setUserId(String.valueOf(i + 1001));
        order.setOrderMoney(Double.parseDouble(String.valueOf(100 + i)));
        order.setPayType(i % 2 == 0 ? 0 : 1);
        order.setCreateTime(DateUtil.getCurrentTime());
        String jsonStr = objectMapper.writeValueAsString(order);
        return jsonStr;
    }

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameAddress);
        producer.setInstanceName(instanceName);

        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message(topicName,
                    "tag" + String.valueOf(i),
                    "key" + String.valueOf(i),
                    getMessageContent(i).getBytes());
            SendResult sendResult = producer.send(msg);
            log.info(sendResult.toString());
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        producer.shutdown();
    }
}

3.測試

  • 1.啟動程式
    程式啟動完成後,會列印如下兩行日誌,如果看到
    rocketmq registerMessageListener success! 說明啟動成功,rocketmq會監聽topic,如果有訊息,會主動拉取訊息消費資料
rocketmq init【topicName = OrderQueue,groupName = order_queue_group】
rocketmq registerMessageListener success!【topicName = OrderQueue,groupName = order_queue_group】
  • 啟動生產者
    檢視控制檯(部分日誌)
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B986B005A, offsetMsgId=0A14000D00002A9F0000000F7F74DABA, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560B9C5C005B, offsetMsgId=0A14000D00002A9F0000000F7F752C21, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA050005C, offsetMsgId=0A14000D00002A9F0000000F7F753FC5, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=2], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA443005D, offsetMsgId=0A14000D00002A9F0000000F7F7559B8, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=3], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BA838005E, offsetMsgId=0A14000D00002A9F0000000F7F756FAC, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=4], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BAC2D005F, offsetMsgId=0A14000D00002A9F0000000F7F757E79, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=5], queueOffset=11]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB0220060, offsetMsgId=0A14000D00002A9F0000000F7F759DF6, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=6], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB4230061, offsetMsgId=0A14000D00002A9F0000000F7F75A29B, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=7], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BB81C0062, offsetMsgId=0A14000D00002A9F0000000F7F75BA28, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=0], queueOffset=12]
INFO com.harvey.mq.producer.OrderProducer - SendResult [sendStatus=SEND_OK, msgId=AC10141731A465AB7765560BBC210063, offsetMsgId=0A14000D00002A9F0000000F7F75C482, messageQueue=MessageQueue [topic=OrderQueue, brokerName=broker-a, queueId=1], queueOffset=12]
  • 觀察消費者控制檯
    部分日誌
INFO OrderConsumer  : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456878","userId":"1090","orderMoney":189.0,"payType":1,"createTime":"2018-08-17 16:59:59"}】
INFO OrderConsumer  : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456879","userId":"1091","orderMoney":190.0,"payType":0,"createTime":"2018-08-17 17:00:00"}】
INFO OrderConsumer  : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456880","userId":"1092","orderMoney":191.0,"payType":1,"createTime":"2018-08-17 17:00:01"}】
INFO OrderConsumer  : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456881","userId":"1093","orderMoney":192.0,"payType":0,"createTime":"2018-08-17 17:00:02"}】
INFO OrderConsumer  : 資料收集-消費者【topicName = OrderQueue,data = {"orderNo":"123456882","userId":"1094",