1. 程式人生 > Spark的一個小Demo

Spark的一個小Demo

test else reat var session main start org local

一個簡單的小demo

package com.liuyang.test.scala

import org.apache.spark.sql.SparkSession

/**
  * @author liuyang
  * @date 2019-02-20 17:40
  * @description Splits raw log data into structured access records.
  *
  * The input mixes two kinds of lines:
  *  - header lines, which start with a date such as `2019-01-31 ...` and carry query-style
  *    parameters (`&clienttype=`, `&macline=`, `&version=`, optionally `&data=`);
  *  - package lines of the form `com.x.y,{1=...,2=...}`, which are attributed to the most
  *    recent preceding header line.
  */
object Test {

  /**
    * One parsed access record. Every field is a string; fields the source line does not
    * provide are left as "".
    */
  case class Accessing(client_type: String, mac_line: String, package_name: String
                       , start_time: String, end_time: String, program_name: String
                       , duration: String, origin: String, version: String, total_duration: String, dt: String)

  /**
    * Scratch buffer holding [client_type, mac_line, version, dt], filled by [[getArray]].
    * Kept for backward compatibility only: it is shared and mutable, hence not safe to use from
    * concurrently running Spark tasks, so [[main]] no longer relies on it.
    */
  val record: Array[String] = new Array[String](4)

  /** Package names (substring match) whose records are dropped. */
  private val ExcludedPackages: Seq[String] = Seq("InputSourceBehavior", "systemRamCpuBehavior",
    "keyDownBehavior", "appRamCpuBehavior", "sdcardUsbBehavior", "multiscreeninteractiontvbehavior")

  /** Markers anywhere in the line (substring match) that cause the record to be dropped. */
  private val ExcludedLineMarkers: Seq[String] =
    Seq("deviceProp", "tvratings", "keydownBehavior", "mediaPlayBehavior")

  /**
    * Reads the log, parses it into [[Accessing]] records and prints them.
    *
    * @param args optional; `args(0)` overrides the default input path
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse("E:\\bigdataLog\\rawData\\raw_data.txt")
    val session = SparkSession.builder().master("local[4]").appName("Test").getOrCreate()
    try {
      import session.implicits._

      // Each file is read whole (so it must fit in memory) and its lines are parsed in order in a
      // single task. Header fields flow from a header line to the package lines after it; with
      // textFile the file would be split into partitions processed concurrently, so a header and
      // its package lines could end up in different tasks.
      val all = session.sparkContext
        .wholeTextFiles(inputPath)
        .flatMap { case (_, content) => parseLines(content.split("\r\n|\r|\n").iterator) }
        .coalesce(1)

      all.toDF().show(10000)
    } finally {
      session.stop()
    }
  }

  /**
    * Parses lines of one file in order, carrying header fields in a task-local variable instead
    * of shared state. Package lines seen before any header get empty header fields.
    */
  private def parseLines(lines: Iterator[String]): Iterator[Accessing] = {
    var header: Array[String] = Array.fill(4)("")
    lines.flatMap { line =>
      val parsed: Option[Accessing] =
        if (line.startsWith("2")) {
          // Header line: remember its fields for the package lines that follow.
          header = extractHeader(line)
          // A header line only carries its own package record when &data= has a value.
          if (line.contains("&data=com")) Option(resolve(line, header)) else None
        } else {
          Option(resolve(line, header))
        }
      parsed.iterator
    }
  }

  /**
    * Extracts [client_type, mac_line, version, dt] from a header line into the shared [[record]]
    * buffer and returns that buffer. Missing parameters become "".
    *
    * Not thread-safe (it writes the shared buffer); [[main]] uses a task-local copy instead.
    *
    * @param line a header line
    * @return the shared [[record]] array, now holding this line's header fields
    */
  def getArray(line: String): Array[String] = {
    extractHeader(line).copyToArray(record)
    record
  }

  /** Pure version of [[getArray]]: returns a fresh [client_type, mac_line, version, dt] array. */
  private def extractHeader(line: String): Array[String] =
    Array(
      paramValue(line, "clienttype"),
      paramValue(line, "macline"),
      paramValue(line, "version"),
      // dt: the leading date token of the header line (text before the first space).
      line.takeWhile(_ != ' ')
    )

