1. 程式人生 > 其它 >實時分析系統--交易額需求

實時分析系統--交易額需求

1章 採集資料

1.1 框架流程

1.2 Canal 入門

1.2.1 什麼是 Canal

  由於Canal沒有官網,所以可以認為它託管在github上的專案就是官網,所以地址是:https://github.com/alibaba/canal

1.2.2 使用場景

  1)原始場景: 阿里Otter中介軟體的一部分,Otter是阿里用於進行異地資料庫之間的同步框架,Canal是其中一部分。

  2 常見場景1:更新快取

  3)場景2:抓取業務資料新增變化表,用於製作拉鍊表。

  4)場景3:抓取業務表的新增變化資料,用於製作實時統計。

1.2.3Canal的工作原理

  複製過程分成三步:

    1Master主庫將改變記錄寫到二進位制日誌(binary log)中;

    2Slave從庫向mysql master傳送dump協議,將master主庫的binary log events拷貝到它的中繼日誌(relay log)

    3Slave從庫讀取並重做中繼日誌中的事件,將改變的資料同步到自己的資料庫。

  Canal的工作原理很簡單,就是把自己偽裝成Slave,假裝從Master複製資料:

    1)canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 傳送dump 協議

    2)MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )

    3)canal 解析 binary log 物件(原始為 byte 流)

1.2.4 MySQLBinlog

1.2.4.1 什麼是Binlog

  MySQL的二進位制日誌可以說是MySQL最重要的日誌了,它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。

  一般來說開啟二進位制日誌大概會有1%的效能損耗。二進位制有兩個最重要的使用場景:

    1)MySQL Replication在Master端開啟binlog,Mster把它的二進位制日誌傳遞給slaves來達到master-slave資料一致的目的。

    2)自然就是資料恢復了,通過使用MySQLBinlog工具來使恢復資料。

  二進位制日誌包括兩類檔案:

    1)二進位制日誌索引檔案(檔名字尾為.index):用於記錄所有的二進位制檔案

    2)二進位制日誌檔案(檔名字尾為.00000*):記錄資料庫所有的DDL和DML(除了資料查詢語句)語句事件。

1.2.4.2 Binlog的開啟

  在MySQL的配置檔案(Linux: /etc/my.cnf , Windows: \my.ini)下,修改配置在[mysqld] 區塊設定/新增

#Binlog日誌的開啟
log_bin=mysql-bin

  這個表示binlog日誌的字首是mysql-bin,以後生成的日誌檔案就是mysql-bin.123456 的檔案後面的數字按順序生成。每次mysql重啟或者到達單個檔案大小的閾值時,新生一個檔案,按順序編號

1.2.4.3 Binlog的分類設定

  MySQLBinlog的格式,那就是有三種,分別是statement、mixed、row

  在配置檔案中選擇配置binlog_format屬性

#選擇Binlog的格式
binlog_format=row
#保證server-id是唯一的
server-id=1

  區別:

    1)statement

      語句級,binlog會記錄每次一執行寫操作的語句,相對row模式節省空間,但是可能產生不一致性,比如:update tt set create_date=now(),如果用binlog日誌進行恢復,由於執行時間不同可能產生的資料就不同。

      優點:節省空間

      缺點:有可能造成資料不一致。

    2)row

      行級,binlog會記錄每次操作後每行記錄的變化。

      優點:保持資料的絕對一致性。因為不管sql是什麼,引用了什麼函式,他只記錄執行後的效果。

      缺點:佔用較大空間。

    3)mixed

      statement的升級版,一定程度上解決了,因為一些情況而造成的statement模式不一致問題,在某些情況下譬如:當函式中包含 UUID() 時,包含 AUTO_INCREMENT 欄位的表被更新時,執行 INSERT DELAYED 語句時, UDF 時,會按照 ROW的方式進行處理

      優點:節省空間,同時兼顧了一定的一致性。

       缺點:還有些極個別情況依舊會造成不一致,

  另外statementmixed對於需要對binlog的監控的情況都不方便

1.3MySQL的準備

1.3.1 匯入模擬業務資料庫

1.3.2 賦許可權

  1)進入mysql客戶端

mysql -uroot -proot123

  2)更改mysql密碼策略

set global validate_password_length=4;
set global validate_password_policy=0;

  3)在mysql中執行如下語句,建立canal使用者,密碼為canal:

