
Real-Time Analytics System: Flexible Analysis Requirements

Chapter 1 Requirements Analysis

1.1 Scenarios for Flexible Queries

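  The data warehouse stores a large amount of detail data, but a Hadoop-based warehouse has to run every computation through MapReduce, so interactive response times are very poor. To make it easy for data analysts to explore the data, the data platform needs to provide a feature that supports flexible analysis driven by conditions such as search text and selectable options.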

1.2 Detailed Requirements

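Input parameters

  Date: the date of the data to query.

  Keyword: search by words that appear in the product name.

Return values

  Pie charts:
    Gender ratio: male, female.
    Age ratio: under 20, 20-30, over 30.

  Purchase-behavior detail: user ID, gender, age, level, purchase time, item price, order status, and other information. The list supports paging.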

Chapter 2 Architecture Analysis

2.1 T+1 Mode

2.1.1 Implementation Steps

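  1) Use a tool such as Sqoop to extract data from the business database in batches;

  2) Use data warehouse jobs to build a wide table (user purchase behavior) in the DWS layer;

  3) Develop a Spark batch job that imports the DWS-layer wide table into ES;

  4) Read the data from ES, publish an API, and connect it to the visualization module.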

2.1.2 Characteristics

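  Advantage: the DWS-layer wide table already produced by the offline jobs can be exported to ES as-is for fast interactive analysis.

  Disadvantage: because the results of offline processing are loaded into ES afterwards, the data is only as fresh as the offline data.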

2.2 T+0 Mode

2.2.1 Implementation Steps

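  1) Use Canal to capture the real-time changes (new rows) of the relevant tables and push them to Kafka;

  2) In Spark Streaming, transform, filter, and join the data into the wide-table structure;

  3) Save the result to ES;

  4) Read the data from ES, publish an API, and connect it to the visualization module.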

2.2.2 Characteristics

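  Advantage: data is produced in real time, so freshness is very high.

  Disadvantage: Kafka delivers raw data, so Spark Streaming has to process it further, which is more complex than the batch approach. Joins are the typical example: records that belong together can arrive in different batches because of network delay.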

Chapter 3 Real-Time Data Collection

3.1 Add the Tables to Track in the Canal Module

  1) Add two constants in the gmall_common module

public static final String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";     // gmall_order_detail
public static final String GMALL_USER_INFO = "GMALL_USER_INFO";           // gmall_user_info

  2) Modify the MyClient class in the gmall_canalclient module

package com.yuange.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yuange.constants.Constants;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;

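/**
 * @author 袁哥
 * @date 2021/7/7 18:10
 * Steps:
 *    1. Create a client: CanalConnector (SimpleCanalConnector: single-node Canal; ClusterCanalConnector: HA Canal cluster)
 *    2. Use the client to connect to the Canal server
 *    3. Subscribe the client to the binlog information on the Canal server
 *    4. Parse the binlog information
 *    5. Write it to Kafka
 *
 * Structure of the consumed data:
 *      Message: one batch of pulled data; the batch may contain write changes caused by several SQL statements
 *          List<Entry> entries : each Entry is the write change caused by one SQL statement
 *          id : -1 means no data was pulled
 *      Entry:
 *          CanalEntry.Header header_ : header information describing this SQL statement
 *              private Object tableName_: the table the SQL statement operated on
 *              EntryType: the type of the Entry
 *                  transaction begin:  write operation  begin
 *                  transaction commit: write operation  commit
 *                  write operations that change the actual data: rowdata
 *                          update, delete, insert
 *          ByteString storeValue_: the data
 *              serialized data; parse it with RowChange to obtain a RowChange object
 */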
public class MyClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        /**
         * Create a Canal client
         * public static CanalConnector newSingleConnector(
         *              SocketAddress address,  // host name and port of the Canal server
         *              String destination,     // see the canal.destinations property in /opt/module/canal/conf/canal.properties
         *              String username,        // NOT canal.instance.dbUsername from instance.properties
         *              String password         // see the AdminGuide (available since Canal 1.1.4): https://github.com/alibaba/canal/wiki/AdminGuide
         * ) {...}
         * */
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop104", 11111),
                "example", "", "");

        // Use the client to connect to the Canal server
        canalConnector.connect();

        // Subscribe to the binlog of every table in gmall_realtime; filtering by table happens in handleStoreValue
        canalConnector.subscribe("gmall_realtime.*");

        // Keep pulling data. Message[id=-1,entries=[],raw=false,rawEntries=[]] means this batch contained no data
        while (true){
            Message message = canalConnector.get(100);
            // If no data was pulled, wait a while before pulling again
            if (message.getId() == -1){
                System.out.println("No data yet, waiting");
                Thread.sleep(5000);
                continue;
            }

            // Process the data
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                // Only handle entries of type ROWDATA
                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){
                    ByteString storeValue = entry.getStoreValue();          // data
                    String tableName = entry.getHeader().getTableName();    // table name
                    handleStoreValue(storeValue,tableName);
                }
            }
        }
    }

    private static void handleStoreValue(ByteString storeValue, String tableName) throws InvalidProtocolBufferException {
        // Convert storeValue into a RowChange
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

        /**
         * One RowChange represents multiple rows.
         * order_info: write operations that may occur, for computing GMV (total_amount)
         *          insert :  yes --> the new values
         *          update :  no  --> only order_status may be modified
         *          delete :  no, data is never deleted
         * So check whether this batch of write operations was produced by INSERT statements.
         * user_info: inserts and updates are both forwarded, so the user cache stays current.
         * */

        // Capture inserts on order_info
        if ( "order_info".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {
            writeDataToKafka(Constants.GMALL_ORDER_INFO,rowChange);
        // Capture inserts on order_detail
        }else if ("order_detail".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {
            writeDataToKafka(Constants.GMALL_ORDER_DETAIL,rowChange);
        // Capture inserts and updates on user_info
        }else if((rowChange.getEventType().equals(CanalEntry.EventType.INSERT)
                ||rowChange.getEventType().equals(CanalEntry.EventType.UPDATE))&&"user_info".equals(tableName)){
            writeDataToKafka(Constants.GMALL_USER_INFO,rowChange);
        }
    }

    public static void writeDataToKafka(String topic,CanalEntry.RowChange rowChange){
        // Get the list of changed rows
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

        for (CanalEntry.RowData rowData : rowDatasList) {
            JSONObject jsonObject = new JSONObject();

            // Get every column of the row as it is after the change
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

            for (CanalEntry.Column column : afterColumnsList) {
                jsonObject.put(column.getName(),column.getValue());
            }

            // Simulate network jitter and delay: a random delay of 0-14 seconds
            // (uncomment the block below to enable it)
            int i = new Random().nextInt(15);

            /*try {
                Thread.sleep(i * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/

            // Send the data to Kafka
            MyProducer.sendDataToKafka(topic,jsonObject.toJSONString());
        }
    }
}
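The routing rule in handleStoreValue (which table, which event type, which topic) is easy to get wrong, so it can help to pull it out into a pure function that runs without a Canal server. The following is a minimal sketch, not part of the project: EventType is a hypothetical stand-in for CanalEntry.EventType, the string value of GMALL_ORDER_INFO is assumed (only the other two constants are shown above), and user_info is assumed to forward both inserts and updates so that the Redis user cache stays current.

```java
import java.util.Optional;

/** Sketch of the table/event-type to Kafka-topic routing done in MyClient. */
final class CanalRouting {
    // Hypothetical stand-in for CanalEntry.EventType
    enum EventType { INSERT, UPDATE, DELETE }

    static final String GMALL_ORDER_INFO = "GMALL_ORDER_INFO";     // assumed value
    static final String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";
    static final String GMALL_USER_INFO = "GMALL_USER_INFO";

    /** Returns the topic a row change should go to, or empty if it is dropped. */
    static Optional<String> route(String tableName, EventType type) {
        switch (tableName) {
            case "order_info":
                // orders are only inserted for GMV purposes
                return type == EventType.INSERT ? Optional.of(GMALL_ORDER_INFO) : Optional.empty();
            case "order_detail":
                return type == EventType.INSERT ? Optional.of(GMALL_ORDER_DETAIL) : Optional.empty();
            case "user_info":
                // new users and profile changes both refresh the user cache
                return (type == EventType.INSERT || type == EventType.UPDATE)
                        ? Optional.of(GMALL_USER_INFO) : Optional.empty();
            default:
                return Optional.empty();
        }
    }
}
```

Keeping the rule in one place like this makes it simple to unit-test before wiring it into the Canal loop.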

Chapter 4 Real-Time Data Processing

4.1 Data Processing Flow

4.2 Two-Stream Join (the hard part)

4.2.1 Program Flowchart
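The join in OrderDetailApp has to cope with an order_info and its order_detail rows landing in different micro-batches. A minimal in-memory sketch of that buffering logic follows. It assumes two HashMaps in place of the Redis keys order_info:<id> and order_detail:<order_id>, plain strings in place of the case classes, and "info|detail" as the joined row; the class and method names are illustrative. TTLs are left out, so unlike the Redis version nothing expires here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory model of the cache-assisted order_info / order_detail join. */
final class BufferedJoin {
    // orderId -> order info (stands in for: setex order_info:<id>)
    private final Map<String, String> orderInfoCache = new HashMap<>();
    // orderId -> details that arrived before their order (stands in for: sadd order_detail:<id>)
    private final Map<String, Set<String>> earlyDetails = new HashMap<>();

    /** An order arrived, together with the details joined to it in the same batch. */
    List<String> onOrder(String orderId, String orderInfo, List<String> sameBatchDetails) {
        List<String> joined = new ArrayList<>();
        for (String d : sameBatchDetails) {
            joined.add(orderInfo + "|" + d);             // matched inside the batch
        }
        orderInfoCache.put(orderId, orderInfo);            // keep it for late details
        for (String d : earlyDetails.getOrDefault(orderId, Set.of())) {
            joined.add(orderInfo + "|" + d);             // details that came first
        }
        return joined;
    }

    /** A detail arrived whose order was not in the same batch. */
    List<String> onUnmatchedDetail(String orderId, String detail) {
        String info = orderInfoCache.get(orderId);
        if (info != null) {
            return List.of(info + "|" + detail);           // its order came earlier
        }
        earlyDetails.computeIfAbsent(orderId, k -> new HashSet<>()).add(detail);
        return List.of();                                  // wait for the order
    }
}
```

Feeding a detail before its order, then the order with one same-batch detail, then another late detail exercises all three paths: buffer, join against the buffer, and join against the cached order_info.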

4.2.2 Case Classes

  1) Create the case class OrderDetail in the gmall_realtime module

package com.yuange.realtime.beans

/**
 * @author 袁哥
 * @date 2021/7/11 21:34
 *   Fields of the case class:
 *        if you need all the fields from Kafka, declare all of them;
 *        you can add extra fields of your own, or declare only the fields you need
 */
case class OrderDetail(id:String,
                       order_id: String,
                       sku_name: String,
                       sku_id: String,
                       order_price: String,
                       img_url: String,
                       sku_num: String)

  2) Create the case class SaleDetail in the gmall_realtime module

package com.yuange.realtime.beans

import java.text.SimpleDateFormat

/**
 * @author 袁哥
 * @date 2021/7/11 21:36
 */
case class SaleDetail(var order_detail_id: String = null,
                      var order_id: String = null,
                      var order_status: String = null,
                      var create_time: String = null,
                      var user_id: String = null,
                      var sku_id: String = null,
                      var user_gender: String = null,
                      var user_age: Int = 0,
                      var user_level: String = null,
                      var sku_price: Double = 0D,
                      var sku_name: String = null,
                      var dt: String = null) {

  // Build a SaleDetail from a joined order/detail pair
  def this(orderInfo: OrderInfo, orderDetail: OrderDetail) {
    this()   // primary constructor with all default values
    mergeOrderInfo(orderInfo)
    mergeOrderDetail(orderDetail)
  }

  def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.order_status = orderInfo.order_status
      this.create_time = orderInfo.create_time
      this.dt = orderInfo.create_date
      this.user_id = orderInfo.user_id
    }
  }

  def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
    if (orderDetail != null) {
      this.order_detail_id = orderDetail.id
      this.sku_id = orderDetail.sku_id
      this.sku_name = orderDetail.sku_name
      this.sku_price = orderDetail.order_price.toDouble
    }
  }

  def mergeUserInfo(userInfo: UserInfo): Unit = {
    if (userInfo != null) {
      this.user_id = userInfo.id
      // Approximate age: milliseconds since the birthday, divided into 365-day years
      val formattor = new SimpleDateFormat("yyyy-MM-dd")
      val date: java.util.Date = formattor.parse(userInfo.birthday)
      val curTs: Long = System.currentTimeMillis()
      val betweenMs = curTs - date.getTime
      val age = betweenMs / 1000L / 60L / 60L / 24L / 365L
      this.user_age = age.toInt
      this.user_gender = userInfo.gender
      this.user_level = userInfo.user_level
    }
  }
}
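mergeUserInfo derives user_age by dividing the time since the birthday into 365-day years, which ignores leap days and can report someone as a year older a few days before their birthday. If exact completed years matter, java.time provides Period.between. The sketch below contrasts the two; approxAgeOn approximates the original arithmetic on whole days, and both class and method names are illustrative.

```java
import java.time.LocalDate;
import java.time.Period;
import java.time.temporal.ChronoUnit;

/** Exact vs. 365-day-year age calculation for a yyyy-MM-dd birthday. */
final class AgeCalc {
    /** Completed years between the birthday and the given day. */
    static int ageOn(String birthday, LocalDate today) {
        LocalDate birth = LocalDate.parse(birthday);   // ISO format: yyyy-MM-dd
        return Period.between(birth, today).getYears();
    }

    /** Roughly what mergeUserInfo computes: elapsed days / 365, truncated. */
    static int approxAgeOn(String birthday, LocalDate today) {
        long days = ChronoUnit.DAYS.between(LocalDate.parse(birthday), today);
        return (int) (days / 365);
    }
}
```

For a birthday of 2000-07-08 evaluated on 2021-07-03, five leap days have accumulated, so the 365-day version already reports 21 while the exact version reports 20.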

  3) Create the case class UserInfo in the gmall_realtime module

package com.yuange.realtime.beans

case class UserInfo(id:String,
                    login_name:String,
                    user_level:String,
                    birthday:String,
                    gender:String)

4.2.3 App

  1) Create OrderDetailApp in the gmall_realtime module

package com.yuange.realtime.app

import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.util

import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import com.yuange.constants.Constants
import com.yuange.realtime.beans.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
import com.yuange.realtime.utils.{MyEsUtil, MyKafkaUtil, RedisUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

/**
 * @author 袁哥
 * @date 2021/7/11 21:59
 */
object OrderDetailApp extends  BaseApp {
  override var appName: String = "OrderDetailApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {

    run{
      val ds1: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_INFO, streamingContext)
      val ds2: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_DETAIL, streamingContext)

      // Convert both streams into key-value DStreams keyed by order id
      val ds3: DStream[(String, OrderInfo)] = ds1.map(record => {
        val orderInfo: OrderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])
        // Fill in create_date and create_hour from create_time, e.g. "create_time":"2021-07-07 01:39:33"
        val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val formatter2: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

        val localDateTime: LocalDateTime = LocalDateTime.parse(orderInfo.create_time, formatter1)
        orderInfo.create_date = localDateTime.format(formatter2)
        orderInfo.create_hour = localDateTime.getHour.toString

        // Mask sensitive order data; phone-number masking as a demonstration
        orderInfo.consignee_tel = orderInfo.consignee_tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2")
        (orderInfo.id, orderInfo)
      })

      // ds3.print()
      val ds4: DStream[(String, OrderDetail)] = ds2.map(record => {
        val detail: OrderDetail = JSON.parseObject(record.value(), classOf[OrderDetail])
        (detail.order_id, detail)
      })

      // ds4.print()
      val ds5: DStream[(String, (Option[OrderInfo], Option[OrderDetail]))] = ds3.fullOuterJoin(ds4)
      ds5.print()

      val ds6: DStream[SaleDetail] = ds5.mapPartitions(partition => {
        // Holds the joined sale details
        val saleDetails: ListBuffer[SaleDetail] = ListBuffer[SaleDetail]()

        // Get a Redis connection
        val jedis: Jedis = RedisUtil.getJedisClient()
        val gson = new Gson
        partition.foreach {
          case (order_id, (orderInfoOption, orderDetailOption)) => {
            if (orderInfoOption != None) {
              val orderInfo: OrderInfo = orderInfoOption.get
              // The order_detail joined with it in the current batch
              if (orderDetailOption != None) {
                val orderDetail: OrderDetail = orderDetailOption.get
                val detail = new SaleDetail(orderInfo, orderDetail)
                saleDetails.append(detail)
              }

              // Cache order_info in Redis for late details. TTL: maximum system delay (assumed 5 min) * 2
              //  set + expire = setex
              jedis.setex("order_info:" + order_id, 5 * 2 * 60, gson.toJson(orderInfo))

              // Check Redis for order_detail records that arrived early
              val earlyOrderDetails: util.Set[String] = jedis.smembers("order_detail:" + order_id)
              earlyOrderDetails.forEach(
                str => {
                  val orderDetail: OrderDetail = JSON.parseObject(str, classOf[OrderDetail])
                  val detail = new SaleDetail(orderInfo, orderDetail)
                  saleDetails.append(detail)
                }
              )

            } else {
              // An order_detail that could not be matched in the current batch
              val orderDetail: OrderDetail = orderDetailOption.get

              // Check Redis for an order_info that arrived earlier
              val orderInfoStr: String = jedis.get("order_info:" + orderDetail.order_id)
              if (orderInfoStr != null) {
                val detail = new SaleDetail(JSON.parseObject(orderInfoStr, classOf[OrderInfo]), orderDetail)
                saleDetails.append(detail)
              } else {
                // This order_detail arrived early: no matching order_info is cached yet, so buffer it in Redis
                jedis.sadd("order_detail:" + orderDetail.order_id, gson.toJson(orderDetail))
                jedis.expire("order_detail:" + orderDetail.order_id, 5 * 2 * 60)
              }
            }
          }
        }

        jedis.close()
        saleDetails.iterator
      })

      // Look up the user's other attributes by user_id
      val ds7: DStream[SaleDetail] = ds6.mapPartitions(partition => {
        val jedis: Jedis = RedisUtil.getJedisClient()
        // Iterator.map is lazy: materialize with toList so every Redis lookup
        // runs before jedis.close() is called
        val saleDetailsWithUserInfo: List[SaleDetail] = partition.map(saleDetail => {
          val userInfoStr: String = jedis.get("user_id:" + saleDetail.user_id)
          val userInfo: UserInfo = JSON.parseObject(userInfoStr, classOf[UserInfo])
          saleDetail.mergeUserInfo(userInfo)
          saleDetail
        }).toList
        jedis.close()

        saleDetailsWithUserInfo.iterator
      })

      // Write to ES: convert the DStream into docList: List[(String, Any)], keyed by order_detail_id
      val ds8: DStream[(String, SaleDetail)] = ds7.map(saleDetail => ((saleDetail.order_detail_id, saleDetail)))
      ds8.foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          MyEsUtil.insertBulk("gmall_sale_detail" + LocalDate.now() , partition.toList)
        })
      })
    }
  }
}
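OrderDetailApp masks consignee_tel with the regex (\\w{3})\\w*(\\w{4}) and the replacement $1****$2. Scala strings are java.lang.String, so the rule can be checked directly in Java; the wrapper class below is only illustrative. Note that the pattern needs at least seven word characters to match, so shorter values pass through unchanged, and a null consignee_tel would make the original map call throw.

```java
/** The phone-masking rule from OrderDetailApp, as a standalone helper. */
final class PhoneMask {
    static String mask(String tel) {
        // keep the first 3 and the last 4 word characters, replace the middle with ****
        return tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2");
    }
}
```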

  2) Create UserInfoApp in the gmall_realtime module

package com.yuange.realtime.app

import com.alibaba.fastjson.JSON
import com.yuange.constants.Constants
import com.yuange.realtime.utils.{MyKafkaUtil, RedisUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

/**
 * @author 袁哥
 * @date 2021/7/13 20:11
 */
object UserInfoApp extends BaseApp {
  override var appName: String = "UserInfoApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {
    run{
      val ds: InputDStream[ConsumerRecord[String,String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_USER_INFO,streamingContext)
      ds.foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          // Get a Redis connection
          val jedis = RedisUtil.getJedisClient()
          partition.foreach(record => {
            // Cache the raw user JSON under user_id:<id>; OrderDetailApp reads it back from there
            val key: String = JSON.parseObject(record.value()).getString("id")
            jedis.set("user_id:" + key, record.value())
          })
          jedis.close()
        })
      })
    }
  }
}

  3) In MyEsUtil in the gmall_realtime module, add a null check for items

package com.yuange.realtime.utils

import java.util.Objects
import java.util

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import collection.JavaConverters._

/**
 * @author 袁哥
 * @date 2021/7/9 21:16
 */
object MyEsUtil {

  private val ES_HOST = "http://hadoop102"
  private val ES_HTTP_PORT = 9200
  private var factory: JestClientFactory = null

  /**
   * Get a client
   *
   * @return jestclient
   */
  def getClient: JestClient = {
    if (factory == null) build()
    factory.getObject
  }

  /**
   * Close the client
   */
  def close(client: JestClient): Unit = {
    if (!Objects.isNull(client)) try
      client.shutdownClient()
    catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }

  /**
   * Create the connection factory
   */
  private def build(): Unit = {
    factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
      .maxTotalConnection(20) // total number of connections
      .connTimeout(10000).readTimeout(10000).build)
  }

  /*
      Bulk-insert data into ES.

        The data to write must first be wrapped as docList: List[(String, Any)]
          (String, Any):   K: id
                           V: document
   */
  def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {
    if (docList.size > 0) {
      val jest: JestClient = getClient

      val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")

      for ((id, doc) <- docList) {
        val indexBuilder = new Index.Builder(doc)
        if (id != null) {
          indexBuilder.id(id)
        }
        val index: Index = indexBuilder.build()
        bulkBuilder.addAction(index)
      }

      val bulk: Bulk = bulkBuilder.build()
      var items: util.List[BulkResult#BulkResultItem] = null

      try {
        items = jest.execute(bulk).getItems
      } catch {
        case ex: Exception => println(ex.toString)
      } finally {
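        // Always close the connection
        close(jest)
        if (items != null){
          println("Bulk request processed " + items.size() + " documents")
          /*
            items is a Java collection, but <- can only iterate over Scala collections,
            so convert it with javaCollection.asScala
            (and a Scala collection to Java with scalaCollection.asJava)
         */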
          for (item <- items.asScala) {
            if (item.error != null && item.error.nonEmpty) {
              println(item.error)
              println(item.errorReason)
            }
          }
        }
      }
    }
  }
}
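insertBulk hands a list of (id, document) pairs to Jest, which sends them to the Elasticsearch _bulk endpoint as newline-delimited JSON: one action line followed by one source line per document, with a trailing newline. The sketch below builds such a body by hand to show roughly what goes over the wire. It is illustrative only: it assumes the documents are already serialized JSON and that ids and index names need no escaping, and it sets _type to _doc to mirror defaultType("_doc").

```java
import java.util.List;
import java.util.Map;

/** Builds an Elasticsearch _bulk body (NDJSON) for (id, jsonDocument) pairs. */
final class BulkBody {
    static String build(String index, List<? extends Map.Entry<String, String>> docs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> doc : docs) {
            // action line: index the document into the given index
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"_doc\"");
            if (doc.getKey() != null) {
                // a fixed _id turns a re-sent record into an overwrite, not a duplicate
                sb.append(",\"_id\":\"").append(doc.getKey()).append('"');
            }
            sb.append("}}\n");
            // source line: the document itself
            sb.append(doc.getValue()).append('\n');
        }
        return sb.toString();   // the bulk API expects the final newline
    }
}
```

Using order_detail_id as the document id, as OrderDetailApp does, is what makes replays of the same batch idempotent in ES.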

4.3 Creating the ES Index

PUT _template/gmall_sale_detail_template
{
   "index_patterns": ["gmall_sale_detail*"],                  
    "settings": {                                               
      "number_of_shards": 3
    },
    "aliases" : { 
      "{index}-query": {},
      "gmall_sale_detail-query":{}
    },
    "mappings" : {
      "_doc" : {
        "properties" : {
          "order_detail_id" : {
            "type" :   "keyword"
          },
          "order_id" : {
            "type" : "keyword" 
          },
          "create_time" : {
            "type" : "date" ,
            "format" : "yyyy-MM-dd HH:mm:ss"
          },
          "dt" : {
            "type" : "date"
          },
          "order_status" : {
                "type" : "keyword" 
          },
          "sku_id" : {
                "type" : "keyword"
          },
          "sku_name" : {
            "type" : "text",
            "analyzer": "ik_max_word"
          },
          "sku_price" : {
            "type" : "float"
          },
          "user_age" : {
            "type" : "long"
          },
          "user_gender" : {
            "type" : "keyword" 
          },
          "user_id" : {
            "type" : "keyword" 
          },
          "user_level" : {
            "type" : "keyword",
            "index" : false 
          }
        }
      }
    }
  }

4.4 Testing

  1) Start ZooKeeper

zookeeper.sh start

  2) Start Kafka

kafka.sh start

  3) Start Canal

/opt/module/canal/bin/startup.sh

  4) Run the main method of MyClient in the gmall_canalclient module to move data from MySQL to Kafka

  5) Start the ES cluster

elasticsearch.sh start

  6) Start Redis

redis-server /opt/module/redis/redis.conf

  7) Generate simulated data with the stored procedure

#CALL `init_data`(date to generate data for, number of orders, number of users, whether to overwrite)
call `init_data`('2021-07-07',6,3,true)

  8) Run the main method of UserInfoApp in the gmall_realtime module to load the user data from Kafka into Redis

  9) Run the main method of OrderDetailApp in the gmall_realtime module to join the data and store it in ES

  10) Check the data in Redis

  11) Check the data in ES

Chapter 5 Developing the Flexible Query Data API

5.1 Request Path and Parameters

http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手機

5.2 Return Value

{"total":62,"stat":[{"options":[{"name":"20歲以下","value":0.0},{"name":"20歲到30歲","value":25.8},{"name":"30歲及30歲以上","value":74.2}],"title":"使用者年齡佔比"},{"options":[{"name":"男","value":38.7},{"name":"女","value":61.3}],"title":"使用者性別佔比"}],"detail":[{"user_id":"9","sku_id":"8","user_gender":"M","user_age":49.0,"user_level":"1","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":6.0,"order_count":2.0,"order_amount":53400.0,"dt":"2019-02-14","es_metadata_id":"wPdM7GgBQMmfy2BJr4YT"},{"user_id":"5","sku_id":"8","user_gender":"F","user_age":36.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":5.0,"order_count":1.0,"order_amount":44500.0,"dt":"2019-02-14","es_metadata_id":"wvdM7GgBQMmfy2BJr4YT"},{"user_id":"19","sku_id":"8","user_gender":"F","user_age":43.0,"user_level":"5","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":7.0,"order_count":2.0,"order_amount":62300.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"},{"user_id":"15","sku_id":"8","user_gender":"M","user_age":66.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":3.0,"order_count":1.0,"order_amount":26700.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"}]}

5.3 Writing the DSL Statement

GET gmall_sale_detail-query/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "dt": "2021-07-07"
        }
      },
      "must": [
        {
          "match": {
            "sku_name": {
              "query": "小米手機",
              "operator": "and"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "groupby_age": {
      "terms": {
        "field": "user_age"
      }
    },
    "groupby_gender": {
      "terms": {
        "field": "user_gender"
      }
    }
  },
  "from": 0,
  "size": 2
}
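The from and size fields implement paging. With a 1-based page number, as in the startpage parameter of the publisher API, the offset is from = (startpage - 1) * size. A minimal sketch follows; the argument check is an addition of this example and not something the original code does.

```java
/** Converts a 1-based page number into the Elasticsearch "from" offset. */
final class Paging {
    static int from(int startpage, int size) {
        if (startpage < 1 || size < 1) {
            throw new IllegalArgumentException("startpage and size must be >= 1");
        }
        return (startpage - 1) * size;
    }
}
```

Elasticsearch also caps from + size by the index.max_result_window setting (10,000 by default), so paging beyond that point needs another approach such as search_after.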

5.4 Code Development

5.4.1 Code Listing

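Beans
    Stat: one pie chart
    Option: one option (slice) of a pie chart

Controller layer
    PublisherController: add a method (handle3 in the code below) that calls the service layer and shapes the return value according to the web API and its parameters

Service layer
    PublisherService: add a method (getESData in the code below)
    PublisherServiceImpl: implement it by delegating to the DAO layer

DAO layer
    ESDao / ESDaoImpl: query Elasticsearch with the DSL statement and assemble the result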

5.4.2 Modify pom.xml in the gmall_publisher module

<!-- ES dependencies -->
<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>5.3.3</version>
</dependency>

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

5.4.3 Configure application.properties in the gmall_publisher module

#es
spring.elasticsearch.jest.uris=http://hadoop102:9200

5.4.4 Bean

  1) Create Option

package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/15 10:45
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Option {
    String name;
    Double value;
}

  2) Create Stat

package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/15 10:46
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Stat {
    String title;
    List<Option> options;
}

  3) Create SaleDetail

package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/15 10:51
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class SaleDetail {
    private String order_detail_id;
    private String order_id;
    private String order_status;
    private String create_time;
    private String user_id;
    private String sku_id;
    private String user_gender;
    private Integer user_age;
    private String user_level;
    private Double sku_price;
    private String sku_name;
    private String dt;
    // Extra field: holds the ES document _id (hit.id)
    private String  es_metadata_id;
}

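  4) Lombok annotations:

@Data: generates getters and setters (plus equals, hashCode and toString)
@AllArgsConstructor: generates a constructor that takes all fields
@NoArgsConstructor: generates a no-argument constructor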
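To make these annotations concrete, here is roughly what @Data, @NoArgsConstructor and @AllArgsConstructor generate for the Option bean, written out by hand. It is an approximation (Lombok's real equals also involves a canEqual check), and the class is named OptionPlain only to avoid clashing with the real Option.

```java
import java.util.Objects;

/** Hand-written approximation of the Lombok-generated Option bean. */
final class OptionPlain {
    private String name;
    private Double value;

    public OptionPlain() { }                                // @NoArgsConstructor

    public OptionPlain(String name, Double value) {         // @AllArgsConstructor
        this.name = name;
        this.value = value;
    }

    // @Data: getters and setters (the getters are what JSON serialization uses)
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Double getValue() { return value; }
    public void setValue(Double value) { this.value = value; }

    // @Data: value-based equals/hashCode and a readable toString
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OptionPlain)) return false;
        OptionPlain other = (OptionPlain) o;
        return Objects.equals(name, other.name) && Objects.equals(value, other.value);
    }
    @Override public int hashCode() { return Objects.hash(name, value); }
    @Override public String toString() { return "Option(name=" + name + ", value=" + value + ")"; }
}
```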

  5) Add the dependency to pom.xml (I had already added it earlier; add it if you haven't)

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

5.4.5 ESDao

package com.yuange.gmall.gmall_publisher.dao;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;

/**
 * @author 袁哥
 * @date 2021/7/15 10:53
 */
public interface ESDao {
    JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;
}

5.4.6 ESDaoImpl

package com.yuange.gmall.gmall_publisher.dao;

import com.alibaba.fastjson.JSONObject;
import com.yuange.gmall.gmall_publisher.beans.Option;
import com.yuange.gmall.gmall_publisher.beans.SaleDetail;
import com.yuange.gmall.gmall_publisher.beans.Stat;
import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/15 10:56
 */
@Repository
public class ESDaoImpl implements  ESDao {

    @Autowired // take the JestClient bean from the Spring container
    private JestClient jestClient;

    /*
            Query data from ES
            date:      selects the index to query:
                            gmall_sale_detail + date
            keyword:   the full-text search keyword
            startpage: page number, used to compute from = (N - 1) * size
                    with 10 records per page:
                    startpage: 1    from: 0    size: 10
                    startpage: 2    from: 10   size: 10
                    startpage: 3    from: 20   size: 10
                    startpage: N    from: (N - 1) * size
            size:      number of records to return
            Example query:
                GET /gmall_sale_detail-query/_search
                {
                  "query": {
                    "match": {
                      "sku_name": "手機"
                    }
                  },
                  "from": 0,
                  "size": 20,
                  "aggs": {
                    "genderCount": {
                      "terms": {
                        "field": "user_gender",
                        "size": 10
                      }
                    },
                    "ageCount": {
                      "terms": {
                        "field": "user_age",
                        "size": 150
                      }
                    }
                  }
                }

     */
    public SearchResult getDataFromES(String date, Integer startpage, Integer size, String keyword) throws IOException {
        String indexName ="gmall_sale_detail" + date;
        int from = (startpage - 1 ) * size;

        // "genderCount": {}
        TermsBuilder aggs1 = AggregationBuilders.terms("genderCount").field("user_gender").size(10);

        // "ageCount": {}
        TermsBuilder aggs2 = AggregationBuilders.terms("ageCount").field("user_age").size(150);

        // "query": {}
        MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("sku_name", keyword);

        // Also set "from"/"size" so that startpage and size actually page the results
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .query(matchQueryBuilder)
                .aggregation(aggs1)
                .aggregation(aggs2)
                .from(from)
                .size(size);

        // Build the search request from the generated query string
        Search search = new Search.Builder(searchSourceBuilder.toString()).addType("_doc").addIndex(indexName).build();
        SearchResult searchResult = jestClient.execute(search);
        return searchResult;
    }


    // Wrap the detail data from the ES response in the required format
    public List<SaleDetail> getDetailData(SearchResult searchResult){

        ArrayList<SaleDetail> saleDetails = new ArrayList<>();

        List<SearchResult.Hit<SaleDetail, Void>> hits = searchResult.getHits(SaleDetail.class);

        for (SearchResult.Hit<SaleDetail, Void> hit : hits) {
            SaleDetail saleDetail = hit.source;
            saleDetail.setEs_metadata_id(hit.id);
            saleDetails.add(saleDetail);
        }
        return saleDetails;
    }

    // Wrap the age-related stat data from the ES response in the required format
    public Stat getAgeStat(SearchResult searchResult){

        Stat stat = new Stat();

        MetricAggregation aggregations = searchResult.getAggregations();

        TermsAggregation ageCount = aggregations.getTermsAggregation("ageCount");

        List<TermsAggregation.Entry> buckets = ageCount.getBuckets();

        int agelt20=0;
        int agege30=0;
        int age20to30=0;

        double sumCount=0;

        for (TermsAggregation.Entry bucket : buckets) {
            if (Integer.parseInt(bucket.getKey()) < 20 ){
                agelt20 += bucket.getCount();
            }else if(Integer.parseInt(bucket.getKey()) >= 30){
                agege30 += bucket.getCount();
            }else{
                age20to30+=bucket.getCount();
            }
        }
        sumCount = age20to30 + agege30 + agelt20;

        DecimalFormat format = new DecimalFormat("###.00");

        List<Option> ageoptions =new ArrayList<>();

        double perlt20 = agelt20 / sumCount * 100;
        double per20to30 = age20to30 / sumCount * 100;

        ageoptions.add(new Option("20歲以下",Double.parseDouble(format.format(perlt20  ))));
        ageoptions.add(new Option("20歲到30歲",Double.parseDouble(format.format( per20to30))));
        ageoptions.add(new Option("30歲及30歲以上",Double.parseDouble(format.format(100 - perlt20 - per20to30  ))));

        stat.setOptions(ageoptions);
        stat.setTitle("使用者年齡佔比");
        return stat;
    }

    // Wrap the gender-related stat data from the ES response in the required format
    public Stat getGenderStat(SearchResult searchResult){
        Stat stat = new Stat();
        MetricAggregation aggregations = searchResult.getAggregations();

        TermsAggregation genderCount = aggregations.getTermsAggregation("genderCount");

        List<TermsAggregation.Entry> buckets = genderCount.getBuckets();

        int maleCount=0;
        int femaleCount=0;

        double sumCount=0;

        for (TermsAggregation.Entry bucket : buckets) {
            if (bucket.getKey().equals("F") ){
                femaleCount += bucket.getCount();
            }else{
                maleCount += bucket.getCount();
            }
        }
        sumCount = maleCount + femaleCount;

        DecimalFormat format = new DecimalFormat("###.00");

        List<Option> genderOptions =new ArrayList<>();

        genderOptions.add(new Option("男",Double.parseDouble(format.format(maleCount / sumCount * 100  ))));
        genderOptions.add(new Option("女",Double.parseDouble(format.format( (1 - maleCount / sumCount ) * 100  ))));

        stat.setOptions(genderOptions);
        stat.setTitle("使用者性別佔比");
        return stat;
    }

    // Query ES and assemble the full response: total hit count, age and gender stats, and the detail rows
    @Override
    public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {

        SearchResult searchResult = getDataFromES(date, startpage, size, keyword);

        List<SaleDetail> detailData = getDetailData(searchResult);

        Stat ageStat = getAgeStat(searchResult);

        Stat genderStat = getGenderStat(searchResult);

        JSONObject jsonObject = new JSONObject();

        jsonObject.put("total",searchResult.getTotal());

        ArrayList<Stat> stats = new ArrayList<>();
        stats.add(ageStat);
        stats.add(genderStat);
        jsonObject.put("stat",stats);
        jsonObject.put("detail",detailData);
        return jsonObject;
    }
}
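getAgeStat folds the ageCount buckets into three ranges and turns the counts into percentages. One edge case deserves a guard: when the query matches nothing, sumCount is 0 and the divisions yield NaN. The sketch below reproduces the bucketing (under 20, 20 to 29, 30 and over) with a guard for the empty case. It also swaps DecimalFormat for BigDecimal rounding and rounds each bucket independently, so unlike the original (which computes the last bucket as 100 minus the others) the three values may sum to 99.99 or 100.01. The class name and map keys are illustrative.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.LinkedHashMap;
import java.util.Map;

/** Age-bucket percentages from (age -> hit count) terms buckets. */
final class AgeStats {
    static Map<String, Double> percentages(Map<Integer, Long> ageCounts) {
        long lt20 = 0, from20to30 = 0, ge30 = 0;
        for (Map.Entry<Integer, Long> e : ageCounts.entrySet()) {
            int age = e.getKey();
            if (age < 20) lt20 += e.getValue();
            else if (age >= 30) ge30 += e.getValue();
            else from20to30 += e.getValue();
        }
        long total = lt20 + from20to30 + ge30;
        Map<String, Double> result = new LinkedHashMap<>();
        result.put("lt20", pct(lt20, total));
        result.put("20to30", pct(from20to30, total));
        result.put("ge30", pct(ge30, total));
        return result;
    }

    // share of total in percent, two decimals, HALF_UP; 0.0 when there is no data
    private static double pct(long part, long total) {
        if (total == 0) return 0.0;
        return BigDecimal.valueOf(part * 100.0 / total)
                .setScale(2, RoundingMode.HALF_UP)
                .doubleValue();
    }
}
```

The same guard applies to getGenderStat, where maleCount / sumCount is also NaN for an empty result.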

5.4.7 PublisherService

JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;

5.4.8 PublisherServiceImpl

@Autowired
private ESDao esDao;

@Override
public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {
   return esDao.getESData(date,startpage,size,keyword);
}

5.4.9 PublisherController

/*
        http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手機
     */
    @RequestMapping(value = "/sale_detail")
    public JSONObject handle3(String date,Integer startpage,Integer size,String keyword) throws IOException {
        return publisherService.getESData(date,startpage,size,keyword);
    }
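Since the keyword usually contains Chinese text, a client calling /sale_detail should percent-encode it as UTF-8 rather than paste it raw into the query string. A minimal sketch using URLEncoder.encode(String, Charset), which requires Java 10 or later; the class name is illustrative and the base URL comes from the example above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Builds a /sale_detail request URL with a UTF-8 percent-encoded keyword. */
final class SaleDetailUrl {
    static String build(String base, String date, int startpage, int size, String keyword) {
        return base + "/sale_detail?date=" + date
                + "&startpage=" + startpage
                + "&size=" + size
                + "&keyword=" + URLEncoder.encode(keyword, StandardCharsets.UTF_8);
    }
}
```

URLEncoder follows HTML form encoding, so a space becomes +, which servlet containers normally decode back to a space in query parameters.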

5.4.10 index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

<a href="/realtime-total?date=2021-07-06">Daily active users and new devices</a>

<br/>
<a href="/realtime-hours?id=dau&date=2021-07-06">Hourly DAU for yesterday and today</a>

<br/>
<a href="/realtime-hours?id=order_amount&date=2021-07-07">Hourly GMV for yesterday and today</a>

<br/>
<a href="/sale_detail?date=2021-07-15&startpage=1&size=10&keyword=手機">Request purchase details</a>

</body>
</html>

5.4.11 Start the gmall_publisher module

5.4.12 Testing:

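  1) First check in ES whether there is data for 2021-07-15 (index gmall_sale_detail2021-07-15); if there isn't, the request below returns HTTP status 500

  2) Visit: http://localhost:8070/

  3) The complete code of the real-time project is on GitHub: https://github.com/LzMingYueShanPao/gmall_sparkstream.git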

5.5 Connecting to the Visualization Module