1. 程式人生 > >大資料篇:資料倉庫案例

大資料篇:資料倉庫案例

# 離線資料倉庫 > 資料倉庫(Data WareHouse)是為企業所有決策制定過程,提供所有系統資料支援的戰略集合 > > 通過對資料倉庫中資料的分析,可以幫助企業,改進業務流程、控制、成本、提高產品質量等 > > 資料倉庫,並不是資料最終目的地,而是為資料最終的目的地做好準備:清洗、轉義、分類、重組、合併、拆分、統計等等 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420141825710-1523728172.png) ## 1 專案簡介 ### 1.1 專案需求 1. 使用者行為資料採集平臺搭建 2. 業務資料採集平臺搭建 3. 資料倉庫維度建模 4. 分析:使用者、流量、會員、商品、銷售、地區、活動等主題 5. 採用即席查詢工具,隨時進行指標分析 6. 對叢集效能進行監控,發生異常需要報警 7. 元資料管理 8. 質量監控 ### 1.2 技術選型 - 資料採集功能如何技術選型 | 採集框架名稱 | 主要功能 | | :--------------------------: | ---------------------------------- | | Sqoop | 大資料平臺和關係型資料庫的匯入匯出 | | Datax | 大資料平臺和關係型資料庫的匯入匯出 | | flume | 擅長日誌資料的採集和解析 | | logstash | 擅長日誌資料的採集和解析 | | maxwell | 常用作實時解析mysql的binlog資料 | | canal | 常用作實時解析mysql的binlog資料 | | waterDrop | 資料匯入匯出工具 | - 訊息中介軟體的技術選型 | **開源MQ** | **概述** | | :--------------------------: | ------------------------------------------------------------ | | RabbitMQ | LShift 用Erlang實現,支援多協議,broker架構,重量級 | | ZeroMQ | AMQP最初設計者iMatix公司實現,輕量訊息核心,無broker設計。C++實現 | | Kafka | LinkedIn用Scala語言實現,支援hadoop資料並行載入 | | ActiveMQ | Apach的一種JMS具體實現,支援代理和p2p部署。支援多協議。Java實現 | | Redis | Key-value NoSQL資料庫,有MQ的功能 | | MemcacheQ | 國人利用memcache緩衝佇列協議開發的訊息佇列,C/C++實現 | - 資料永久儲存技術框架選型 | 框架名稱 | 主要用途 | | :--------------------------: | ------------------------------------------- | | HDFS | 分散式檔案儲存系統 | | Hbase | Key,value對的nosql資料庫 | | Kudu | Cloudera公司開源提供的類似於Hbase的資料儲存 | | Hive | 基於MR的資料倉庫工具 | - 資料離線計算框架技術選型(hive引擎) | 框架名稱 | 基本介紹 | | :------------------------------: | --------------------------------- | | MapReduce | 最早期的分散式檔案計算系統 | | Spark | 基於spark,一站式解決批流處理問題 | | Flink | 基於flink,一站式解決批流處理問題 | - 分析資料庫選型 | 對比專案 | Druid | Kylin | Presto | Impala | ES | | ----------- | :---: | :--------------------------: | :----: | :----: | :-------: | | 亞秒級響應 | √ | √ | × | × | × | | 百億資料集 | √ | √ | √ | √ | √ | | SQL支援 | √ | √ | √ | √ | √(需外掛) | | 離線 | √ | √ | √ | √ | √ | | 實時 | √ | √ | × | × | × | | 精確去重 | × | √ | √ | √ | × | | 多表Join | × | √ | √ | √ | × | | JDBC for BI | × | √ | √ | √ | × | - 其他選型 - 任務排程:DolphinScheduler - 叢集監控:CM+CDH - 元資料管理:Atlas - BI工具:Zeppelin、Superset ### 1.3 架構 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517115113885-1905516501.png) ### 1.4 叢集資源規劃 - 如何確認叢集規模(假設每臺伺服器8T磁碟,128G記憶體) 1. 每天日活躍使用者100萬,每人一天平均100條:100萬 * 100條 = 1億條 2. 每條日誌1K左右,每天1一條:1億 / 1024 /1024 = 約100G 3. 半年內不擴容伺服器來算:100G * 180天 = 約18T 4. 儲存3個副本:18T * 3 = 54T 5. 預留20% ~ 30%BUF:54T / 0.7 = 77T 6. 總結:約10臺伺服器 > 由於資源有限,採用3臺進行製作 | 服務名稱 | 子服務 | 伺服器 cdh01.cm | 伺服器 cdh02.cm | 伺服器 cdh03.cm | | ---------------- | ------------------------------------------- | --------------- | --------------- | --------------- | | HDFS | NameNode
DataNode
SecondaryNameNode | √

|

|