grant select, replication slave, replication client on *.* to 'canal'@'%' identified by 'canal';

  4)建立gmall_realtime資料庫

  5)執行gmall.sql檔案(下載連結是下方的網盤地址,請自行下載),執行過程中若出現如下錯誤,則開啟該sql檔案,修改庫名為gmall_realtime

  6)儲存過程,模擬資料(執行sql指令碼時已經建立,直接使用即可)

#CALL `init_data`(造資料的日期, 生成的訂單數, 生成的使用者數, 是否覆蓋寫)
call `init_data`('2021-07-07',17,5,false)

1.3.3 修改/etc/my.cnf檔案

sudo vim /etc/my.cnf
#Binlog日誌的開啟
log_bin=mysql-bin
#選擇Binlog的格式
binlog_format=row
#保證server-id是唯一的
server-id=1
#只記錄哪些庫的寫操作
binlog-do-db=gmall_realtime

1.3.4 重啟MySql並檢視狀態

sudo systemctl restart mysqld
sudo systemctl status mysqld

1.4Canal 安裝

1.4.1 Canal的下載(我們下載1.1.2版本的即可)

  1)偽官網下載地址:https://github.com/alibaba/canal/releases,找到1.1.2版本,下載它

  2)我已經下載下來了,網盤連結下載地址:https://pan.baidu.com/s/1p6vh6FOe-0wd4U8tyuKslQ  提取碼:fzbq

  3)下載完成後將其上傳至hadoop104機器中的/opt/software/

  4)在/opt/module/目錄下建立canal目錄

mkdir /opt/module/canal

  5)解壓

tar -zxvf /opt/software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal/

1.4.2 修改canal的配置

  1)修改canal.properties

vim /opt/module/canal/conf/canal.properties
#我的canal安裝在hadoop104
canal.ip = hadoop104
#這個檔案是canal的基本通用配置,主要關心一下埠號,不改的話預設就是11111
canal.port = 11111

  2)修改instance.properties(該檔案的作用主要是針對要追蹤的MySQL的例項配置)

vim /opt/module/canal/conf/example/instance.properties
#slaveId不能與mysql中的server-id重複
canal.instance.mysql.slaveId=2

#mysql的地址和埠號,我的mysql安裝在hadoop102,你的安裝在哪就寫哪
canal.instance.master.address=hadoop102:3306
#從binlog的哪個檔案的哪個位置開始同步 需要在主機上執行show master status檢視最新的位置,你執行的結果是啥你就寫啥,不要照抄我的
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=88546

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

1.4.3 啟動canal

/opt/module/canal/bin/startup.sh

1.4.4 停止canal

/opt/module/canal/bin/stop

1.5資料監控模組---抓取訂單資料

1.5.1 建立gmall_canalclient模組

1.5.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall_sparkstream</artifactId>
        <groupId>com.yuange</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall_canalclient</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.yuange</groupId>
            <artifactId>gmall_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

</project>

1.5.3通用監視類

  1Canal封裝的資料結構

  2)建立生產者,MyProducer.java

package com.yuange.canal;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @作者:袁哥
 * @時間:2021/7/7 18:54
 */
public class MyProducer {

    private static Producer<String,String> producer;

    static {
        producer=getProducer();
    }

    // 提供方法返回生產者
    public static Producer<String,String> getProducer(){
        Properties properties = new Properties();
        //參考 ProducerConfig
        properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    //傳送資料至kafka
    public static void sendDataToKafka(String topic,String msg){
        producer.send(new ProducerRecord<String, String>(topic,msg));
    }
}

  3)監控Mysql,MyClient.java

package com.yuange.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yuange.constants.Constants;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @作者:袁哥
 * @時間:2021/7/7 18:10
 * 步驟:
 *    ①建立一個客戶端:CanalConnector(SimpleCanalConnector:  單節點canal叢集、ClusterCanalConnector: HA canal叢集)
 *    ②使用客戶端連線 canal server
 *    ③指定客戶端訂閱 canal server中的binlog資訊
 *    ④解析binlog資訊
 *    ⑤寫入kafka
 *
 * 消費到的資料的結構:
 *      Message:  代表拉取的一批資料,這一批資料可能是多個SQL執行,造成的寫操作變化
 *          List<Entry> entries : 每個Entry代表一個SQL造成的寫操作變化
 *          id : -1 說明沒有拉取到資料
 *      Entry:
 *          CanalEntry.Header header_ :  頭資訊,其中包含了對這條sql的一些說明
 *              private Object tableName_: sql操作的表名
 *              EntryType; Entry操作的型別
 *                  開啟事務: 寫操作  begin
 *                  提交事務: 寫操作  commit
 *                  對原始資料進行影響的寫操作: rowdata
 *                          update、delete、insert
 *          ByteString storeValue_:   資料
 *              序列化資料,需要使用工具類RowChange,進行轉換,轉換之後獲取到一個RowChange物件
 */
