Spark的一個小Demo
阿新 • 發佈:2019-02-21
標籤:test else reat var session main start org local
一個簡單的小 Demo:使用 Spark 讀取原始行為日誌,並將其拆分為結構化的訪問記錄。
package com.liuyang.test.scala

import org.apache.spark.sql.SparkSession

/**
 * Splits a raw behaviour log into structured [[Test.Accessing]] rows.
 *
 * The log interleaves two kinds of lines:
 *  - header lines, which start with the date (`2019-XX-XX ...`) and carry query-string
 *    style parameters such as `&clienttype=`, `&macline=`, `&version=` and optionally an
 *    inline record `&data=<package>,{1=...}`;
 *  - detail lines (`<package>,{1=...,2=...}`), which belong to the closest preceding
 *    header line.
 *
 * @author liuyang
 * @date 2019-02-20 17:40
 */
object Test {

  /** One parsed access record; every field is kept as the raw string from the log. */
  case class Accessing(client_type: String, mac_line: String, package_name: String,
                       start_time: String, end_time: String, program_name: String,
                       duration: String, origin: String, version: String,
                       total_duration: String, dt: String)

  /**
   * Legacy shared buffer holding client_type, mac_line, version and dt (in that order),
   * filled by [[getArray]]. It is a single array shared by every caller in the JVM, so it
   * must not be written from concurrently running Spark tasks; [[main]] keeps
   * per-partition header state instead.
   */
  val record: Array[String] = new Array[String](4)

  /** Input read when no path is given on the command line. */
  private val DefaultInputPath = "E:\\bigdataLog\\rawData\\raw_data.txt"

  /** Parameter that carries an inline `<package>,{...}` record on header lines. */
  private val DataMarker = "&data="

  // Record types observed not to fit the required output; lines matching them are skipped.
  private val ExcludedPackageMarkers = Seq(
    "InputSourceBehavior", "systemRamCpuBehavior", "keyDownBehavior",
    "appRamCpuBehavior", "sdcardUsbBehavior", "multiscreeninteractiontvbehavior")
  private val ExcludedLineMarkers = Seq(
    "deviceProp", "tvratings", "keydownBehavior", "mediaPlayBehavior")

  /**
   * Reads the log, parses it into [[Accessing]] rows and prints up to 10000 of them.
   *
   * @param args optional; `args(0)` overrides the default input path
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val session = SparkSession.builder().master("local[4]").appName("Test").getOrCreate()
    import session.implicits._
    try {
      // Detail lines depend on the preceding header line, so parsing is sequential per
      // partition. minPartitions = 1 asks Hadoop for as few splits as possible, which
      // typically keeps a small input file in one partition.
      val data = session.sparkContext.textFile(inputPath, 1)
      val all = data.mapPartitions(lines => parsePartition(lines)).coalesce(1)
      all.toDF().show(10000)
    } finally {
      session.stop()
    }
  }

  /**
   * Parses one partition in order, tracking the current header fields in a local
   * variable so concurrent tasks never share state.
   *
   * Detail lines that precede the first header line of the partition (i.e. whose header
   * ended up in a different partition) get empty header fields.
   */
  private def parsePartition(lines: Iterator[String]): Iterator[Accessing] = {
    // client_type, mac_line, version, dt of the most recent header line in this partition.
    var header: Array[String] = Array.fill(4)("")
    lines.flatMap { line =>
      // Header lines start with the date (2019-XX-XX), so a leading '2' identifies them.
      val parsed =
        if (line.startsWith("2")) {
          header = headerFields(line)
          // Only header lines whose `&data=` value is non-empty carry an inline record.
          if (line.contains("&data=com")) resolveOpt(line, header) else None
        } else {
          // A detail line: resolve it against the current header.
          resolveOpt(line, header)
        }
      parsed.iterator
    }
  }

  /**
   * Extracts client_type, mac_line, version and dt from a header line. dt is the text
   * before the first space, i.e. the date part of the leading timestamp.
   */
  private def headerFields(line: String): Array[String] = Array(
    queryParam(line, "clienttype"),
    queryParam(line, "macline"),
    queryParam(line, "version"),
    line.takeWhile(_ != ' '))

  /**
   * Returns the value of `&key=value` in `line` (up to the next `&`), or "" when the
   * parameter is absent or has an empty value.
   */
  private def queryParam(line: String, key: String): String = {
    val at = line.indexOf("&" + key)
    if (at < 0) ""
    else {
      val parts = line.substring(at + 1).takeWhile(_ != '&').split("=")
      if (parts.length > 1) parts(1) else ""
    }
  }

  /**
   * Fills the shared [[record]] buffer with client_type, mac_line, version and dt parsed
   * from a header line, and returns that same buffer.
   * Not safe to call from concurrently running tasks.
   */
  def getArray(line: String): Array[String] = {
    headerFields(line).copyToArray(record)
    record
  }

  /**
   * Parses one record line using previously extracted header fields.
   *
   * @param line   a detail line, or a header line carrying `&data=<package>,{...}`
   * @param record header fields: client_type, mac_line, version, dt
   * @return the parsed record, or null when the line is empty, excluded or unrecognised
   */
  def resolve(line: String, record: Array[String]): Accessing =
    resolveOpt(line, record).orNull

  /** Option-returning core of [[resolve]]; `None` means the line is skipped. */
  private def resolveOpt(line: String, header: Array[String]): Option[Accessing] = {
    // The record payload is `<package>,{1=...,2=...}`; on header lines it follows `&data=`.
    val dataAt = line.indexOf(DataMarker)
    val payload = if (dataAt >= 0) line.substring(dataAt + DataMarker.length) else line
    val packageName = payload.takeWhile(_ != ',')

    val excluded = line.isEmpty ||
      ExcludedPackageMarkers.exists(m => packageName.contains(m)) ||
      ExcludedLineMarkers.exists(m => line.contains(m))

    if (excluded) None
    else if (payload.contains(",5=")) {
      // e.g. com.tcl.thirdAppPlayBehavior,{1=qiyi,2=214260401,3=天乩之白蛇傳說,4=2703,5=-1000}
      Some(Accessing(header(0), header(1), packageName, "", "",
        payloadField(payload, 3), payloadField(payload, 5), payloadField(payload, 1),
        header(2), payloadField(payload, 4), header(3)))
    } else if (payload.contains(",2=")) {
      // e.g. com.qiyi.video,{1=2019-01-31 05:45:45,2=2019-01-31 07:23:34,3=2}
      //  or  com.tcl.TVBasicBehavior,{1=2019-01-31 09:17:19,2=2019-01-31 09:59:20}
      // NOTE(review): version and dt are left empty here although they are available in
      // `header`; confirm this matches the required output.
      Some(Accessing(header(0), header(1), packageName,
        payloadField(payload, 1), payloadField(payload, 2), "", "", "", "", "", ""))
    } else None
  }

  /**
   * Returns positional field `n` of the `{1=...,2=...}` section of `payload`: the text
   * after `{1=` (n = 1) or `,n=` up to `,<n+1>=` if present, otherwise up to the next `}`
   * (or the end of the string). Returns "" when field `n` is absent.
   */
  private def payloadField(payload: String, n: Int): String = {
    val startMarker = if (n == 1) "{1=" else s",$n="
    val start = payload.indexOf(startMarker)
    if (start < 0) ""
    else {
      val from = start + startMarker.length
      val nextField = payload.indexOf(s",${n + 1}=", from)
      val end = if (nextField >= 0) nextField else payload.indexOf('}', from)
      if (end >= 0) payload.substring(from, end) else payload.substring(from)
    }
  }
}
Spark的一個小Demo