實時分析系統--交易額需求
第1章 採集資料
1.1 框架流程
1.2 Canal 入門
1.2.1 什麼是 Canal
由於Canal沒有官網,所以可以認為它託管在github上的專案就是官網,所以地址是:https://github.com/alibaba/canal
1.2.2 使用場景
1)原始場景: 阿里Otter中介軟體的一部分,Otter是阿里用於進行異地資料庫之間的同步框架,Canal是其中一部分。
2) 常見場景1:更新快取
3)場景2:抓取業務資料新增變化表,用於製作拉鍊表。
4)場景3:抓取業務表的新增變化資料,用於製作實時統計。
1.2.3Canal的工作原理
複製過程分成三步:
1)Master主庫將改變記錄寫到二進位制日誌(binary log)中;
2)Slave從庫向mysql master傳送dump協議,將master主庫的binary log events拷貝到它的中繼日誌(relay log);
3)Slave從庫讀取並重做中繼日誌中的事件,將改變的資料同步到自己的資料庫。
Canal的工作原理很簡單,就是把自己偽裝成Slave,假裝從Master複製資料:
1)canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 傳送dump 協議
2)MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
3)canal 解析 binary log 物件(原始為 byte 流)
1.2.4 MySQL的Binlog
1.2.4.1 什麼是Binlog
MySQL的二進位制日誌可以說是MySQL最重要的日誌了,它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。
一般來說開啟二進位制日誌大概會有1%的效能損耗。二進位制有兩個最重要的使用場景:
1)MySQL Replication在Master端開啟binlog,Mster把它的二進位制日誌傳遞給slaves來達到master-slave資料一致的目的。
2)自然就是資料恢復了,通過使用MySQLBinlog工具來使恢復資料。
二進位制日誌包括兩類檔案:
1)二進位制日誌索引檔案(檔名字尾為.index):用於記錄所有的二進位制檔案
2)二進位制日誌檔案(檔名字尾為.00000*):記錄資料庫所有的DDL和DML(除了資料查詢語句)語句事件。
1.2.4.2 Binlog的開啟
在MySQL的配置檔案(Linux: /etc/my.cnf , Windows: \my.ini)下,修改配置在[mysqld] 區塊設定/新增
#Binlog日誌的開啟
log_bin=mysql-bin
這個表示binlog日誌的字首是mysql-bin,以後生成的日誌檔案就是mysql-bin.123456 的檔案後面的數字按順序生成。每次mysql重啟或者到達單個檔案大小的閾值時,新生一個檔案,按順序編號
1.2.4.3 Binlog的分類設定
MySQLBinlog的格式,那就是有三種,分別是statement、mixed、row
在配置檔案中選擇配置binlog_format屬性
#選擇Binlog的格式 binlog_format=row #保證server-id是唯一的 server-id=1
區別:
1)statement
語句級,binlog會記錄每次一執行寫操作的語句,相對row模式節省空間,但是可能產生不一致性,比如:update tt set create_date=now(),如果用binlog日誌進行恢復,由於執行時間不同可能產生的資料就不同。
優點:節省空間
缺點:有可能造成資料不一致。
2)row
行級,binlog會記錄每次操作後每行記錄的變化。
優點:保持資料的絕對一致性。因為不管sql是什麼,引用了什麼函式,他只記錄執行後的效果。
缺點:佔用較大空間。
3)mixed
statement的升級版,一定程度上解決了,因為一些情況而造成的statement模式不一致問題,在某些情況下譬如:當函式中包含 UUID() 時,包含 AUTO_INCREMENT 欄位的表被更新時,執行 INSERT DELAYED 語句時,用 UDF 時,會按照 ROW的方式進行處理
優點:節省空間,同時兼顧了一定的一致性。
缺點:還有些極個別情況依舊會造成不一致,
另外statement和mixed對於需要對binlog的監控的情況都不方便
1.3MySQL的準備
1.3.1 匯入模擬業務資料庫
1.3.2 賦許可權
1)進入mysql客戶端
mysql -uroot -proot123
2)更改mysql密碼策略
set global validate_password_length=4;
set global validate_password_policy=0;
3)在mysql中執行如下語句,建立canal使用者,密碼為canal:
grant select, replication slave, replication client on *.* to 'canal'@'%' identified by 'canal';
4)建立gmall_realtime資料庫
5)執行gmall.sql檔案(下載連結是下方的網盤地址,請自行下載),執行過程中若出現如下錯誤,則開啟該sql檔案,修改庫名為gmall_realtime
6)儲存過程,模擬資料(執行sql指令碼時已經建立,直接使用即可)
#CALL `init_data`(造資料的日期, 生成的訂單數, 生成的使用者數, 是否覆蓋寫) call `init_data`('2021-07-07',17,5,false)
1.3.3 修改/etc/my.cnf檔案
sudo vim /etc/my.cnf
#Binlog日誌的開啟 log_bin=mysql-bin #選擇Binlog的格式 binlog_format=row #保證server-id是唯一的 server-id=1 #只記錄哪些庫的寫操作 binlog-do-db=gmall_realtime
1.3.4 重啟MySql並檢視狀態
sudo systemctl restart mysqld
sudo systemctl status mysqld
1.4Canal 安裝
1.4.1 Canal的下載(我們下載1.1.2版本的即可)
1)偽官網下載地址:https://github.com/alibaba/canal/releases,找到1.1.2版本,下載它
2)我已經下載下來了,網盤連結下載地址:https://pan.baidu.com/s/1p6vh6FOe-0wd4U8tyuKslQ 提取碼:fzbq
3)下載完成後將其上傳至hadoop104機器中的/opt/software/
4)在/opt/module/目錄下建立canal目錄
mkdir /opt/module/canal
5)解壓
tar -zxvf /opt/software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal/
1.4.2 修改canal的配置
1)修改canal.properties
vim /opt/module/canal/conf/canal.properties
#我的canal安裝在hadoop104
canal.ip = hadoop104
#這個檔案是canal的基本通用配置,主要關心一下埠號,不改的話預設就是11111
canal.port = 11111
2)修改instance.properties(該檔案的作用主要是針對要追蹤的MySQL的例項配置)
vim /opt/module/canal/conf/example/instance.properties
#slaveId不能與mysql中的server-id重複
canal.instance.mysql.slaveId=2
#mysql的地址和埠號,我的mysql安裝在hadoop102,你的安裝在哪就寫哪
canal.instance.master.address=hadoop102:3306
#從binlog的哪個檔案的哪個位置開始同步 需要在主機上執行show master status檢視最新的位置,你執行的結果是啥你就寫啥,不要照抄我的
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=88546
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
1.4.3 啟動canal
/opt/module/canal/bin/startup.sh
1.4.4 停止canal
/opt/module/canal/bin/stop
1.5資料監控模組---抓取訂單資料
1.5.1 建立gmall_canalclient模組
1.5.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>gmall_sparkstream</artifactId> <groupId>com.yuange</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>gmall_canalclient</artifactId> <dependencies> <dependency> <groupId>com.yuange</groupId> <artifactId>gmall_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> </dependencies> </project>
1.5.3通用監視類
1)Canal封裝的資料結構
2)建立生產者,MyProducer.java
package com.yuange.canal; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @作者:袁哥 * @時間:2021/7/7 18:54 */ public class MyProducer { private static Producer<String,String> producer; static { producer=getProducer(); } // 提供方法返回生產者 public static Producer<String,String> getProducer(){ Properties properties = new Properties(); //參考 ProducerConfig properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<String, String>(properties); } //傳送資料至kafka public static void sendDataToKafka(String topic,String msg){ producer.send(new ProducerRecord<String, String>(topic,msg)); } }
3)監控Mysql,MyClient.java
package com.yuange.canal; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.yuange.constants.Constants; import java.net.InetSocketAddress; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/7 18:10 * 步驟: * ①建立一個客戶端:CanalConnector(SimpleCanalConnector: 單節點canal叢集、ClusterCanalConnector: HA canal叢集) * ②使用客戶端連線 canal server * ③指定客戶端訂閱 canal server中的binlog資訊 * ④解析binlog資訊 * ⑤寫入kafka * * 消費到的資料的結構: * Message: 代表拉取的一批資料,這一批資料可能是多個SQL執行,造成的寫操作變化 * List<Entry> entries : 每個Entry代表一個SQL造成的寫操作變化 * id : -1 說明沒有拉取到資料 * Entry: * CanalEntry.Header header_ : 頭資訊,其中包含了對這條sql的一些說明 * private Object tableName_: sql操作的表名 * EntryType; Entry操作的型別 * 開啟事務: 寫操作 begin * 提交事務: 寫操作 commit * 對原始資料進行影響的寫操作: rowdata * update、delete、insert * ByteString storeValue_: 資料 * 序列化資料,需要使用工具類RowChange,進行轉換,轉換之後獲取到一個RowChange物件 */ public class MyClient { public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { /** * 建立一個canal客戶端 * public static CanalConnector newSingleConnector( * SocketAddress address, //指定canal server的主機名和埠號 * tring destination, //參考/opt/module/canal/conf/canal.properties中的canal.destinations 屬性值 * String username, //不是instance.properties中的canal.instance.dbUsername * String password //參考AdminGuide(從canal 1.1.4 之後才提供的),連結地址:https://github.com/alibaba/canal/wiki/AdminGuide * ) {...} * */ CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("haddoop104", 11111), "example", "", ""); //使用客戶端連線 canal server canalConnector.connect(); //指定客戶端訂閱 canal server中的binlog資訊 只統計在Order_info表 canalConnector.subscribe("gmall_realtime.order_info"); //不停地拉取資料 Message[id=-1,entries=[],raw=false,rawEntries=[]] 代表當前這批沒有拉取到資料 while (true){ Message message = canalConnector.get(100); //判斷是否拉取到了資料,如果沒有拉取到,歇一會再去拉取 if (message.getId() == -1){ System.out.println("暫時沒有資料,先等會"); Thread.sleep(5000); continue; } // 資料的處理邏輯 List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { //判斷這個entry的型別是不是rowdata型別,只處理rowdata型別 if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){ ByteString storeValue = entry.getStoreValue(); //資料 String tableName = entry.getHeader().getTableName(); //表名 handleStoreValue(storeValue,tableName); } } } } private static void handleStoreValue(ByteString storeValue, String tableName) throws InvalidProtocolBufferException { //將storeValue 轉化為 RowChange CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); /** * 一個RowChange代表多行資料 * order_info: 可能會執行的寫操作型別,統計GMV total_amount * insert : 會-->更新後的值 * update : 不會-->只允許修改 order_status * delete : 不會,資料是不允許刪除 * 判斷當前這批寫操作產生的資料是不是insert語句產生的 * */ if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){ //獲取行的集合 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList){ JSONObject jsonObject = new JSONObject(); //獲取insert後每一行的每一列 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { jsonObject.put(column.getName(),column.getValue()); } //傳送資料至Kafka,獲取列名和列值 MyProducer.sendDataToKafka(Constants.GMALL_ORDER_INFO, jsonObject.toJSONString()); } } } }
4)新增log4j.properties
# Set everything to be logged to the console log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
1.5.4 測試
1)啟動MyClient中的main方法
2)使用儲存過程模擬業務資料
call `init_data`('2021-07-07',15,3,false)
3)檢視Idea控制檯,發現已經成功同步了資料
4)檢視kafka中的GMALL_ORDER_INFO 主題
第2章 實時處理
2.1Phoenix建表
sqlline.py hadoop103:2181
create table gmall_order_info ( id varchar primary key, province_id varchar, consignee varchar, order_comment varchar, consignee_tel varchar, order_status varchar, payment_way varchar, user_id varchar, img_url varchar, total_amount double, expire_time varchar, delivery_address varchar, create_time varchar, operate_time varchar, tracking_no varchar, parent_order_id varchar, out_trade_no varchar, trade_body varchar, create_date varchar, create_hour varchar);
2.2在gmall_realtime中新建樣例類,OrderInfo
package com.yuange.realtime.beans /** * @作者:袁哥 * @時間:2021/7/7 20:41 */ case class OrderInfo( id: String, province_id: String, consignee: String, order_comment: String, var consignee_tel: String, order_status: String, payment_way: String, user_id: String, img_url: String, total_amount: Double, expire_time: String, delivery_address: String, create_time: String, operate_time: String, tracking_no: String, parent_order_id: String, out_trade_no: String, trade_body: String, // 方便分時和每日統計,額外新增的欄位 var create_date: String, var create_hour: String)
2.3 SparkStreaming消費kafka並儲存到HBase中
package com.yuange.realtime.app import java.time.LocalDateTime import java.time.format.DateTimeFormatter import com.alibaba.fastjson.JSON import com.yuange.constants.Constants import com.yuange.realtime.beans.OrderInfo import com.yuange.realtime.utils.MyKafkaUtil import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.phoenix.spark._ /** * @作者:袁哥 * @時間:2021/7/7 20:45 */ object GMVApp extends BaseApp { override var appName: String = "GMVApp" override var duration: Int = 10 def main(args: Array[String]): Unit = { val ds: InputDStream[ConsumerRecord[String,String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_INFO,streamingContext) //將kafka中的資料封裝為樣例類 val ds1: DStream[OrderInfo] = ds.map(record => { val orderInfo: OrderInfo = JSON.parseObject(record.value(),classOf[OrderInfo]) // 封裝create_date 和 create_hour "create_time":"2021-07-07 01:39:33" val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") val formatter2: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") val time: LocalDateTime = LocalDateTime.parse(orderInfo.create_date,formatter1) orderInfo.create_date = time.format(formatter2) orderInfo.create_hour = time.getHour.toString // 訂單的明細資料,脫敏 演示手機號脫敏 orderInfo.consignee_tel = orderInfo.consignee_tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2") orderInfo }) //寫入hbase ds1.foreachRDD(rdd => { rdd.saveToPhoenix( "GMALL_ORDER_INFO", Seq("ID","PROVINCE_ID", "CONSIGNEE", "ORDER_COMMENT", "CONSIGNEE_TEL", "ORDER_STATUS", "PAYMENT_WAY", "USER_ID","IMG_URL", "TOTAL_AMOUNT", "EXPIRE_TIME", "DELIVERY_ADDRESS", "CREATE_TIME","OPERATE_TIME","TRACKING_NO","PARENT_ORDER_ID","OUT_TRADE_NO", "TRADE_BODY", "CREATE_DATE", "CREATE_HOUR"), HBaseConfiguration.create(), Some("hadoop103:2181") ) }) } }
2.4測試
1)啟動gmall_canalclient模組中的MyClient中的main方法
2)啟動gmall_realtime模組中的GMVApp中的main方法
3)使用儲存過程模擬業務資料
call `init_data`('2021-07-07',13,2,false)
4)檢視Idea控制檯
5)使用phoenix檢視GMALL_ORDER_INFO表中是否有資料
select * from GMALL_ORDER_INFO limit 10;
第3章資料介面釋出
3.1程式碼清單
控制層 |
PublisherController |
實現介面的web釋出 |
服務層 |
PublisherService |
資料業務查詢interface |
PublisherServiceImpl |
業務查詢的實現類 |
|
資料層 |
OrderMapper |
資料層查詢的interface |
OrderMapper.xml |
資料層查詢的實現配置 |
3.2 介面
3.2.1 訪問路徑
總數 |
http://localhost:8070/realtime-total?date=2020-08-18 |
分時統計 |
http://localhost:8070/realtime-hours?id=order_amount&date=2020-08-18 |
3.2.2要求資料格式
總數 |
[{"id":"dau","name":"新增日活","value":1200}, {"id":"new_mid","name":"新增裝置","value":233 }, {"id":"order_amount","name":"新增交易額","value":1000.2 }] |
分時統計 |
{"yesterday":{"11":383,"12":123,"17":88,"19":200 }, "today":{"12":38,"13":1233,"17":123,"19":688 }} |
3.3程式碼開發(gmall_publisher模組中操作)
3.3.1 beans層,新建GMVData.java
package com.yuange.gmall.gmall_publisher.beans; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @作者:袁哥 * @時間:2021/7/7 22:22 */ @AllArgsConstructor @NoArgsConstructor @Data public class GMVData { private String hour; private Double amount; }
3.3.2 mapper層,修改PublisherMapper介面,新增如下抽象方法
//查詢每天的總交易額 Double getGMVByDate(String date); //查詢分時交易額 List<GMVData> getGMVDatasByDate(String date);
3.3.3 service層
1)修改PublisherService介面,新增如下抽象方法
//查詢每天的總交易額 Double getGMVByDate(String date); //查詢分時交易額 List<GMVData> getGMVDatasByDate(String date);
2)修改PublisherServiceImpl實現類,新增如下實現
@Override public Double getGMVByDate(String date) { return publisherMapper.getGMVByDate(date); } @Override public List<GMVData> getGMVDatasByDate(String date) { return publisherMapper.getGMVDatasByDate(date); }
3.3.4controller層
package com.yuange.gmall.gmall_publisher.controller; import com.alibaba.fastjson.JSONObject; import com.yuange.gmall.gmall_publisher.beans.DAUData; import com.yuange.gmall.gmall_publisher.beans.GMVData; import com.yuange.gmall.gmall_publisher.service.PublisherService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDate; import java.util.ArrayList; import java.util.List; /** * @作者:袁哥 * @時間:2021/7/6 23:10 */ @RestController public class GmallPublisherController { @Autowired private PublisherService publisherService; //http://localhost:8070/realtime-total?date=2021-07-06 @RequestMapping(value = "/realtime-total") public Object handle1(String date){ ArrayList<JSONObject> result = new ArrayList<>(); Integer dau = publisherService.getDAUByDate(date); Integer newMidCounts = publisherService.getNewMidCountByDate(date); Double gmv = publisherService.getGMVByDate(date); JSONObject jsonObject1 = new JSONObject(); jsonObject1.put("id","dau"); jsonObject1.put("name","新增日活"); jsonObject1.put("value",dau); JSONObject jsonObject2 = new JSONObject(); jsonObject2.put("id","new_mid"); jsonObject2.put("name","新增裝置"); jsonObject2.put("value",newMidCounts); JSONObject jsonObject3 = new JSONObject(); jsonObject3.put("id","order_amount"); jsonObject3.put("name","新增交易額"); jsonObject3.put("value",gmv); result.add(jsonObject1); result.add(jsonObject2); result.add(jsonObject3); return result; } @RequestMapping(value = "/realtime-hours") public Object handle2(String id,String date){ //根據今天求昨天的日期 LocalDate toDay = LocalDate.parse(date); String yestodayDate = toDay.minusDays(1).toString(); JSONObject result = new JSONObject(); if ("dau".equals(id)){ List<DAUData> todayDatas = publisherService.getDAUDatasByDate(date); List<DAUData> yestodayDatas = publisherService.getDAUDatasByDate(yestodayDate); JSONObject jsonObject1 = parseData(todayDatas); JSONObject jsonObject2 = parseData(yestodayDatas); result.put("yesterday",jsonObject2); result.put("today",jsonObject1); }else{ List<GMVData> todayDatas = publisherService.getGMVDatasByDate(date); List<GMVData> yestodayDatas = publisherService.getGMVDatasByDate(yestodayDate); JSONObject jsonObject1 = parseGMVData(todayDatas); JSONObject jsonObject2 = parseGMVData(yestodayDatas); result.put("yesterday",jsonObject2); result.put("today",jsonObject1); } return result; } public JSONObject parseGMVData(List<GMVData> datas){ JSONObject jsonObject = new JSONObject(); for (GMVData data : datas) { jsonObject.put(data.getHour(),data.getAmount()); } return jsonObject; } //負責把 List<DAUData> 封裝為一個JSONObject public JSONObject parseData(List<DAUData> datas){ JSONObject jsonObject = new JSONObject(); for (DAUData data : datas) { jsonObject.put(data.getHour(),data.getNum()); } return jsonObject; } }
3.3.5PublisherMapper.xml新增如下內容
<select id="getGMVByDate" resultType="double"> select sum(total_amount) from GMALL_ORDER_INFO where create_date = #{date} </select> <select id="getGMVDatasByDate" resultType="com.yuange.gmall.gmall_publisher.beans.GMVData" > select create_hour hour,sum(total_amount) amount from GMALL_ORDER_INFO where create_date = #{date} group by create_hour </select>
3.3.6index.html新增如下內容
<br/> <a href="/realtime-hours?id=order_amount&date=2021-07-07">統計昨天和今天的分時GMV資料</a>
3.4 索引優化
create local index IDX_GMALL_ORDER_CREATE_DATE_HOUR on gmallXXX_order_info(create_date,create_hour)