public class MyClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        /**
         * 建立一個canal客戶端
         * public static CanalConnector newSingleConnector(
         *              SocketAddress address,  //指定canal server的主機名和埠號
         *              tring destination,      //參考/opt/module/canal/conf/canal.properties中的canal.destinations 屬性值
         *              String username,        //不是instance.properties中的canal.instance.dbUsername
         *              String password         //參考AdminGuide(從canal 1.1.4 之後才提供的),連結地址:https://github.com/alibaba/canal/wiki/AdminGuide
         * ) {...}
         * */
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("haddoop104", 11111),
                "example", "", "");

        //使用客戶端連線 canal server
        canalConnector.connect();

        //指定客戶端訂閱 canal server中的binlog資訊  只統計在Order_info表
        canalConnector.subscribe("gmall_realtime.order_info");

        //不停地拉取資料   Message[id=-1,entries=[],raw=false,rawEntries=[]] 代表當前這批沒有拉取到資料
        while (true){
            Message message = canalConnector.get(100);
            //判斷是否拉取到了資料,如果沒有拉取到,歇一會再去拉取
            if (message.getId() == -1){
                System.out.println("暫時沒有資料,先等會");
                Thread.sleep(5000);
                continue;
            }

            // 資料的處理邏輯
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                //判斷這個entry的型別是不是rowdata型別,只處理rowdata型別
                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){
                    ByteString storeValue = entry.getStoreValue();          //資料
                    String tableName = entry.getHeader().getTableName();    //表名
                    handleStoreValue(storeValue,tableName);
                }
            }
        }
    }

    private static void handleStoreValue(ByteString storeValue, String tableName) throws InvalidProtocolBufferException {
        //將storeValue 轉化為 RowChange
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

        /**
         * 一個RowChange代表多行資料
         * order_info: 可能會執行的寫操作型別,統計GMV    total_amount
         *          insert :  會-->更新後的值
         *          update :  不會-->只允許修改 order_status
         *          delete :  不會,資料是不允許刪除
         * 判斷當前這批寫操作產生的資料是不是insert語句產生的
         * */
        if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
            //獲取行的集合
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList){
                JSONObject jsonObject = new JSONObject();
                //獲取insert後每一行的每一列
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                for (CanalEntry.Column column : afterColumnsList) {
                    jsonObject.put(column.getName(),column.getValue());
                }
                //傳送資料至Kafka,獲取列名和列值
                MyProducer.sendDataToKafka(Constants.GMALL_ORDER_INFO, jsonObject.toJSONString());
            }
        }
    }
}

  4)新增log4j.properties

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

1.5.4 測試

  1)啟動MyClient中的main方法

  2)使用儲存過程模擬業務資料

call `init_data`('2021-07-07',15,3,false)

  3)檢視Idea控制檯,發現已經成功同步了資料

  4)檢視kafka中的GMALL_ORDER_INFO 主題

2章 實時處理

2.1Phoenix建表

sqlline.py hadoop103:2181
create table gmall_order_info
(          id varchar primary key,
           province_id varchar,
           consignee varchar,
           order_comment varchar,
           consignee_tel varchar,
           order_status varchar,
           payment_way varchar,
           user_id varchar,
           img_url varchar,
           total_amount double,
           expire_time varchar,
           delivery_address varchar,
           create_time varchar,
           operate_time varchar,
           tracking_no varchar,
           parent_order_id varchar,
           out_trade_no varchar,
           trade_body varchar,
           create_date varchar,
           create_hour varchar);

2.2在gmall_realtime中新建樣例類,OrderInfo

package com.yuange.realtime.beans

/**
 * @作者:袁哥
 * @時間:2021/7/7 20:41
 */
case class OrderInfo(
                      id: String,
                      province_id: String,
                      consignee: String,
                      order_comment: String,
                      var consignee_tel: String,
                      order_status: String,
                      payment_way: String,
                      user_id: String,
                      img_url: String,
                      total_amount: Double,
                      expire_time: String,
                      delivery_address: String,
                      create_time: String,
                      operate_time: String,
                      tracking_no: String,
                      parent_order_id: String,
                      out_trade_no: String,
                      trade_body: String,
                      // 方便分時和每日統計,額外新增的欄位
                      var create_date: String,
                      var create_hour: String)

