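Real-Time Analytics System: Flexible Analysis Requirements
Chapter 1 Requirements Analysis
1.1 Flexible Query Scenarios
The data warehouse holds a large amount of detail data, but any computation over the Hadoop-based warehouse has to go through MapReduce, so interactive response times are very poor. To let data analysts look up information conveniently, the data platform needs a feature that supports flexible analysis driven by conditions such as free-text keywords and selectable options.
1.2 Detailed Requirements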
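Input parameters
| Parameter | Description |
| Date | Date of the data to query |
| Keyword | Search terms matched against the product name |
Returned results
| Type | Content | Values |
| Pie chart | Male/female ratio | Male, female |
| Pie chart | Age-group ratio | Under 20, 20-30, over 30 |
| Purchase details | Purchase behavior detail records, including user id, gender, age, level, purchase time, item price, order status and more; paginated | |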
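Chapter 2 Architecture Analysis
2.1 T+1 Mode
2.1.1 Implementation Steps
1) Use a tool such as Sqoop to extract data from the business database in batches;
2) Use data-warehouse jobs to build a wide table (user purchase behavior) in the DWS layer;
3) Develop a Spark batch job that imports the DWS-layer wide table into ES;
4) Read the data from ES, publish it through an API, and connect the API to the visualization module.
2.1.2 Characteristics
Pros: the DWS-layer wide table already produced by the offline jobs can simply be exported to ES for fast, interactive analysis.
Cons: because ES is loaded with the results of offline processing, the data is only as fresh as the offline data (T+1).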
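2.2 T+0 Mode
2.2.1 Implementation Steps
1) Use Canal to capture newly added and changed rows of the relevant tables in real time and push them to Kafka;
2) In Spark Streaming, transform, filter, and join the records into the wide-table structure;
3) Store the result in ES;
4) Read the data from ES, publish it through an API, and connect the API to the visualization module.
2.2.2 Characteristics
Pros: data is produced in real time, so freshness is very high.
Cons: what arrives from Kafka is raw data, so Spark Streaming has to process it, which is more involved than batch processing. Joins are the hardest part: because of network delays, matching records from two streams may land in different micro-batches.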
Chapter 3 Real-Time Data Collection
3.1 Add the Tables to Track in the Canal Module
1) Add two constants in the gmall_common module
public static final String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL"; // gmall_order_detail
public static final String GMALL_USER_INFO = "GMALL_USER_INFO"; // gmall_user_info
2) Modify the MyClient class in the gmall_canalclient module
package com.yuange.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yuange.constants.Constants;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/7 18:10
 * Steps:
 * ① Create a client: CanalConnector (SimpleCanalConnector: single-node canal; ClusterCanalConnector: HA canal cluster)
 * ② Connect the client to the canal server
 * ③ Subscribe the client to binlog data from the canal server
 * ④ Parse the binlog data
 * ⑤ Write it to Kafka
 *
 * Structure of the consumed data:
 * Message: one pulled batch; it may contain write changes caused by several SQL statements
 *   List<Entry> entries: each Entry is the write change caused by one SQL statement
 *   id: -1 means nothing was pulled
 * Entry:
 *   CanalEntry.Header header_: header describing the SQL statement
 *     tableName_: the table the SQL operated on
 *   EntryType: the type of the Entry
 *     opening a transaction: begin
 *     committing a transaction: commit
 *     writes that change the data itself: rowdata (update, delete, insert)
 *   ByteString storeValue_: the serialized data; parse it with RowChange to get a RowChange object
 */
public class MyClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        /*
         * Create a canal client:
         * public static CanalConnector newSingleConnector(
         *     SocketAddress address,  // host name and port of the canal server
         *     String destination,     // see canal.destinations in /opt/module/canal/conf/canal.properties
         *     String username,        // NOT canal.instance.dbUsername from instance.properties
         *     String password         // see the AdminGuide (since canal 1.1.4): https://github.com/alibaba/canal/wiki/AdminGuide
         * ) {...}
         */
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("hadoop104", 11111), "example", "", "");
        // Connect to the canal server
        canalConnector.connect();
        // Subscribe to binlog changes of every table in the gmall_realtime database
        canalConnector.subscribe("gmall_realtime.*");
        // Pull continuously; Message[id=-1,entries=[],raw=false,rawEntries=[]] means this batch is empty
        while (true) {
            Message message = canalConnector.get(100);
            // Nothing pulled: wait a while, then try again
            if (message.getId() == -1) {
                System.out.println("No data yet, waiting...");
                Thread.sleep(5000);
                continue;
            }
            // Process the batch
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                // Only ROWDATA entries carry row changes
                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                    ByteString storeValue = entry.getStoreValue();       // serialized data
                    String tableName = entry.getHeader().getTableName(); // table name
                    handleStoreValue(storeValue, tableName);
                }
            }
        }
    }

    private static void handleStoreValue(ByteString storeValue, String tableName) throws InvalidProtocolBufferException {
        // Deserialize storeValue into a RowChange; one RowChange can cover many rows
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
        CanalEntry.EventType eventType = rowChange.getEventType();
        /*
         * Write operations that can occur on order_info (GMV is computed from total_amount):
         *   insert: yes, we need the values after the insert
         *   update: not needed, only order_status may be modified
         *   delete: never, orders may not be deleted
         * So for order_info (and order_detail) only INSERT events are collected.
         */
        if ("order_info".equals(tableName) && eventType.equals(CanalEntry.EventType.INSERT)) {
            writeDataToKafka(Constants.GMALL_ORDER_INFO, rowChange);
        } else if ("order_detail".equals(tableName) && eventType.equals(CanalEntry.EventType.INSERT)) {
            writeDataToKafka(Constants.GMALL_ORDER_DETAIL, rowChange);
        } else if ("user_info".equals(tableName)
                && (eventType.equals(CanalEntry.EventType.INSERT) || eventType.equals(CanalEntry.EventType.UPDATE))) {
            // user_info: new users and profile changes are both needed to keep the Redis cache current
            writeDataToKafka(Constants.GMALL_USER_INFO, rowChange);
        }
    }

    public static void writeDataToKafka(String topic, CanalEntry.RowChange rowChange) {
        // All rows affected by this RowChange
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            JSONObject jsonObject = new JSONObject();
            // Every column of the row after the insert/update
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                jsonObject.put(column.getName(), column.getValue());
            }
            // Optional: simulate network jitter by delaying each row 1-5 s (disabled)
            /*try {
                Thread.sleep((new java.util.Random().nextInt(5) + 1) * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            // Send the row to Kafka
            MyProducer.sendDataToKafka(topic, jsonObject.toJSONString());
        }
    }
}
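The routing rule in handleStoreValue decides, for each table and event type, which Kafka topic receives a change. If that rule is pulled out into a pure function, it can be checked without a running canal server. The sketch below is a hypothetical helper (TopicRouter). It takes the event type as a plain string standing in for CanalEntry.EventType. The value of GMALL_ORDER_INFO is not shown in this document; it is assumed to follow the same naming as the two constants added in 3.1.

```java
import java.util.Optional;

// Hypothetical helper mirroring the table/event routing in MyClient.handleStoreValue.
// Event types are plain strings here (stand-ins for CanalEntry.EventType names).
final class TopicRouter {
    // Assumed topic names; only the last two are shown in the text
    static final String GMALL_ORDER_INFO = "GMALL_ORDER_INFO";
    static final String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";
    static final String GMALL_USER_INFO = "GMALL_USER_INFO";

    private TopicRouter() {}

    /** Returns the Kafka topic for a change, or empty if the change is ignored. */
    static Optional<String> route(String tableName, String eventType) {
        boolean insert = "INSERT".equals(eventType);
        boolean update = "UPDATE".equals(eventType);
        if ("order_info".equals(tableName) && insert) {
            return Optional.of(GMALL_ORDER_INFO);   // new orders only (GMV)
        }
        if ("order_detail".equals(tableName) && insert) {
            return Optional.of(GMALL_ORDER_DETAIL); // order lines are insert-only
        }
        if ("user_info".equals(tableName) && (insert || update)) {
            return Optional.of(GMALL_USER_INFO);    // keep the Redis user cache fresh
        }
        return Optional.empty();                    // every other change is dropped
    }
}
```

New tables can then be added in one place without touching the pulling loop.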
Chapter 4 Real-Time Data Processing
4.1 Data Processing Flow
4.2 Dual-Stream Join (the Hard Part)
4.2.1 Program Flowchart
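The flowchart itself is not included in this text. Its decision logic can be modeled in plain Java: these are the same decisions OrderDetailApp makes with Redis. In this model, in-memory maps stand in for the Redis keys order_info:<id> and order_detail:<id>. This is a simplified, single-threaded sketch under stated assumptions. TTL expiry is not simulated, records are plain strings, and DualStreamJoin is a hypothetical class. Unlike the Redis version, which leaves early details to expire, this sketch removes early details from the cache once they are joined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the cache-assisted dual-stream join.
// HashMaps stand in for Redis; expiry (setex/expire) is not modelled.
final class DualStreamJoin {
    // order_info:<orderId> -> order info, kept so late details can still join
    private final Map<String, String> orderInfoCache = new HashMap<>();
    // order_detail:<orderId> -> details that arrived before their order info
    private final Map<String, Set<String>> earlyDetailCache = new HashMap<>();

    /** One micro-batch: orderId -> info, orderId -> detail lines. Returns "info|detail" pairs. */
    List<String> processBatch(Map<String, String> orderInfos, Map<String, List<String>> details) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> e : orderInfos.entrySet()) {
            String orderId = e.getKey();
            String info = e.getValue();
            // Details in the same batch join directly
            for (String d : details.getOrDefault(orderId, Collections.emptyList())) {
                joined.add(info + "|" + d);
            }
            // Always cache the order info for details that arrive later
            orderInfoCache.put(orderId, info);
            // Details that arrived in earlier batches are waiting in the cache
            Set<String> early = earlyDetailCache.remove(orderId);
            if (early != null) {
                for (String d : early) {
                    joined.add(info + "|" + d);
                }
            }
        }
        for (Map.Entry<String, List<String>> e : details.entrySet()) {
            String orderId = e.getKey();
            if (orderInfos.containsKey(orderId)) {
                continue; // already joined above
            }
            String cachedInfo = orderInfoCache.get(orderId);
            for (String d : e.getValue()) {
                if (cachedInfo != null) {
                    joined.add(cachedInfo + "|" + d); // order info came in an earlier batch
                } else {
                    // detail is early: park it until its order info shows up
                    earlyDetailCache.computeIfAbsent(orderId, k -> new HashSet<>()).add(d);
                }
            }
        }
        return joined;
    }
}
```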
4.2.2 Case Classes
1) Create the case class OrderDetail in the gmall_realtime module
package com.yuange.realtime.beans

/**
 * @author 袁哥
 * @date 2021/7/11 21:34
 * Fields of the case class:
 *   to keep every field from Kafka, declare all of them;
 *   extra fields of your own may be added, and if not every Kafka field is needed, declare only the ones you use
 */
case class OrderDetail(id: String,
                       order_id: String,
                       sku_name: String,
                       sku_id: String,
                       order_price: String,
                       img_url: String,
                       sku_num: String)
2) Create the case class SaleDetail in the gmall_realtime module
package com.yuange.realtime.beans

import java.text.SimpleDateFormat

/**
 * @author 袁哥
 * @date 2021/7/11 21:36
 */
case class SaleDetail(var order_detail_id: String = null,
                      var order_id: String = null,
                      var order_status: String = null,
                      var create_time: String = null,
                      var user_id: String = null,
                      var sku_id: String = null,
                      var user_gender: String = null,
                      var user_age: Int = 0,
                      var user_level: String = null,
                      var sku_price: Double = 0D,
                      var sku_name: String = null,
                      var dt: String = null) {

  // Build a sale detail from one order and one of its order lines
  def this(orderInfo: OrderInfo, orderDetail: OrderDetail) = {
    this() // an auxiliary constructor must first call another constructor; all fields take their defaults
    mergeOrderInfo(orderInfo)
    mergeOrderDetail(orderDetail)
  }

  def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.order_status = orderInfo.order_status
      this.create_time = orderInfo.create_time
      this.dt = orderInfo.create_date
      this.user_id = orderInfo.user_id
    }
  }

  def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
    if (orderDetail != null) {
      this.order_detail_id = orderDetail.id
      this.sku_id = orderDetail.sku_id
      this.sku_name = orderDetail.sku_name
      this.sku_price = orderDetail.order_price.toDouble
    }
  }

  def mergeUserInfo(userInfo: UserInfo): Unit = {
    if (userInfo != null) {
      this.user_id = userInfo.id
      val formatter = new SimpleDateFormat("yyyy-MM-dd")
      val date: java.util.Date = formatter.parse(userInfo.birthday)
      val curTs: Long = System.currentTimeMillis()
      val betweenMs = curTs - date.getTime
      // Approximate age: 365-day years, leap days ignored
      val age = betweenMs / 1000L / 60L / 60L / 24L / 365L
      this.user_age = age.toInt
      this.user_gender = userInfo.gender
      this.user_level = userInfo.user_level
    }
  }
}
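mergeUserInfo estimates age by dividing elapsed milliseconds by 365-day years. Because leap days are ignored, the result can be off by one around birthdays. If exact completed years are wanted, java.time.Period gives them directly. Below is a Java sketch, assuming birthdays arrive as yyyy-MM-dd strings as in user_info. AgeCalc is a hypothetical helper, and the reference date is passed in explicitly (instead of reading the clock) only to keep it testable.

```java
import java.time.LocalDate;
import java.time.Period;

// Hypothetical helper: completed years between a birthday and a reference date.
final class AgeCalc {
    private AgeCalc() {}

    /** birthday in ISO format yyyy-MM-dd, which LocalDate.parse accepts by default. */
    static int ageOn(String birthday, LocalDate today) {
        LocalDate birth = LocalDate.parse(birthday);
        // Period counts whole calendar years, so leap days are handled correctly
        return Period.between(birth, today).getYears();
    }
}
```

In the Scala case class, the equivalent expression would be Period.between(LocalDate.parse(userInfo.birthday), LocalDate.now()).getYears.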
3) Create the case class UserInfo in the gmall_realtime module
case class UserInfo(id:String, login_name:String, user_level:String, birthday:String, gender:String)
4.2.3 App
1) Create OrderDetailApp in the gmall_realtime module
package com.yuange.realtime.app

import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.util

import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import com.yuange.constants.Constants
import com.yuange.realtime.beans.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
import com.yuange.realtime.utils.{MyEsUtil, MyKafkaUtil, RedisUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

/**
 * @author 袁哥
 * @date 2021/7/11 21:59
 */
object OrderDetailApp extends BaseApp {
  override var appName: String = "OrderDetailApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {
    run {
      val ds1: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_INFO, streamingContext)
      val ds2: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_DETAIL, streamingContext)

      // Turn both streams into key-value DStreams keyed by order id
      val ds3: DStream[(String, OrderInfo)] = ds1.map(record => {
        val orderInfo: OrderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])
        // Derive create_date and create_hour from e.g. "create_time":"2021-07-07 01:39:33"
        val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        val formatter2: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
        val localDateTime: LocalDateTime = LocalDateTime.parse(orderInfo.create_time, formatter1)
        orderInfo.create_date = localDateTime.format(formatter2)
        orderInfo.create_hour = localDateTime.getHour.toString
        // Mask sensitive order data; demo: phone number masking
        orderInfo.consignee_tel = orderInfo.consignee_tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2")
        (orderInfo.id, orderInfo)
      })
      // ds3.print()

      val ds4: DStream[(String, OrderDetail)] = ds2.map(record => {
        val detail: OrderDetail = JSON.parseObject(record.value(), classOf[OrderDetail])
        (detail.order_id, detail)
      })
      // ds4.print()

      // Full outer join within the current batch: either side may be missing
      val ds5: DStream[(String, (Option[OrderInfo], Option[OrderDetail]))] = ds3.fullOuterJoin(ds4)
      // ds5.print()

      val ds6: DStream[SaleDetail] = ds5.mapPartitions(partition => {
        // Collects the joined sale details
        val saleDetails: ListBuffer[SaleDetail] = ListBuffer[SaleDetail]()
        // One Redis connection per partition
        val jedis: Jedis = RedisUtil.getJedisClient()
        val gson = new Gson
        partition.foreach {
          case (order_id, (orderInfoOption, orderDetailOption)) =>
            if (orderInfoOption.isDefined) {
              val orderInfo: OrderInfo = orderInfoOption.get
              // order_detail joined within this batch
              if (orderDetailOption.isDefined) {
                saleDetails.append(new SaleDetail(orderInfo, orderDetailOption.get))
              }
              // Cache order_info in Redis for late order_details.
              // TTL = maximum system delay (assumed 5 min) * 2; setex = set + expire
              jedis.setex("order_info:" + order_id, 5 * 2 * 60, gson.toJson(orderInfo))
              // order_details that arrived early and are waiting in Redis
              val earlyOrderDetails: util.Set[String] = jedis.smembers("order_detail:" + order_id)
              earlyOrderDetails.forEach(str => {
                val orderDetail: OrderDetail = JSON.parseObject(str, classOf[OrderDetail])
                saleDetails.append(new SaleDetail(orderInfo, orderDetail))
              })
            } else {
              // Only an order_detail in this batch, with no order_info to pair with
              val orderDetail: OrderDetail = orderDetailOption.get
              // Its order_info may have arrived in an earlier batch and be cached
              val orderInfoStr: String = jedis.get("order_info:" + orderDetail.order_id)
              if (orderInfoStr != null) {
                saleDetails.append(new SaleDetail(JSON.parseObject(orderInfoStr, classOf[OrderInfo]), orderDetail))
              } else {
                // This order_detail is early: no matching order_info in the cache yet, so park it in Redis
                jedis.sadd("order_detail:" + orderDetail.order_id, gson.toJson(orderDetail))
                jedis.expire("order_detail:" + orderDetail.order_id, 5 * 2 * 60)
              }
            }
        }
        jedis.close()
        saleDetails.iterator
      })

      // Look up the remaining user information by user_id
      val ds7: DStream[SaleDetail] = ds6.mapPartitions(partition => {
        val jedis: Jedis = RedisUtil.getJedisClient()
        // Iterator.map is lazy: materialize with toList before the connection is closed
        val saleDetailsWithUserInfo: List[SaleDetail] = partition.map(saleDetail => {
          val userInfoStr: String = jedis.get("user_id:" + saleDetail.user_id)
          // userInfo is null if the user is not cached; mergeUserInfo skips null
          val userInfo: UserInfo = JSON.parseObject(userInfoStr, classOf[UserInfo])
          saleDetail.mergeUserInfo(userInfo)
          saleDetail
        }).toList
        jedis.close()
        saleDetailsWithUserInfo.iterator
      })

      // Write to ES: convert the DStream into (docId, document) pairs, i.e. docList: List[(String, Any)]
      val ds8: DStream[(String, SaleDetail)] = ds7.map(saleDetail => (saleDetail.order_detail_id, saleDetail))
      ds8.foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          // One index per day, e.g. gmall_sale_detail2021-07-15 (matches the template pattern gmall_sale_detail*)
          MyEsUtil.insertBulk("gmall_sale_detail" + LocalDate.now(), partition.toList)
        })
      })
    }
  }
}
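Two per-record transformations in ds3 are easy to check in isolation. The first derives create_date and create_hour from create_time. The second masks consignee_tel with the regex (\w{3})\w*(\w{4}) and the replacement $1****$2, which keeps the first three and last four characters. The Java sketch below uses the same pattern strings as the Scala code; OrderFields is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper reproducing the per-record field derivations from OrderDetailApp.
final class OrderFields {
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private OrderFields() {}

    /** "2021-07-07 01:39:33" -> "2021-07-07" */
    static String createDate(String createTime) {
        return LocalDateTime.parse(createTime, TS).format(DAY);
    }

    /** "2021-07-07 01:39:33" -> "1" (hour without leading zero, like getHour.toString) */
    static String createHour(String createTime) {
        return String.valueOf(LocalDateTime.parse(createTime, TS).getHour());
    }

    /** Keeps the first 3 and last 4 word characters, e.g. 13812345678 -> 138****5678 */
    static String maskPhone(String tel) {
        return tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2");
    }
}
```

The pattern needs at least seven word characters to match, so shorter values pass through unmasked.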
2) Create UserInfoApp in the gmall_realtime module
package com.yuange.realtime.app

import com.alibaba.fastjson.JSON
import com.yuange.constants.Constants
import com.yuange.realtime.utils.{MyKafkaUtil, RedisUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

/**
 * @author 袁哥
 * @date 2021/7/13 20:11
 */
object UserInfoApp extends BaseApp {
  override var appName: String = "UserInfoApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {
    run {
      val ds: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_USER_INFO, streamingContext)
      ds.foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          // One Redis connection per partition
          val jedis = RedisUtil.getJedisClient()
          partition.foreach(record => {
            // Cache the whole user JSON under user_id:<id>; OrderDetailApp reads it back
            val key: String = JSON.parseObject(record.value()).getString("id")
            jedis.set("user_id:" + key, record.value())
          })
          jedis.close()
        })
      })
    }
  }
}
3) In MyEsUtil of the gmall_realtime module, add a null check for items
package com.yuange.realtime.utils

import java.util
import java.util.Objects

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}

import scala.collection.JavaConverters._

/**
 * @author 袁哥
 * @date 2021/7/9 21:16
 */
object MyEsUtil {
  private val ES_HOST = "http://hadoop102"
  private val ES_HTTP_PORT = 9200
  private var factory: JestClientFactory = null

  /**
   * Get a client
   *
   * @return JestClient
   */
  def getClient: JestClient = {
    if (factory == null) build()
    factory.getObject
  }

  /**
   * Close a client
   */
  def close(client: JestClient): Unit = {
    if (!Objects.isNull(client)) try client.shutdownClient() catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Build the connection factory
   */
  private def build(): Unit = {
    factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT)
      .multiThreaded(true)
      .maxTotalConnection(20) // total number of connections
      .connTimeout(10000)
      .readTimeout(10000)
      .build)
  }

  /*
   * Bulk-insert documents into ES.
   * The data must first be wrapped as docList: List[(String, Any)],
   * where each (String, Any) is K: document id, V: document.
   */
  def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {
    if (docList.nonEmpty) {
      val jest: JestClient = getClient
      val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")
      for ((id, doc) <- docList) {
        val indexBuilder = new Index.Builder(doc)
        if (id != null) {
          indexBuilder.id(id)
        }
        val index: Index = indexBuilder.build()
        bulkBuilder.addAction(index)
      }
      val bulk: Bulk = bulkBuilder.build()
      var items: util.List[BulkResult#BulkResultItem] = null
      try {
        items = jest.execute(bulk).getItems
      } catch {
        case ex: Exception => println(ex.toString)
      } finally {
        // Always release the client
        close(jest)
        // items stays null if execute() threw, so check before using it
        if (items != null) {
          println("Saved " + items.size() + " documents")
          /*
           * items is a Java collection, but `for (x <- ...)` can only iterate Scala collections,
           * so convert it: javaCollection.asScala (and scalaCollection.asJava for the reverse)
           */
          for (item <- items.asScala) {
            if (item.error != null && item.error.nonEmpty) {
              println(item.error)
              println(item.errorReason)
            }
          }
        }
      }
    }
  }
}
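When debugging bulk writes, it helps to know the shape of the request on the wire. The Elasticsearch _bulk API takes newline-delimited JSON: for each document, an action line (here index, with the document id) followed by the document source, with every line ending in \n. Below is a minimal sketch that builds such a body for the (id, document) pairs insertBulk receives. It assumes the documents are already serialized JSON strings and that index names and ids need no JSON escaping. BulkBody is hypothetical, and it does not claim to reproduce Jest's exact output (for example, any _type that defaultType("_doc") adds).

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds an NDJSON body for the Elasticsearch _bulk endpoint.
final class BulkBody {
    private BulkBody() {}

    /** Each entry is (document id or null, already-serialized JSON source). No escaping is done. */
    static String build(String index, List<? extends Map.Entry<String, String>> docs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> doc : docs) {
            sb.append("{\"index\":{\"_index\":\"").append(index).append('"');
            if (doc.getKey() != null) {
                // Mirrors indexBuilder.id(id): a fixed id makes re-writes overwrite, not duplicate
                sb.append(",\"_id\":\"").append(doc.getKey()).append('"');
            }
            sb.append("}}\n");
            sb.append(doc.getValue()).append('\n'); // the body must end with a newline
        }
        return sb.toString();
    }
}
```

Using order_detail_id as the document id is what makes replays of the same batch idempotent in ES.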
4.3 Creating the ES Index Template
PUT _template/gmall_sale_detail_template
{
  "index_patterns": ["gmall_sale_detail*"],
  "settings": {
    "number_of_shards": 3
  },
  "aliases": {
    "{index}-query": {},
    "gmall_sale_detail-query": {}
  },
  "mappings": {
    "_doc": {
      "properties": {
        "order_detail_id": { "type": "keyword" },
        "order_id": { "type": "keyword" },
        "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
        "dt": { "type": "date" },
        "order_status": { "type": "keyword" },
        "sku_id": { "type": "keyword" },
        "sku_name": { "type": "text", "analyzer": "ik_max_word" },
        "sku_price": { "type": "float" },
        "user_age": { "type": "long" },
        "user_gender": { "type": "keyword" },
        "user_id": { "type": "keyword" },
        "user_level": { "type": "keyword", "index": false }
      }
    }
  }
}
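4.4 Testing
1) Start ZooKeeper
zookeeper.sh start
2) Start Kafka
kafka.sh start
3) Start Canal
/opt/module/canal/bin/startup.sh
4) Run the main method of the MyClient class in the gmall_canalclient module to ship data from MySQL to Kafka
5) Start the ES cluster
elasticsearch.sh start
6) Start Redis
redis-server /opt/module/redis/redis.conf
7) Generate mock data with the stored procedure
#CALL `init_data`(date of the generated data, number of orders, number of users, overwrite existing data?)
call `init_data`('2021-07-07',6,3,true)
8) Run the main method of UserInfoApp in the gmall_realtime module to load user data from Kafka into Redis
9) Run the main method of OrderDetailApp in the gmall_realtime module to join the streams and write the result to ES
10) Check the data in Redis
11) Check the data in ES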
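Chapter 5 Flexible Query API Development
5.1 Request Path and Parameters
http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手機
date selects the daily index to query, startpage and size control paging (pages start at 1), and keyword is matched against sku_name.
5.2 Return Value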
{
"total":62,
"stat":[
{"options":[{"name":"20歲以下","value":0.0},{"name":"20歲到30歲","value":25.8},{"name":"30歲及30歲以上","value":74.2}],"title":"使用者年齡佔比"},
{"options":[{"name":"男","value":38.7},{"name":"女","value":61.3}],"title":"使用者性別佔比"}
],
"detail":[
{"user_id":"9","sku_id":"8","user_gender":"M","user_age":49.0,"user_level":"1","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":6.0,"order_count":2.0,"order_amount":53400.0,"dt":"2019-02-14","es_metadata_id":"wPdM7GgBQMmfy2BJr4YT"},
{"user_id":"5","sku_id":"8","user_gender":"F","user_age":36.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":5.0,"order_count":1.0,"order_amount":44500.0,"dt":"2019-02-14","es_metadata_id":"wvdM7GgBQMmfy2BJr4YT"},
{"user_id":"19","sku_id":"8","user_gender":"F","user_age":43.0,"user_level":"5","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":7.0,"order_count":2.0,"order_amount":62300.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"},
{"user_id":"15","sku_id":"8","user_gender":"M","user_age":66.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手機","sku_category2_name":"手機通訊","sku_category3_name":"手機","spu_id":"1","sku_num":3.0,"order_count":1.0,"order_amount":26700.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"}
]
}
5.3 Writing the DSL Statement
GET gmall_sale_detail-query/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": { "dt": "2021-07-07" }
      },
      "must": [
        {
          "match": {
            "sku_name": { "query": "小米手機", "operator": "and" }
          }
        }
      ]
    }
  },
  "aggs": {
    "groupby_age": {
      "terms": { "field": "user_age", "size": 150 }
    },
    "groupby_gender": {
      "terms": { "field": "user_gender" }
    }
  },
  "from": 0,
  "size": 2
}
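The from value in the DSL is what the API's startpage parameter turns into: from = (startpage - 1) * size. So startpage=1&size=5 reads hits 0 to 4, and startpage=3&size=10 starts at hit 20. The small sketch below computes that offset and assembles the query and paging part of the DSL; aggregations are omitted for brevity. SaleDetailQuery is hypothetical. It inserts date and keyword without JSON escaping, so it assumes they contain no quotes or backslashes.

```java
// Hypothetical helper: paging arithmetic plus the query/paging part of the search DSL.
final class SaleDetailQuery {
    private SaleDetailQuery() {}

    /** Page numbers start at 1. */
    static int from(int startpage, int size) {
        if (startpage < 1 || size < 1) {
            throw new IllegalArgumentException("startpage and size must be >= 1");
        }
        return (startpage - 1) * size;
    }

    /** bool query (dt filter + sku_name match) with from/size; no JSON escaping is done. */
    static String body(String date, String keyword, int startpage, int size) {
        return "{\"query\":{\"bool\":{"
                + "\"filter\":{\"term\":{\"dt\":\"" + date + "\"}},"
                + "\"must\":[{\"match\":{\"sku_name\":{\"query\":\"" + keyword + "\",\"operator\":\"and\"}}}]}},"
                + "\"from\":" + from(startpage, size) + ",\"size\":" + size + "}";
    }
}
```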
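5.4 Code Development
5.4.1 Code Inventory
| Layer | Class | Description |
| bean | Stat | Pie chart |
| bean | Option | One option (slice) of a pie chart |
| Controller layer | PublisherController | Add a getSaleDetail method (handle3 in the code below) that calls the service layer and assembles the response according to the web API and its parameters |
| Service layer | PublisherService | Add a getSaleDetail method (getESData in the code below) |
| Service layer | PublisherServiceImpl | Implement getSaleDetail (getESData), querying Elasticsearch with the DSL statement |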
5.4.2 Modify pom.xml in the gmall_publisher module
<!-- ES dependencies -->
<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>5.3.3</version>
</dependency>
<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
5.4.3 Configure application.properties in the gmall_publisher module
# es
spring.elasticsearch.jest.uris=http://hadoop102:9200
5.4.4 Beans
1) Create Option
package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/15 10:45
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Option {
    String name;  // label of the pie slice
    Double value; // percentage
}
2) Create Stat
package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/15 10:46
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Stat {
    String title;         // chart title
    List<Option> options; // pie slices
}
3) Create SaleDetail
package com.yuange.gmall.gmall_publisher.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 袁哥
 * @date 2021/7/15 10:51
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class SaleDetail {
    private String order_detail_id;
    private String order_id;
    private String order_status;
    private String create_time;
    private String user_id;
    private String sku_id;
    private String user_gender;
    private Integer user_age;
    private String user_level;
    private Double sku_price;
    private String sku_name;
    private String dt;
    // Extra field: filled with the ES document id
    private String es_metadata_id;
}
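4) What the Lombok annotations do:
@Data: generates getters and setters for all fields (plus toString, equals and hashCode)
@AllArgsConstructor: generates a constructor that takes all fields
@NoArgsConstructor: generates a no-argument constructor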
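To see what the three annotations save, here is a rough hand-written equivalent of Option. It is a simplification. The real @Data output also includes equals, hashCode and toString, which are left out here, so only the constructors and accessors are written out. OptionPlain is a hypothetical name chosen to avoid clashing with the real bean.

```java
// Hand-written approximation of @NoArgsConstructor @AllArgsConstructor @Data on Option
// (equals/hashCode/toString, which @Data also generates, are omitted).
class OptionPlain {
    String name;
    Double value;

    public OptionPlain() {}                         // @NoArgsConstructor

    public OptionPlain(String name, Double value) { // @AllArgsConstructor
        this.name = name;
        this.value = value;
    }

    // @Data: a getter and a setter for every field
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Double getValue() { return value; }
    public void setValue(Double value) { this.value = value; }
}
```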
5) Lombok must be declared in pom.xml (I added this dependency earlier; add it if you have not)
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
5.4.5 ESDao
package com.yuange.gmall.gmall_publisher.dao;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;

/**
 * @author 袁哥
 * @date 2021/7/15 10:53
 */
public interface ESDao {
    JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;
}
5.4.6 ESDaoImpl
package com.yuange.gmall.gmall_publisher.dao;

import com.alibaba.fastjson.JSONObject;
import com.yuange.gmall.gmall_publisher.beans.Option;
import com.yuange.gmall.gmall_publisher.beans.SaleDetail;
import com.yuange.gmall.gmall_publisher.beans.Stat;
import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * @author 袁哥
 * @date 2021/7/15 10:56
 */
@Repository
public class ESDaoImpl implements ESDao {

    @Autowired // take a JestClient bean from the Spring container
    private JestClient jestClient;

    /*
     * Query data from ES.
     *   date:      selects the index to query: "gmall_sale_detail" + date
     *   keyword:   full-text search keyword
     *   startpage: page number, used to compute from = (startpage - 1) * size
     *              e.g. size = 10: page 1 -> from 0, page 2 -> from 10, page 3 -> from 20
     *   size:      number of hits returned per page
     * Equivalent query:
     * GET /gmall_sale_detail<date>/_search
     * {
     *   "query": { "match": { "sku_name": "手機" } },
     *   "from": 0,
     *   "size": 20,
     *   "aggs": {
     *     "genderCount": { "terms": { "field": "user_gender", "size": 10 } },
     *     "ageCount":    { "terms": { "field": "user_age", "size": 150 } }
     *   }
     * }
     */
    public SearchResult getDataFromES(String date, Integer startpage, Integer size, String keyword) throws IOException {
        String indexName = "gmall_sale_detail" + date;
        int from = (startpage - 1) * size;
        // "genderCount": {}
        TermsBuilder aggs1 = AggregationBuilders.terms("genderCount").field("user_gender").size(10);
        // "ageCount": {}
        TermsBuilder aggs2 = AggregationBuilders.terms("ageCount").field("user_age").size(150);
        // "query": {}
        MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("sku_name", keyword);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .query(matchQueryBuilder)
                .from(from) // paging: without from/size the requested page would be ignored
                .size(size)
                .aggregation(aggs1)
                .aggregation(aggs2);
        // Build the search request from the generated query string
        Search search = new Search.Builder(searchSourceBuilder.toString()).addType("_doc").addIndex(indexName).build();
        return jestClient.execute(search);
    }

    // Wrap the hits as detail records, adding each document's ES id
    public List<SaleDetail> getDetailData(SearchResult searchResult) {
        ArrayList<SaleDetail> saleDetails = new ArrayList<>();
        List<SearchResult.Hit<SaleDetail, Void>> hits = searchResult.getHits(SaleDetail.class);
        for (SearchResult.Hit<SaleDetail, Void> hit : hits) {
            SaleDetail saleDetail = hit.source;
            saleDetail.setEs_metadata_id(hit.id);
            saleDetails.add(saleDetail);
        }
        return saleDetails;
    }

    // Wrap the ageCount aggregation as the age Stat
    public Stat getAgeStat(SearchResult searchResult) {
        Stat stat = new Stat();
        MetricAggregation aggregations = searchResult.getAggregations();
        TermsAggregation ageCount = aggregations.getTermsAggregation("ageCount");
        List<TermsAggregation.Entry> buckets = ageCount.getBuckets();
        int agelt20 = 0;
        int agege30 = 0;
        int age20to30 = 0;
        for (TermsAggregation.Entry bucket : buckets) {
            int age = Integer.parseInt(bucket.getKey());
            if (age < 20) {
                agelt20 += bucket.getCount();
            } else if (age >= 30) {
                agege30 += bucket.getCount();
            } else {
                age20to30 += bucket.getCount();
            }
        }
        double sumCount = age20to30 + agege30 + agelt20;
        DecimalFormat format = new DecimalFormat("###.00");
        // Guard against no hits: 0 / 0 would produce NaN percentages
        double perlt20 = sumCount == 0 ? 0 : agelt20 / sumCount * 100;
        double per20to30 = sumCount == 0 ? 0 : age20to30 / sumCount * 100;
        double perge30 = sumCount == 0 ? 0 : 100 - perlt20 - per20to30;
        List<Option> ageOptions = new ArrayList<>();
        ageOptions.add(new Option("20歲以下", Double.parseDouble(format.format(perlt20))));
        ageOptions.add(new Option("20歲到30歲", Double.parseDouble(format.format(per20to30))));
        ageOptions.add(new Option("30歲及30歲以上", Double.parseDouble(format.format(perge30))));
        stat.setOptions(ageOptions);
        stat.setTitle("使用者年齡佔比");
        return stat;
    }

    // Wrap the genderCount aggregation as the gender Stat
    public Stat getGenderStat(SearchResult searchResult) {
        Stat stat = new Stat();
        MetricAggregation aggregations = searchResult.getAggregations();
        TermsAggregation genderCount = aggregations.getTermsAggregation("genderCount");
        List<TermsAggregation.Entry> buckets = genderCount.getBuckets();
        int maleCount = 0;
        int femaleCount = 0;
        for (TermsAggregation.Entry bucket : buckets) {
            if ("F".equals(bucket.getKey())) {
                femaleCount += bucket.getCount();
            } else {
                maleCount += bucket.getCount();
            }
        }
        double sumCount = maleCount + femaleCount;
        DecimalFormat format = new DecimalFormat("###.00");
        // Guard against no hits: 0 / 0 would produce NaN percentages
        double perMale = sumCount == 0 ? 0 : maleCount / sumCount * 100;
        double perFemale = sumCount == 0 ? 0 : 100 - perMale;
        List<Option> genderOptions = new ArrayList<>();
        genderOptions.add(new Option("男", Double.parseDouble(format.format(perMale))));
        genderOptions.add(new Option("女", Double.parseDouble(format.format(perFemale))));
        stat.setOptions(genderOptions);
        stat.setTitle("使用者性別佔比");
        return stat;
    }

    // Assemble the response: total hit count, the two pie charts, and the detail records
    @Override
    public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {
        SearchResult searchResult = getDataFromES(date, startpage, size, keyword);
        List<SaleDetail> detailData = getDetailData(searchResult);
        Stat ageStat = getAgeStat(searchResult);
        Stat genderStat = getGenderStat(searchResult);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("total", searchResult.getTotal());
        ArrayList<Stat> stats = new ArrayList<>();
        stats.add(ageStat);
        stats.add(genderStat);
        jsonObject.put("stat", stats);
        jsonObject.put("detail", detailData);
        return jsonObject;
    }
}
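At its core, the age statistic groups bucket counts into three ranges and turns them into percentages. Below is a stand-alone sketch of that arithmetic. It assumes bucket keys are ages and values are hit counts, as in the ageCount terms aggregation. It deliberately swaps DecimalFormat for Math.round to avoid the format/parse round trip. Like getAgeStat, it derives the last slice as 100 minus the other two, and it returns zeros when there are no hits. AgeBuckets is hypothetical.

```java
import java.util.Map;

// Hypothetical helper: bucket counts -> {under 20, 20 to 29, 30 and over} percentages.
final class AgeBuckets {
    private AgeBuckets() {}

    /** Two-decimal rounding, e.g. 25.806 -> 25.81 */
    static double round2(double x) {
        return Math.round(x * 100) / 100.0;
    }

    /** ageCounts: age -> number of hits. */
    static double[] percentages(Map<Integer, Long> ageCounts) {
        long lt20 = 0, from20to30 = 0, ge30 = 0;
        for (Map.Entry<Integer, Long> e : ageCounts.entrySet()) {
            int age = e.getKey();
            if (age < 20) {
                lt20 += e.getValue();
            } else if (age >= 30) {
                ge30 += e.getValue();
            } else {
                from20to30 += e.getValue();
            }
        }
        double total = lt20 + from20to30 + ge30;
        if (total == 0) {
            return new double[] {0, 0, 0}; // no hits: avoid 0 / 0 = NaN
        }
        double pLt20 = round2(lt20 / total * 100);
        double p20to30 = round2(from20to30 / total * 100);
        // The last slice takes the remainder so the three values add up to 100
        return new double[] {pLt20, p20to30, round2(100 - pLt20 - p20to30)};
    }
}
```

The gender statistic follows the same pattern with two groups ("F" and everything else).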
5.4.7 PublisherService
Add to the PublisherService interface:
JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;
5.4.8 PublisherServiceImpl
@Autowired
private ESDao esDao;

@Override
public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {
    return esDao.getESData(date, startpage, size, keyword);
}
5.4.9 PublisherController
/*
 * http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手機
 */
@RequestMapping(value = "/sale_detail")
public JSONObject handle3(String date, Integer startpage, Integer size, String keyword) throws IOException {
    return publisherService.getESData(date, startpage, size, keyword);
}
5.4.10 index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<a href="/realtime-total?date=2021-07-06">Daily active users and new devices</a>
<br/>
<a href="/realtime-hours?id=dau&date=2021-07-06">Hourly DAU, yesterday vs. today</a>
<br/>
<a href="/realtime-hours?id=order_amount&date=2021-07-07">Hourly GMV, yesterday vs. today</a>
<br/>
<a href="/sale_detail?date=2021-07-15&startpage=1&size=10&keyword=手機">Purchase details</a>
</body>
</html>
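5.4.11 Start the gmall_publisher module
5.4.12 Testing
1) First check that ES contains data for 2021-07-15 (index gmall_sale_detail2021-07-15); if it does not, the purchase-details request returns HTTP status 500.
2) Visit http://localhost:8070/
3) The complete code of the real-time project is on GitHub: https://github.com/LzMingYueShanPao/gmall_sparkstream.git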