  /** Value of the `&name=value` parameter (up to the next '&' or end of line), or "" if absent. */
  private def paramValue(line: String, name: String): String = {
    val marker = "&" + name + "="
    val start = line.indexOf(marker)
    if (start < 0) ""
    else {
      val valueStart = start + marker.length
      val end = line.indexOf('&', valueStart)
      if (end < 0) line.substring(valueStart) else line.substring(valueStart, end)
    }
  }

  /**
    * Parses one package record into an [[Accessing]].
    *
    * @param line   either a header line containing `&data=<package>,{1=...}` or a package line
    *               of the form `<package>,{1=...}`
    * @param record header fields [client_type, mac_line, version, dt] to attach to the record
    * @return the parsed record, or null when the line is empty, belongs to an excluded
    *         behavior, or does not match any supported payload layout
    */
  def resolve(line: String, record: Array[String]): Accessing = {
    val dataIdx = line.indexOf("&data")
    // (package name, payload starting at the comma before "{1=").
    val parts: Option[(String, String)] =
      if (dataIdx >= 0) {
        // The package name sits between the first '=' after "&data" and the ",{1=" opening the
        // payload; both are searched from "&data" on so header parameters cannot interfere.
        val eqIdx = line.indexOf('=', dataIdx)
        val braceIdx = line.indexOf(",{1=", dataIdx)
        if (eqIdx < 0 || braceIdx < 0 || eqIdx > braceIdx) None
        else Some((line.substring(eqIdx + 1, braceIdx), line.substring(braceIdx)))
      } else {
        // Plain package line: the name is everything before the first comma.
        val name = line.takeWhile(_ != ',')
        Some((name, line.substring(name.length)))
      }

    parts match {
      // Malformed "&data" line: skip it rather than failing the whole job.
      case None => null
      case Some((packageName, payload)) =>
        if (isExcluded(line, packageName)) null
        else parsePayload(packageName, payload, record).orNull
    }
  }

  /** True when the record should be dropped (empty line or an excluded behavior). */
  private def isExcluded(line: String, packageName: String): Boolean =
    line.isEmpty ||
      ExcludedPackages.exists(p => packageName.contains(p)) ||
      ExcludedLineMarkers.exists(m => line.contains(m))

  /**
    * Builds the record from the `{1=...,2=...}` payload; the layout is chosen by the highest key
    * present. Returns None when a required key or its terminator is missing.
    */
  private def parsePayload(packageName: String, payload: String,
                           header: Array[String]): Option[Accessing] =
    if (payload.contains(",5=")) {
      // e.g. com.tcl.thirdAppPlayBehavior,{1=qiyi,2=214260401,3=天乩之白蛇傳說,4=2703,5=-1000}
      for {
        origin <- field(payload, "1=", ",2=")
        programName <- field(payload, ",3=", ",4=")
        totalDuration <- field(payload, ",4=", ",5=")
        duration <- field(payload, ",5=", "}")
      } yield Accessing(header(0), header(1), packageName, "", "", programName, duration, origin,
        header(2), totalDuration, header(3))
    } else if (payload.contains(",3=")) {
      // e.g. com.qiyi.video,{1=2019-01-31 05:45:45,2=2019-01-31 07:23:34,3=2}
      // NOTE(review): version and dt are left empty here (as in the original code) although the
      // header provides them; confirm whether that is what the task requires.
      for {
        start <- field(payload, "1=", ",2=")
        end <- field(payload, ",2=", ",3=")
      } yield Accessing(header(0), header(1), packageName, start, end, "", "", "", "", "", "")
    } else if (payload.contains(",2=")) {
      // e.g. com.tcl.TVBasicBehavior,{1=2019-01-31 09:17:19,2=2019-01-31 09:59:20}
      for {
        start <- field(payload, "1=", ",2=")
        end <- field(payload, ",2=", "}")
      } yield Accessing(header(0), header(1), packageName, start, end, "", "", "", "", "", "")
    } else {
      None
    }

  /** Text between the first `marker` and the next `terminator` after it, if both exist. */
  private def field(payload: String, marker: String, terminator: String): Option[String] = {
    val start = payload.indexOf(marker)
    if (start < 0) None
    else {
      val valueStart = start + marker.length
      val end = payload.indexOf(terminator, valueStart)
      if (end < 0) None else Some(payload.substring(valueStart, end))
    }
  }
}

Spark的一個小Demo