2.3 SparkStreaming消費kafka並儲存到HBase中

package com.yuange.realtime.app

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.alibaba.fastjson.JSON
import com.yuange.constants.Constants
import com.yuange.realtime.beans.OrderInfo
import com.yuange.realtime.utils.MyKafkaUtil
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.phoenix.spark._

/**
 * @作者:袁哥
 * @時間:2021/7/7 20:45
 */
object GMVApp extends BaseApp {
  override var appName: String = "GMVApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {
    val ds: InputDStream[ConsumerRecord[String,String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_INFO,streamingContext)
    //將kafka中的資料封裝為樣例類
    val ds1: DStream[OrderInfo] = ds.map(record => {
      val orderInfo: OrderInfo = JSON.parseObject(record.value(),classOf[OrderInfo])
      // 封裝create_date 和 create_hour   "create_time":"2021-07-07 01:39:33"
      val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
      val formatter2: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

      val time: LocalDateTime = LocalDateTime.parse(orderInfo.create_date,formatter1)

      orderInfo.create_date = time.format(formatter2)
      orderInfo.create_hour = time.getHour.toString

      // 訂單的明細資料,脫敏  演示手機號脫敏
      orderInfo.consignee_tel = orderInfo.consignee_tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2")
      orderInfo
    })

    //寫入hbase
    ds1.foreachRDD(rdd => {
      rdd.saveToPhoenix(
        "GMALL_ORDER_INFO",
        Seq("ID","PROVINCE_ID", "CONSIGNEE", "ORDER_COMMENT", "CONSIGNEE_TEL", "ORDER_STATUS", "PAYMENT_WAY", "USER_ID","IMG_URL", "TOTAL_AMOUNT", "EXPIRE_TIME", "DELIVERY_ADDRESS", "CREATE_TIME","OPERATE_TIME","TRACKING_NO","PARENT_ORDER_ID","OUT_TRADE_NO", "TRADE_BODY", "CREATE_DATE", "CREATE_HOUR"),
        HBaseConfiguration.create(),
        Some("hadoop103:2181")
      )
    })
  }
}

2.4測試

  1)啟動gmall_canalclient模組中的MyClient中的main方法

  2)啟動gmall_realtime模組中的GMVApp中的main方法

  3)使用儲存過程模擬業務資料

call `init_data`('2021-07-07',13,2,false)

  4)檢視Idea控制檯

  5)使用phoenix檢視GMALL_ORDER_INFO表中是否有資料

select * from GMALL_ORDER_INFO limit 10;

3資料介面釋出

3.1程式碼清單

控制層

PublisherController

實現介面的web釋出

服務層

PublisherService

資料業務查詢interface

PublisherServiceImpl

業務查詢的實現類

資料層

OrderMapper

資料層查詢的interface

OrderMapper.xml

資料層查詢的實現配置

3.2 介面

3.2.1 訪問路徑

總數

http://localhost:8070/realtime-total?date=2020-08-18

分時統計

http://localhost:8070/realtime-hours?id=order_amount&date=2020-08-18

3.2.2要求資料格式

總數

[{"id":"dau","name":"新增日活","value":1200},

{"id":"new_mid","name":"新增裝置","value":233 },

{"id":"order_amount","name":"新增交易額","value":1000.2 }]

分時統計

{"yesterday":{"11":383,"12":123,"17":88,"19":200 },

"today":{"12":38,"13":1233,"17":123,"19":688 }}

3.3程式碼開發(gmall_publisher模組中操作)

3.3.1 beans層,新建GMVData.java

package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @作者:袁哥
 * @時間:2021/7/7 22:22
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class GMVData {

    private String hour;
    private Double amount;
    
}

3.3.2 mapper層,修改PublisherMapper介面,新增如下抽象方法

//查詢每天的總交易額
    Double getGMVByDate(String date);

    //查詢分時交易額
    List<GMVData> getGMVDatasByDate(String date);

3.3.3 service層

  1)修改PublisherService介面,新增如下抽象方法

//查詢每天的總交易額
    Double getGMVByDate(String date);

    //查詢分時交易額
    List<GMVData> getGMVDatasByDate(String date);

  2)修改PublisherServiceImpl實現類,新增如下實現

