1. 程式人生 > >spark 處理網路日誌 查詢pv uv例項

spark 處理網路日誌 查詢pv uv例項

這裡我們先理解一下spark處理資料的流程,由於spark 有standalone,local,yarn等多種模式,每種模式都有不同之處,但是總體流程都是一樣的,大致就是客戶端向叢集管理者提交作業,生成有向無環圖,圖中的內容包括分成幾個stage,每個stage有幾個task,每個task分別由哪個executor來執行,接下來的工作就是整個spark叢集按照有向無環圖的佈置來進行,並得出結果。

下面我們舉一個網路日誌計算pv uv的例項,通過程式碼打成jar包的方式在 spark-submit執行,程式碼 具體實現以下功能:

1. 資料清洗,只保留date url 和guid

2.建立spark schema ,將rdd轉換成dataframe,並建立臨時表

3.使用sql 語句查詢 uv pv

4.將結果儲存到資料庫中

package com.stanley.scala.objects

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

object WebLog {
  def main(args: Array[String]): Unit = {
    //建立配置檔案,選擇yarn-clent模式
     val conf=new SparkConf().setAppName("SparkTest").setMaster("yarn-client")
     val sc =new SparkContext(conf)
     //讀取資料
     val fileRdd=sc.textFile(args(0))
     //ETL清洗資料
     val weblogRdd=fileRdd.filter(_.length>0).map(line=>{
       val arr=line.split("\t")       
       val url=arr(1)       
       val guid =arr(5)
       val date=arr(17).substring(0,10)
       (date,guid,url)       
     }).filter(_._3.length>0)
     //建立sparksql
     val sqlContext=new SQLContext(sc)
     //建立schema
     val schema=StructType(
         List(
             StructField("date",StringType,true),
             StructField("guid",StringType,true),
             StructField("url",StringType,true)
            )
      )
     val rowRdd=weblogRdd.map(tuple=>Row(tuple._1,tuple._2,tuple._3))
     val weblogDf=sqlContext.createDataFrame(rowRdd, schema)
     //註冊臨時表
     weblogDf.registerTempTable("webLog")
     //建立sql 語句查詢uv,pv
     val uvSql="select count(*) pv,count(distinct(guid)) uv from webLog"
     val uvpvDf=sqlContext.sql(uvSql)
     uvpvDf.show()
     //結果傳入mysql
     val url="jdbc:mysql://master:3306/test?user=root&password=123456"
      import java.util.Properties
      val properties=new Properties
      uvpvDf.write.jdbc(url,"uvpv",properties)
     //關閉資源
     sc.stop()
  }
}

值得注意的是 由於資料量並不是很大,我們可以在spark-defaults.conf中設定分割槽數,來加快執行速度,如果不設定這個引數分割槽數可能會有200個會產生200個task 

spark.sql.shuffle.partitions         10

接下來我們執行程式,先啟動叢集,並開啟historyserver, 然後進入spark目錄,屬於spark-sumbit指令

./bin/spark-submit \
--class com.stanley.scala.objects.WebLog \
/opt/testfile/sparkTest.jar \
/input/2015082818

這是可以通過 webUI看到有向無環圖,一共分為三個階段


再檢視以下日誌,分割槽數也是我們設定的 10個


進入mysql 檢視uvpv表已經存在