√ | | Yarn | NodeManager
Resourcemanager | √
| √
√ | √
| | Zookeeper | Zookeeper Server | √ | √ | √ | | Flume | Flume
Flume(消費 Kafka) |
√ | √
| √
| | Kafka | Kafka | √ | √ | √ | | Hive | Hive | √ | | | | MySQL | MySQL | √ | | | | Sqoop | Sqoop | √ | | | | Presto | Coordinator
Worker | √
|
√ |
√ | | DolphinScheduler | DolphinScheduler | √ | | | | Druid | Druid | √ | √ | √ | | Kylin | Kylin | √ | | | | Hbase | HMaster
HRegionServer | √
√ |
√ |
√ | | Superset | Superset | √ | | | | Atlas | Atlas | √ | | | | Solr | Solr | √ | | | ## 2 資料生成模組 > 此模組主要針對於使用者行為資料的採集,為什麼要進行使用者行為資料的採集呢? > > 因為對於企業來說,使用者就是錢,需要將使用者的習慣等資料進行採集,以便在大資料衍生產品如使用者畫像標籤系統進行分析,那麼一般情況下使用者的資訊都是離線分析的,後期我們可以將分析結果存入ES等倒排索引生態中,在使用實時計算的方式匹配使用者習慣,進行定製化推薦,更進一步的深度學習,對相似使用者進行推薦。 ### 2.1 埋點資料基本格式 - 公共欄位:基本所有安卓手機都包含的欄位 - 業務欄位:埋點上報的欄位,有具體的業務型別 ```ruby { "ap":"xxxxx",//專案資料來源 app pc "cm": { //公共欄位 "mid": "", // (String) 裝置唯一標識 "uid": "", // (String) 使用者標識 "vc": "1", // (String) versionCode,程式版本號 "vn": "1.0", // (String) versionName,程式版本名 "l": "zh", // (String) language 系統語言 "sr": "", // (String) 渠道號,應用從哪個渠道來的。 "os": "7.1.1", // (String) Android 系統版本 "ar": "CN", // (String) area 區域 "md": "BBB100-1", // (String) model 手機型號 "ba": "blackberry", // (String) brand 手機品牌 "sv": "V2.2.1", // (String) sdkVersion "g": "", // (String) gmail "hw": "1620x1080", // (String) heightXwidth,螢幕寬高 "t": "1506047606608", // (String) 客戶端日誌產生時的時間 "nw": "WIFI", // (String) 網路模式 "ln": 0, // (double) lng 經度 "la": 0 // (double) lat 緯度 }, "et": [ //事件 { "ett": "1506047605364", //客戶端事件產生時間 "en": "display", //事件名稱 "kv": { //事件結果,以 key-value 形式自行定義 "goodsid": "236", "action": "1", "extend1": "1", "place": "2", "category": "75" } } ] } ``` - 示例日誌(伺服器時間戳 | 日誌),時間戳可以有效判定網路服務的通訊時長: ```ruby 1540934156385| { "ap": "gmall", //數倉庫名 "cm": { "uid": "1234", "vc": "2", "vn": "1.0", "la": "EN", "sr": "", "os": "7.1.1", "ar": "CN", "md": "BBB100-1", "ba": "blackberry", "sv": "V2.2.1", "g": "[email protected]", "hw": "1620x1080", "t": "1506047606608", "nw": "WIFI", "ln": 0, "la": 0 }, "et": [ { "ett": "1506047605364", //客戶端事件產生時間 "en": "display", //事件名稱 "kv": { //事件結果,以 key-value 形式自行定義 "goodsid": "236", "action": "1", "extend1": "1", "place": "2", "category": "75" } },{ "ett": "1552352626835", "en": "active_background", "kv": { "active_source": "1" } } ] } } ``` ### 2.2 埋點事件日誌資料 #### 2.2.1 商品列表頁 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420171037540-1045875922.png) - 事件名稱:loading | 標籤 | 含義 | | ------------ | ------------------------------------------------------------ | | action | 動作:開始載入=1,載入成功=2,載入失敗=3 | | loading_time | 載入時長:計算下拉開始到介面返回資料的時間,(開始載入報 0,載入成 功或載入失敗才上報時間) | | loading_way | 載入型別:1-讀取快取,2-從介面拉新資料 (載入成功才上報載入型別) | | extend1 | 擴充套件欄位 Extend1 | | extend2 | 擴充套件欄位 Extend2 | | type | 載入型別:自動載入=1,使用者下拽載入=2,底部載入=3(底部條觸發點選底部提示條/點選返回頂部載入) | | type1 | 載入失敗碼:把載入失敗狀態碼報回來(報空為載入成功,沒有失敗) | #### 2.2.2 商品點選 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420171429375-958039403.png) - 事件標籤:display | 標籤 | 含義 | | -------- | ---------------------------------------------------- | | action | 動作:曝光商品=1,點選商品=2 | | goodsid | 商品 ID(服務端下發的 ID) | | place | 順序(第幾條商品,第一條為 0,第二條為 1,如此類推) | | extend1 | 曝光型別:1 - 首次曝光 2-重複曝光 | | category | 分類 ID(服務端定義的分類 ID) | #### 2.2.3 商品詳情頁 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420171749329-233847972.png) - 事件標籤:newsdetail | 標籤 | 含義 | | ------------- | ------------------------------------------------------------ | | entry | 頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦=3 | | action | 動作:開始載入=1,載入成功=2(pv),載入失敗=3, 退出頁面=4 | | goodsid | 商品 ID(服務端下發的 ID) | | show_style | 商品樣式:0、無圖、1、一張大圖、2、兩張圖、3、三張小圖、4、一張小圖、 5、一張大圖兩張小圖 | | news_staytime | 頁面停留時長:從商品開始載入時開始計算,到使用者關閉頁面所用的時間。 若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中 途劃出的時間超過 10 分鐘,則本次計時作廢,不上報本次資料。如未載入成 功退出,則報空。 | | loading_time | 載入時長:計算頁面開始載入到介面返回資料的時間 (開始載入報 0,載入 成功或載入失敗才上報時間) | | type1 | 載入失敗碼:把載入失敗狀態碼報回來(報空為載入成功,沒有失敗) | | category | 分類 ID(服務端定義的分類 ID) | #### 2.2.4 廣告 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420172340628-1302948439.png) - 事件名稱:ad | 標籤 | 含義 | | ------------ | ------------------------------------------ | | entry | 入口:商品列表頁=1 應用首頁=2 商品詳情頁=3 | | action | 動作: 廣告展示=1 廣告點選=2 | | contentType | Type: 1 商品 2 營銷活動 | | displayMills | 展示時長 毫秒數 | | itemId | 商品 id | | activityId | 營銷活動 id | #### 2.2.5 訊息通知 - 事件標籤:notification | 標籤 | 含義 | | ------- | ------------------------------------------------------------ | | action | 動作:通知產生=1,通知彈出=2,通知點選=3,常駐通知展示(不重複上 報,一天之內只報一次)=4 | | type | 通知 id:預警通知=1,天氣預報(早=2,晚=3),常駐=4 | | ap_time | 客戶端彈出時間 | | content | 備用欄位 | #### 2.2.6 使用者後臺活躍 - 事件標籤: active_background | 標籤 | 含義 | | ------------- | ------------------------------------------- | | active_source | 1=upgrade,2=download(下載),3=plugin_upgrade | #### 2.2.7 評論 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420173031180-38513413.png) - 描述:評論表(comment) | 序號 | 欄位名稱 | 欄位描述 | 欄位型別 | 長度 | 允許空 | 預設值 | | :--: | ------------ | ---------------------------------------------------- | :------: | :--: | :----: | :----: | | 1 | comment_id | 評論表 | int | 10,0 | | | | 2 | userid | 使用者 id | int | 10,0 | √ | 0 | | 3 | p_comment_id | 父級評論 id(為 0 則是
一級評論,不 為 0 則是回覆) | int | 10,0 | √ | | | 4 | content | 評論內容 | string | 1000 | √ | | | 5 | addtime | 建立時間 | string | | √ | | | 6 | other_id | 評論的相關 id | int | 10,0 | √ | | | 7 | praise_count | 點贊數量 | int | 10,0 | √ | 0 | | 8 | reply_count | 回覆數量 | int | 10,0 | √ | 0 | #### 2.2.8 收藏 - 描述:收藏(favorites) | 序號 | 欄位名稱 | 欄位描述 | 欄位型別 | 長度 | 允許空 | 預設值 | | :--: | --------- | -------- | :------: | :--: | :----: | :----: | | 1 | id | 主鍵 | int | 10,0 | | | | 2 | course_id | 商品 id | int | 10,0 | √ | 0 | | 3 | userid | 使用者 ID | int | 10,0 | √ | 0 | | 4 | add_time | 建立時間 | string | | √ | | #### 2.2.9 點贊 ![](https://img2020.cnblogs.com/blog/1235870/202004/1235870-20200420173539976-769679244.png) - 描述:所有的點贊表(praise) | 序號 | 欄位名稱 | 欄位描述 | 欄位型別 | 長度 | 允許空 | 預設值 | | :--: | --------- | ------------------------------------------------------------ | :------: | :--: | :----: | :----: | | 1 | id | 主鍵 id | int | 10,0 | | | | 2 | userid | 使用者 id | int | 10,0 | √ | | | 3 | target_id | 點讚的物件 id | int | 10,0 | √ | | | 4 | type | 建立點贊型別:1問答點贊 2問答評論點贊
3文章點贊數 4評論點贊 | int | 10,0 | √ | | | 5 | add_time | 新增時間 | string | | √ | | #### 2.2.10 錯誤日誌 | **errorBrief** | **錯誤摘要** | | :------------: | :----------: | | **errorBrief** | **錯誤詳情** | ### 2.3 埋點啟動日誌資料 ```ruby { "action":"1", "ar":"MX", "ba":"HTC", "detail":"", "en":"start", "entry":"2", "extend1":"", "g":"[email protected]", "hw":"640*960", "l":"en", "la":"20.4", "ln":"-99.3", "loading_time":"2", "md":"HTC-2", "mid":"995", "nw":"4G", "open_ad_type":"2", "os":"8.1.2", "sr":"B", "sv":"V2.0.6", "t":"1561472502444", "uid":"995", "vc":"10", "vn":"1.3.4" } ``` - 事件標籤: start | 標籤 | 含義 | | ------------ | ------------------------------------------------------------ | | entry | 入 口 : push=1 , widget=2 , icon=3 , notification=4, lockscreen_widget =5 | | open_ad_type | 開屏廣告型別: 開屏原生廣告=1, 開屏插屏廣告=2 | | action | 狀態:成功=1 失敗=2 | | loading_time | 載入時長:計算下拉開始到介面返回資料的時間,(開始載入報 0,載入成 功或載入失敗才上報時間) | | detail | 失敗碼(沒有則上報空) | | extend1 | 失敗的 message(沒有則上報空) | | en | 日誌型別 start | ### 2.4 資料生成指令碼 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517132739258-1466212620.png) > 如下案例中將省略圖中紅框中的日誌生成過程,直接使用Java程式構建logFile檔案。 #### 2.4.1 資料生成格式 - 啟動日誌 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517140456698-1080743334.png) > {"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"4","extend1":"","g":"[email protected]","hw":"1080*1920","l":"pt","la":"-11.0","ln":"-70.0","loading_time":"9","md":"sumsung-5","mid":"244","nw":"3G","open_ad_type":"1","os":"8.2.3","sr":"D","sv":"V2.1.3","t":"1589612165914","uid":"244","vc":"16","vn":"1.2.1"} - 事件日誌(由於轉換問題,圖中沒有 "時間戳|") ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517140654296-1999187130.png) > 1589695383284|{"cm":{"ln":"-79.4","sv":"V2.5.3","os":"8.0.6","g":"[email protected]","mid":"245","nw":"WIFI","l":"pt","vc":"6","hw":"1080*1920","ar":"MX","uid":"245","t":"1589627025851","la":"-39.6","md":"HTC-7","vn":"1.3.5","ba":"HTC","sr":"N"},"ap":"app","et":[{"ett":"1589650631883","en":"display","kv":{"goodsid":"53","action":"2","extend1":"2","place":"3","category":"50"}},{"ett":"1589690866312","en":"newsdetail","kv":{"entry":"3","goodsid":"54","news_staytime":"1","loading_time":"6","action":"4","showtype":"0","category":"78","type1":""}},{"ett":"1589641734037","en":"loading","kv":{"extend2":"","loading_time":"0","action":"1","extend1":"","type":"2","type1":"201","loading_way":"2"}},{"ett":"1589687684878","en":"ad","kv":{"activityId":"1","displayMills":"92030","entry":"3","action":"5","contentType":"0"}},{"ett":"1589632980772","en":"active_background","kv":{"active_source":"1"}},{"ett":"1589682030324","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1589675065650","en":"comment","kv":{"p_comment_id":2,"addtime":"1589624299628","praise_count":509,"other_id":6,"comment_id":7,"reply_count":35,"userid":3,"content":"關色蘆候佰間綸珊斑禁尹贊滌仇彭企呵姜毅"}},{"ett":"1589631359459","en":"favorites","kv":{"course_id":7,"id":0,"add_time":"1589681240066","userid":7}},{"ett":"1589616574187","en":"praise","kv":{"target_id":1,"id":7,"type":3,"add_time":"1589642497314","userid":8}}]} #### 2.4.2 建立maven工程 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517135806454-2074956098.png) ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517135851621-1419024208.png) ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517135955865-1425038202.png) ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517140019483-1250393467.png) - data-producer:pom.xml ```java 1.7.20 1.0.7 com.alibaba fastjson 1.2.51 ch.qos.logback logback-core ${logback.version} ch.qos.logback logback-classic ${logback.version} org.projectlombok lombok 1.18.10 provided maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies com.heaton.bigdata.datawarehouse.app.App make-assembly package single ``` - data-producer:logback.xml ```java %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n ${LOG_HOME}/app-%d{yyyy-MM-dd}.log 30 %msg%n 10MB 0 512 ``` - data-flume:pom.xml ```ruby org.apache.flume flume-ng-core 1.9.0 maven-compiler-plugin 2.3.2 1.8 1.8 ``` - hive-function:pom.xml ```java org.apache.hive hive-exec 2.1.1 maven-compiler-plugin 2.3.2 1.8 1.8 ``` #### 2.4.3 各事件bean >data-producer工程 ##### 2.4.3.1 公共日誌類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 公共日誌類 */ @Data public class AppBase { private String mid; // (String) 裝置唯一 private String uid; // (String) 使用者 uid private String vc; // (String) versionCode,程式版本號 private String vn; // (String) versionName,程式版本名 private String l; // (String) 系統語言 private String sr; // (String) 渠道號,應用從哪個渠道來的。 private String os; // (String) Android 系統版本 private String ar; // (String) 區域 private String md; // (String) 手機型號 private String ba; // (String) 手機品牌 private String sv; // (String) sdkVersion private String g; // (String) gmail private String hw; // (String) heightXwidth,螢幕寬高 private String t; // (String) 客戶端日誌產生時的時間 private String nw; // (String) 網路模式 private String ln; // (double) lng 經度 private String la; // (double) lat 緯度 } ``` ##### 2.4.3.2 啟動日誌類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 啟動日誌類 */ @Data public class AppStart extends AppBase { private String entry;//入口: push=1,widget=2,icon=3,notification=4, lockscreen_widget private String open_ad_type;//開屏廣告型別: 開屏原生廣告=1, 開屏插屏廣告=2 private String action;//狀態:成功=1 失敗=2 private String loading_time;//載入時長:計算下拉開始到介面返回資料的時間,(開始載入報 0,載入成功或載入失敗才上報時間) private String detail;//失敗碼(沒有則上報空) private String extend1;//失敗的 message(沒有則上報空) private String en;//啟動日誌型別標記 } ``` ##### 2.4.3.3 錯誤日誌類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 錯誤日誌類 */ @Data public class AppErrorLog { private String errorBrief; //錯誤摘要 private String errorDetail; //錯誤詳情 } ``` ##### 2.4.3.4 商品點選日誌類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 商品點選日誌類 */ @Data public class AppDisplay { private String action;//動作:曝光商品=1,點選商品=2 private String goodsid;//商品 ID(服務端下發的 ID) private String place;//順序(第幾條商品,第一條為 0,第二條為 1,如此類推) private String extend1;//曝光型別:1 - 首次曝光 2-重複曝光(沒有使用) private String category;//分類 ID(服務端定義的分類 ID) } ``` ##### 2.4.3.5 商品詳情類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 商品詳情類 */ @Data public class AppNewsDetail { private String entry;//頁面入口來源:應用首頁=1、push=2、詳情頁相關推薦 private String action;//動作:開始載入=1,載入成功=2(pv),載入失敗=3, 退出頁面=4 private String goodsid;//商品 ID(服務端下發的 ID) private String showtype;//商品樣式:0、無圖 1、一張大圖 2、兩張圖 3、三張小圖 4、一張小 圖 5、一張大圖兩張小圖 來源於詳情頁相關推薦的商品,上報樣式都為 0(因為都是左文右圖) private String news_staytime;//頁面停留時長:從商品開始載入時開始計算,到使用者關閉頁面 所用的時間。若中途用跳轉到其它頁面了,則暫停計時,待回到詳情頁時恢復計時。或中途劃出的時間超 過 10 分鐘,則本次計時作廢,不上報本次資料。如未載入成功退出,則報空。 private String loading_time;//載入時長:計算頁面開始載入到介面返回資料的時間 (開始加 載報 0,載入成功或載入失敗才上報時間) private String type1;//載入失敗碼:把載入失敗狀態碼報回來(報空為載入成功,沒有失敗) private String category;//分類 ID(服務端定義的分類 ID) } ``` ##### 2.4.3.6 商品列表類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 商品列表類 */ @Data public class AppLoading { private String action;//動作:開始載入=1,載入成功=2,載入失敗 private String loading_time;//載入時長:計算下拉開始到介面返回資料的時間,(開始載入報 0, 載入成功或載入失敗才上報時間) private String loading_way;//載入型別:1-讀取快取,2-從介面拉新資料 (載入成功才上報加 載型別) private String extend1;//擴充套件欄位 Extend1 private String extend2;//擴充套件欄位 Extend2 private String type;//載入型別:自動載入=1,使用者下拽載入=2,底部載入=3(底部條觸發點選底 部提示條/點選返回頂部載入) private String type1;//載入失敗碼:把載入失敗狀態碼報回來(報空為載入成功,沒有失敗) } ``` ##### 2.4.3.7 廣告類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 廣告類 */ @Data public class AppAd { private String entry;//入口:商品列表頁=1 應用首頁=2 商品詳情頁=3 private String action;//動作: 廣告展示=1 廣告點選=2 private String contentType;//Type: 1 商品 2 營銷活動 private String displayMills;//展示時長 毫秒數 private String itemId; //商品id private String activityId; //營銷活動id } ``` ##### 2.4.3.8 訊息通知日誌類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 訊息通知日誌類 */ @Data public class AppNotification { private String action;//動作:通知產生=1,通知彈出=2,通知點選=3,常駐通知展示(不重複上 報,一天之內只報一次) private String type;//通知 id:預警通知=1,天氣預報(早=2,晚=3),常駐=4 private String ap_time;//客戶端彈出時間 private String content;//備用欄位 } ``` ##### 2.4.3.9 使用者後臺活躍類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 使用者後臺活躍類 */ @Data public class AppActive { private String active_source;//1=upgrade,2=download(下載),3=plugin_upgrade } ``` ##### 2.4.3.10 使用者評論類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 使用者評論類 */ @Data public class AppComment { private int comment_id;//評論表 private int userid;//使用者 id private int p_comment_id;//父級評論 id(為 0 則是一級評論,不為 0 則是回覆) private String content;//評論內容 private String addtime;//建立時間 private int other_id;//評論的相關 id private int praise_count;//點贊數量 private int reply_count;//回覆數量 } ``` ##### 2.4.3.11 使用者收藏類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 使用者收藏類 */ @Data public class AppFavorites { private int id;//主鍵 private int course_id;//商品 id private int userid;//使用者 ID private String add_time;//建立時間 } ``` ##### 2.4.3.12 使用者點贊類 ```ruby import lombok.Data; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 使用者點贊類 */ @Data public class AppPraise { private int id; //主鍵 id private int userid;//使用者 id private int target_id;//點讚的物件 id private int type;//點贊型別 1 問答點贊 2 問答評論點贊 3 文章點贊數 4 評論點贊 private String add_time;//新增時間 } ``` #### 2.4.4 啟動類 ```ruby import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.util.Random; /** * @author Heaton * @email [email protected] * @date 2020/4/25 14:54 * @describe 啟動類 */ public class App { private final static Logger logger = LoggerFactory.getLogger(App.class); private static Random rand = new Random(); // 裝置id private static int s_mid = 0; // 使用者id private static int s_uid = 0; // 商品id private static int s_goodsid = 0; public static void main(String[] args) { // 引數一:控制傳送每條的延時時間,預設是0 Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L; // 引數二:迴圈遍歷次數 int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000; // 生成資料 generateLog(delay, loop_len); } private static void generateLog(Long delay, int loop_len) { for (int i = 0; i < loop_len; i++) { int flag = rand.nextInt(2); switch (flag) { case (0): //應用啟動 AppStart appStart = generateStart(); String jsonString = JSON.toJSONString(appStart); //控制檯列印 logger.info(jsonString); break; case (1): JSONObject json = new JSONObject(); json.put("ap", "app"); json.put("cm", generateComFields()); JSONArray eventsArray = new JSONArray(); // 事件日誌 // 商品點選,展示 if (rand.nextBoolean()) { eventsArray.add(generateDisplay()); json.put("et", eventsArray); } // 商品詳情頁 if (rand.nextBoolean()) { eventsArray.add(generateNewsDetail()); json.put("et", eventsArray); } // 商品列表頁 if (rand.nextBoolean()) { eventsArray.add(generateNewList()); json.put("et", eventsArray); } // 廣告 if (rand.nextBoolean()) { eventsArray.add(generateAd()); json.put("et", eventsArray); } // 訊息通知 if (rand.nextBoolean()) { eventsArray.add(generateNotification()); json.put("et", eventsArray); } // 使用者後臺活躍 if (rand.nextBoolean()) { eventsArray.add(generateBackground()); json.put("et", eventsArray); } //故障日誌 if (rand.nextBoolean()) { eventsArray.add(generateError()); json.put("et", eventsArray); } // 使用者評論 if (rand.nextBoolean()) { eventsArray.add(generateComment()); json.put("et", eventsArray); } // 使用者收藏 if (rand.nextBoolean()) { eventsArray.add(generateFavorites()); json.put("et", eventsArray); } // 使用者點贊 if (rand.nextBoolean()) { eventsArray.add(generatePraise()); json.put("et", eventsArray); } //時間 long millis = System.currentTimeMillis(); //控制檯列印 logger.info(millis + "|" + json.toJSONString()); break; } // 延遲 try { Thread.sleep(delay); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 公共欄位設定 */ private static JSONObject generateComFields() { AppBase appBase = new AppBase(); //裝置id appBase.setMid(s_mid + ""); s_mid++; // 使用者id appBase.setUid(s_uid + ""); s_uid++; // 程式版本號 5,6等 appBase.setVc("" + rand.nextInt(20)); //程式版本名 v1.1.1 appBase.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10)); // 安卓系統版本 appBase.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10)); // 語言 es,en,pt int flag = rand.nextInt(3); switch (flag) { case (0): appBase.setL("es"); break; case (1): appBase.setL("en"); break; case (2): appBase.setL("pt"); break; } // 渠道號 從哪個渠道來的 appBase.setSr(getRandomChar(1)); // 區域 flag = rand.nextInt(2); switch (flag) { case 0: appBase.setAr("BR"); case 1: appBase.setAr("MX"); } // 手機品牌 ba ,手機型號 md,就取2位數字了 flag = rand.nextInt(3); switch (flag) { case 0: appBase.setBa("Sumsung"); appBase.setMd("sumsung-" + rand.nextInt(20)); break; case 1: appBase.setBa("Huawei"); appBase.setMd("Huawei-" + rand.nextInt(20)); break; case 2: appBase.setBa("HTC"); appBase.setMd("HTC-" + rand.nextInt(20)); break; } // 嵌入sdk的版本 appBase.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10)); // gmail appBase.setG(getRandomCharAndNumr(8) + "@gmail.com"); // 螢幕寬高 hw flag = rand.nextInt(4); switch (flag) { case 0: appBase.setHw("640*960"); break; case 1: appBase.setHw("640*1136"); break; case 2: appBase.setHw("750*1134"); break; case 3: appBase.setHw("1080*1920"); break; } // 客戶端產生日誌時間 long millis = System.currentTimeMillis(); appBase.setT("" + (millis - rand.nextInt(99999999))); // 手機網路模式 3G,4G,WIFI flag = rand.nextInt(3); switch (flag) { case 0: appBase.setNw("3G"); break; case 1: appBase.setNw("4G"); break; case 2: appBase.setNw("WIFI"); break; } // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′ // 經度 appBase.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + ""); // 緯度 appBase.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + ""); return (JSONObject) JSON.toJSON(appBase); } /** * 商品展示事件 */ private static JSONObject generateDisplay() { AppDisplay appDisplay = new AppDisplay(); boolean boolFlag = rand.nextInt(10) < 7; // 動作:曝光商品=1,點選商品=2, if (boolFlag) { appDisplay.setAction("1"); } else { appDisplay.setAction("2"); } // 商品id String goodsId = s_goodsid + ""; s_goodsid++; appDisplay.setGoodsid(goodsId); // 順序 設定成6條吧 int flag = rand.nextInt(6); appDisplay.setPlace("" + flag); // 曝光型別 flag = 1 + rand.nextInt(2); appDisplay.setExtend1("" + flag); // 分類 flag = 1 + rand.nextInt(100); appDisplay.setCategory("" + flag); JSONObject jsonObject = (JSONObject) JSON.toJSON(appDisplay); return packEventJson("display", jsonObject); } /** * 商品詳情頁 */ private static JSONObject generateNewsDetail() { AppNewsDetail appNewsDetail = new AppNewsDetail(); // 頁面入口來源 int flag = 1 + rand.nextInt(3); appNewsDetail.setEntry(flag + ""); // 動作 appNewsDetail.setAction("" + (rand.nextInt(4) + 1)); // 商品id appNewsDetail.setGoodsid(s_goodsid + ""); // 商品來源型別 flag = 1 + rand.nextInt(3); appNewsDetail.setShowtype(flag + ""); // 商品樣式 flag = rand.nextInt(6); appNewsDetail.setShowtype("" + flag); // 頁面停留時長 flag = rand.nextInt(10) * rand.nextInt(7); appNewsDetail.setNews_staytime(flag + ""); // 載入時長 flag = rand.nextInt(10) * rand.nextInt(7); appNewsDetail.setLoading_time(flag + ""); // 載入失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appNewsDetail.setType1("102"); break; case 2: appNewsDetail.setType1("201"); break; case 3: appNewsDetail.setType1("325"); break; case 4: appNewsDetail.setType1("433"); break; case 5: appNewsDetail.setType1("542"); break; default: appNewsDetail.setType1(""); break; } // 分類 flag = 1 + rand.nextInt(100); appNewsDetail.setCategory("" + flag); JSONObject eventJson = (JSONObject) JSON.toJSON(appNewsDetail); return packEventJson("newsdetail", eventJson); } /** * 商品列表 */ private static JSONObject generateNewList() { AppLoading appLoading = new AppLoading(); // 動作 int flag = rand.nextInt(3) + 1; appLoading.setAction(flag + ""); // 載入時長 flag = rand.nextInt(10) * rand.nextInt(7); appLoading.setLoading_time(flag + ""); // 失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appLoading.setType1("102"); break; case 2: appLoading.setType1("201"); break; case 3: appLoading.setType1("325"); break; case 4: appLoading.setType1("433"); break; case 5: appLoading.setType1("542"); break; default: appLoading.setType1(""); break; } // 頁面 載入型別 flag = 1 + rand.nextInt(2); appLoading.setLoading_way("" + flag); // 擴充套件欄位1 appLoading.setExtend1(""); // 擴充套件欄位2 appLoading.setExtend2(""); // 使用者載入型別 flag = 1 + rand.nextInt(3); appLoading.setType("" + flag); JSONObject jsonObject = (JSONObject) JSON.toJSON(appLoading); return packEventJson("loading", jsonObject); } /** * 廣告相關欄位 */ private static JSONObject generateAd() { AppAd appAd = new AppAd(); // 入口 int flag = rand.nextInt(3) + 1; appAd.setEntry(flag + ""); // 動作 flag = rand.nextInt(5) + 1; appAd.setAction(flag + ""); // 內容型別型別 flag = rand.nextInt(6) + 1; appAd.setContentType(flag + ""); // 展示樣式 flag = rand.nextInt(120000) + 1000; appAd.setDisplayMills(flag + ""); flag = rand.nextInt(1); if (flag == 1) { appAd.setContentType(flag + ""); flag = rand.nextInt(6); appAd.setItemId(flag + ""); } else { appAd.setContentType(flag + ""); flag = rand.nextInt(1) + 1; appAd.setActivityId(flag + ""); } JSONObject jsonObject = (JSONObject) JSON.toJSON(appAd); return packEventJson("ad", jsonObject); } /** * 啟動日誌 */ private static AppStart generateStart() { AppStart appStart = new AppStart(); //裝置id appStart.setMid(s_mid + ""); s_mid++; // 使用者id appStart.setUid(s_uid + ""); s_uid++; // 程式版本號 5,6等 appStart.setVc("" + rand.nextInt(20)); //程式版本名 v1.1.1 appStart.setVn("1." + rand.nextInt(4) + "." + rand.nextInt(10)); // 安卓系統版本 appStart.setOs("8." + rand.nextInt(3) + "." + rand.nextInt(10)); //設定日誌型別 appStart.setEn("start"); // 語言 es,en,pt int flag = rand.nextInt(3); switch (flag) { case (0): appStart.setL("es"); break; case (1): appStart.setL("en"); break; case (2): appStart.setL("pt"); break; } // 渠道號 從哪個渠道來的 appStart.setSr(getRandomChar(1)); // 區域 flag = rand.nextInt(2); switch (flag) { case 0: appStart.setAr("BR"); case 1: appStart.setAr("MX"); } // 手機品牌 ba ,手機型號 md,就取2位數字了 flag = rand.nextInt(3); switch (flag) { case 0: appStart.setBa("Sumsung"); appStart.setMd("sumsung-" + rand.nextInt(20)); break; case 1: appStart.setBa("Huawei"); appStart.setMd("Huawei-" + rand.nextInt(20)); break; case 2: appStart.setBa("HTC"); appStart.setMd("HTC-" + rand.nextInt(20)); break; } // 嵌入sdk的版本 appStart.setSv("V2." + rand.nextInt(10) + "." + rand.nextInt(10)); // gmail appStart.setG(getRandomCharAndNumr(8) + "@gmail.com"); // 螢幕寬高 hw flag = rand.nextInt(4); switch (flag) { case 0: appStart.setHw("640*960"); break; case 1: appStart.setHw("640*1136"); break; case 2: appStart.setHw("750*1134"); break; case 3: appStart.setHw("1080*1920"); break; } // 客戶端產生日誌時間 long millis = System.currentTimeMillis(); appStart.setT("" + (millis - rand.nextInt(99999999))); // 手機網路模式 3G,4G,WIFI flag = rand.nextInt(3); switch (flag) { case 0: appStart.setNw("3G"); break; case 1: appStart.setNw("4G"); break; case 2: appStart.setNw("WIFI"); break; } // 拉丁美洲 西經34°46′至西經117°09;北緯32°42′至南緯53°54′ // 經度 appStart.setLn((-34 - rand.nextInt(83) - rand.nextInt(60) / 10.0) + ""); // 緯度 appStart.setLa((32 - rand.nextInt(85) - rand.nextInt(60) / 10.0) + ""); // 入口 flag = rand.nextInt(5) + 1; appStart.setEntry(flag + ""); // 開屏廣告型別 flag = rand.nextInt(2) + 1; appStart.setOpen_ad_type(flag + ""); // 狀態 flag = rand.nextInt(10) > 8 ? 2 : 1; appStart.setAction(flag + ""); // 載入時長 appStart.setLoading_time(rand.nextInt(20) + ""); // 失敗碼 flag = rand.nextInt(10); switch (flag) { case 1: appStart.setDetail("102"); break; case 2: appStart.setDetail("201"); break; case 3: appStart.setDetail("325"); break; case 4: appStart.setDetail("433"); break; case 5: appStart.setDetail("542"); break; default: appStart.setDetail(""); break; } // 擴充套件欄位 appStart.setExtend1(""); return appStart; } /** * 訊息通知 */ private static JSONObject generateNotification() { AppNotification appNotification = new AppNotification(); int flag = rand.nextInt(4) + 1; // 動作 appNotification.setAction(flag + ""); // 通知id flag = rand.nextInt(4) + 1; appNotification.setType(flag + ""); // 客戶端彈時間 appNotification.setAp_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); // 備用欄位 appNotification.setContent(""); JSONObject jsonObject = (JSONObject) JSON.toJSON(appNotification); return packEventJson("notification", jsonObject); } /** * 後臺活躍 */ private static JSONObject generateBackground() { AppActive appActive_background = new AppActive(); // 啟動源 int flag = rand.nextInt(3) + 1; appActive_background.setActive_source(flag + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(appActive_background); return packEventJson("active_background", jsonObject); } /** * 錯誤日誌資料 */ private static JSONObject generateError() { AppErrorLog appErrorLog = new AppErrorLog(); String[] errorBriefs = {"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)", "at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}; //錯誤摘要 String[] errorDetails = {"java.lang.NullPointerException\\n " + "at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n " + "at cn.lift.dfdf.web.AbstractBaseController.validInbound", "at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n " + "at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n" + " at java.lang.reflect.Method.invoke(Method.java:606)\\n"}; //錯誤詳情 //錯誤摘要 appErrorLog.setErrorBrief(errorBriefs[rand.nextInt(errorBriefs.length)]); //錯誤詳情 appErrorLog.setErrorDetail(errorDetails[rand.nextInt(errorDetails.length)]); JSONObject jsonObject = (JSONObject) JSON.toJSON(appErrorLog); return packEventJson("error", jsonObject); } /** * 為各個事件型別的公共欄位(時間、事件型別、Json資料)拼接 */ private static JSONObject packEventJson(String eventName, JSONObject jsonObject) { JSONObject eventJson = new JSONObject(); eventJson.put("ett", (System.currentTimeMillis() - rand.nextInt(99999999)) + ""); eventJson.put("en", eventName); eventJson.put("kv", jsonObject); return eventJson; } /** * 獲取隨機字母組合 * * @param length 字串長度 */ private static String getRandomChar(Integer length) { StringBuilder str = new StringBuilder(); Random random = new Random(); for (int i = 0; i < length; i++) { // 字串 str.append((char) (65 + random.nextInt(26)));// 取得大寫字母 } return str.toString(); } /** * 獲取隨機字母數字組合 * * @param length 字串長度 */ private static String getRandomCharAndNumr(Integer length) { StringBuilder str = new StringBuilder(); Random random = new Random(); for (int i = 0; i < length; i++) { boolean b = random.nextBoolean(); if (b) { // 字串 // int choice = random.nextBoolean() ? 65 : 97; 取得65大寫字母還是97小寫字母 str.append((char) (65 + random.nextInt(26)));// 取得大寫字母 } else { // 數字 str.append(String.valueOf(random.nextInt(10))); } } return str.toString(); } /** * 收藏 */ private static JSONObject generateFavorites() { AppFavorites favorites = new AppFavorites(); favorites.setCourse_id(rand.nextInt(10)); favorites.setUserid(rand.nextInt(10)); favorites.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(favorites); return packEventJson("favorites", jsonObject); } /** * 點贊 */ private static JSONObject generatePraise() { AppPraise praise = new AppPraise(); praise.setId(rand.nextInt(10)); praise.setUserid(rand.nextInt(10)); praise.setTarget_id(rand.nextInt(10)); praise.setType(rand.nextInt(4) + 1); praise.setAdd_time((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); JSONObject jsonObject = (JSONObject) JSON.toJSON(praise); return packEventJson("praise", jsonObject); } /** * 評論 */ private static JSONObject generateComment() { AppComment comment = new AppComment(); comment.setComment_id(rand.nextInt(10)); comment.setUserid(rand.nextInt(10)); comment.setP_comment_id(rand.nextInt(5)); comment.setContent(getCONTENT()); comment.setAddtime((System.currentTimeMillis() - rand.nextInt(99999999)) + ""); comment.setOther_id(rand.nextInt(10)); comment.setPraise_count(rand.nextInt(1000)); comment.setReply_count(rand.nextInt(200)); JSONObject jsonObject = (JSONObject) JSON.toJSON(comment); return packEventJson("comment", jsonObject); } /** * 生成單個漢字 */ private static char getRandomChar() { String str = ""; int hightPos; // int lowPos; Random random = new Random(); //隨機生成漢子的兩個位元組 hightPos = (176 + Math.abs(random.nextInt(39))); lowPos = (161 + Math.abs(random.nextInt(93))); byte[] b = new byte[2]; b[0] = (Integer.valueOf(hightPos)).byteValue(); b[1] = (Integer.valueOf(lowPos)).byteValue(); try { str = new String(b, "GBK"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); System.out.println("錯誤"); } return str.charAt(0); } /** * 拼接成多個漢字 */ private static String getCONTENT() { StringBuilder str = new StringBuilder(); for (int i = 0; i < rand.nextInt(100); i++) { str.append(getRandomChar()); } return str.toString(); } } ``` #### 2.4.5 啟動測試 > 注意,需要將日誌模擬放到2臺伺服器上,模擬日誌每一條中即包括公共日誌,又包含事件日誌,需要flume攔截器進行日誌分發,當然也需要兩個flume-ng來做這個事情 > > 打包上傳2臺伺服器節點,生產資料為後面的測試做準備,這裡為使用者目錄test資料夾下 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517134403101-416854732.png) ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517134439532-202254655.png) > 通過引數控制生成訊息速度及產量(如下 2秒一條,列印1000條) ```scheme #控制時間及條數 nohup java -jar data-producer-1.0-SNAPSHOT-jar-with-dependencies.jar 2000 1000 & #監控日誌 tail -F /root/logs/*.log ``` ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200501155109242-835705829.png) > 通過[www.json.cn](https://www.json.cn/)檢視資料格式 ## 3 建立KafKa-Topic - 建立啟動日誌主題:topic_start - 建立事件日誌主題:topic_event ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200501160839749-548792370.png) ## 4 Flume準備 > 共分為2組flume > > 第一組:將伺服器日誌收集,並使用Kafka-Channels將資料發往Kafka不同的Topic,其中使用攔截器進行公共日誌和事件日誌的分發, > > 第二組:收集Kafka資料,使用Flie-Channels快取資料,最終發往Hdfs儲存 ![](https://img2020.cnblogs.com/blog/1235870/202005/1235870-20200517141914275-28869599.png) ### 4.1 Flume:File->Kafka配置編寫 - vim /root/test/file-flume-kafka.conf ```sql #1 定義元件 a1.sources=r1 a1.channels=c1 c2 # 2 source配置 type型別 positionFile記錄日誌讀取位置 filegroups讀取哪些目錄 app.+為讀取什麼開頭 channels發往哪裡 a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /root/test/flume/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/logs/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #3 攔截器 這裡2個為自定義的攔截器 multiplexing為型別區分選擇器 header頭用於區分型別 mapping匹配頭 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.heaton.bigdata.flume.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.heaton.bigdata.flume.LogTypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 #4 channel配置 kafkaChannel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type =org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer ``` > 在生產日誌的2臺伺服器節點上建立flume配置檔案。 > > LogETLInterceptor,LogTypeInterceptor為自定義攔截 ### 4.2 自定義攔截器 > data-flume工程 - LogUtils ```ruby import org.apache.commons.lang.math.NumberUtils; public class LogUtils { public static boolean validateEvent(String log) { /** 伺服器時間 | json 1588319303710|{ "cm":{ "ln":"-51.5","sv":"V2.0.7","os":"8.0.8","g":"[email protected]","mid":"13", "nw":"4G","l":"en","vc":"7","hw":"640*960","ar":"MX","uid":"13","t":"1588291826938", "la":"-38.2","md":"Huawei-14","vn":"1.3.6","ba":"Huawei","sr":"Y" }, "ap":"app", "et":[{ "ett":"1588228193191","en":"ad","kv":{"activityId":"1","displayMills":"113201","entry":"3","action":"5","contentType":"0"} },{ "ett":"1588300304713","en":"notification","kv":{"ap_time":"1588277440794","action":"2","type":"3","content":""} },{ "ett":"1588249203743","en":"active_background","kv":{"active_source":"3"} },{ "ett":"1588254200122","en":"favorites","kv":{"course_id":5,"id":0,"add_time":"1588264138625","userid":0} },{ "ett":"1588281152824","en":"praise","kv":{"target_id":4,"id":3,"type":3,"add_time":"1588307696417","userid":8} }] } */ // 1 切割 String[] logContents = log.split("\\|"); // 2 校驗 if (logContents.length != 2) { return false; } //3 校驗伺服器時間 if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) { return false; } // 4 校驗 json if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) { return false; } return true; } public static boolean validateStart(String log) { /** { "action":"1","ar":"MX","ba":"HTC","detail":"201","en":"start","entry":"4","extend1":"", "g":"[email protected]","hw":"750*1134","l":"pt","la":"-29.7","ln":"-48.1","loading_time":"0", "md":"HTC-18","mid":"14","nw":"3G","open_ad_type":"2","os":"8.0.8","sr":"D","sv":"V2.8.2", "t":"1588251833523","uid":"14","vc":"15","vn":"1.2.9" } */ if (log == null) { return false; } // 校驗 json if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) { return