@Override
    public Double getGMVByDate(String date) {
        return publisherMapper.getGMVByDate(date);
    }

    @Override
    public List<GMVData> getGMVDatasByDate(String date) {
        return publisherMapper.getGMVDatasByDate(date);
    }

3.3.4controller層

package com.yuange.gmall.gmall_publisher.controller;

import com.alibaba.fastjson.JSONObject;
import com.yuange.gmall.gmall_publisher.beans.DAUData;
import com.yuange.gmall.gmall_publisher.beans.GMVData;
import com.yuange.gmall.gmall_publisher.service.PublisherService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/**
 * @作者:袁哥
 * @時間:2021/7/6 23:10
 */
@RestController
public class GmallPublisherController {
    @Autowired
    private PublisherService publisherService;

    //http://localhost:8070/realtime-total?date=2021-07-06
    @RequestMapping(value = "/realtime-total")
    public Object handle1(String date){
        ArrayList<JSONObject> result = new ArrayList<>();

        Integer dau = publisherService.getDAUByDate(date);
        Integer newMidCounts = publisherService.getNewMidCountByDate(date);
        Double gmv = publisherService.getGMVByDate(date);

        JSONObject jsonObject1 = new JSONObject();
        jsonObject1.put("id","dau");
        jsonObject1.put("name","新增日活");
        jsonObject1.put("value",dau);

        JSONObject jsonObject2 = new JSONObject();
        jsonObject2.put("id","new_mid");
        jsonObject2.put("name","新增裝置");
        jsonObject2.put("value",newMidCounts);

        JSONObject jsonObject3 = new JSONObject();
        jsonObject3.put("id","order_amount");
        jsonObject3.put("name","新增交易額");
        jsonObject3.put("value",gmv);

        result.add(jsonObject1);
        result.add(jsonObject2);
        result.add(jsonObject3);
        return result;
    }

    @RequestMapping(value = "/realtime-hours")
    public Object handle2(String id,String date){
        //根據今天求昨天的日期
        LocalDate toDay = LocalDate.parse(date);
        String yestodayDate = toDay.minusDays(1).toString();

        JSONObject result = new JSONObject();

        if ("dau".equals(id)){
            List<DAUData> todayDatas = publisherService.getDAUDatasByDate(date);
            List<DAUData> yestodayDatas = publisherService.getDAUDatasByDate(yestodayDate);

            JSONObject jsonObject1 = parseData(todayDatas);
            JSONObject jsonObject2 = parseData(yestodayDatas);

            result.put("yesterday",jsonObject2);
            result.put("today",jsonObject1);
        }else{
            List<GMVData> todayDatas = publisherService.getGMVDatasByDate(date);
            List<GMVData> yestodayDatas = publisherService.getGMVDatasByDate(yestodayDate);

            JSONObject jsonObject1 = parseGMVData(todayDatas);
            JSONObject jsonObject2 = parseGMVData(yestodayDatas);

            result.put("yesterday",jsonObject2);
            result.put("today",jsonObject1);
        }
        return result;
    }

    public JSONObject parseGMVData(List<GMVData> datas){
        JSONObject jsonObject = new JSONObject();
        for (GMVData data : datas) {
            jsonObject.put(data.getHour(),data.getAmount());
        }
        return jsonObject;
    }

    //負責把 List<DAUData>  封裝為一個JSONObject
    public JSONObject parseData(List<DAUData> datas){
        JSONObject jsonObject = new JSONObject();
        for (DAUData data : datas) {
            jsonObject.put(data.getHour(),data.getNum());
        }
        return jsonObject;
    }
}

3.3.5PublisherMapper.xml新增如下內容

<select id="getGMVByDate" resultType="double">
            select
                sum(total_amount)
            from GMALL_ORDER_INFO
             where create_date = #{date}
    </select>

    <select id="getGMVDatasByDate" resultType="com.yuange.gmall.gmall_publisher.beans.GMVData" >
         select
            create_hour hour,sum(total_amount) amount
         from GMALL_ORDER_INFO
         where create_date = #{date}
         group by create_hour
    </select>

3.3.6index.html新增如下內容

<br/>
<a href="/realtime-hours?id=order_amount&date=2021-07-07">統計昨天和今天的分時GMV資料</a>

3.4 索引優化

create local  index  IDX_GMALL_ORDER_CREATE_DATE_HOUR on gmallXXX_order_info(create_date,create_hour)

3.5 對接視覺化模組