spark 處理網路日誌 查詢pv uv例項
阿新 • • 發佈:2019-01-03
這裡我們先理解一下spark處理資料的流程,由於spark 有standalone,local,yarn等多種模式,每種模式都有不同之處,但是總體流程都是一樣的,大致就是客戶端向叢集管理者提交作業,生成有向無環圖,圖中的內容包括分成幾個stage,每個stage有幾個task,每個task分別由哪個executor來執行,接下來的工作就是整個spark叢集按照有向無環圖的佈置來進行,並得出結果。
下面我們舉一個網路日誌計算pv uv的例項,通過程式碼打成jar包的方式在 spark-submit執行,程式碼 具體實現以下功能:
1. 資料清洗,只保留date url 和guid
2.建立spark schema ,將rdd轉換成dataframe,並建立臨時表
3.使用sql 語句查詢 uv pv
4.將結果儲存到資料庫中
package com.stanley.scala.objects import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StringType import org.apache.spark.sql.Row object WebLog { def main(args: Array[String]): Unit = { //建立配置檔案,選擇yarn-clent模式 val conf=new SparkConf().setAppName("SparkTest").setMaster("yarn-client") val sc =new SparkContext(conf) //讀取資料 val fileRdd=sc.textFile(args(0)) //ETL清洗資料 val weblogRdd=fileRdd.filter(_.length>0).map(line=>{ val arr=line.split("\t") val url=arr(1) val guid =arr(5) val date=arr(17).substring(0,10) (date,guid,url) }).filter(_._3.length>0) //建立sparksql val sqlContext=new SQLContext(sc) //建立schema val schema=StructType( List( StructField("date",StringType,true), StructField("guid",StringType,true), StructField("url",StringType,true) ) ) val rowRdd=weblogRdd.map(tuple=>Row(tuple._1,tuple._2,tuple._3)) val weblogDf=sqlContext.createDataFrame(rowRdd, schema) //註冊臨時表 weblogDf.registerTempTable("webLog") //建立sql 語句查詢uv,pv val uvSql="select count(*) pv,count(distinct(guid)) uv from webLog" val uvpvDf=sqlContext.sql(uvSql) uvpvDf.show() //結果傳入mysql val url="jdbc:mysql://master:3306/test?user=root&password=123456" import java.util.Properties val properties=new Properties uvpvDf.write.jdbc(url,"uvpv",properties) //關閉資源 sc.stop() } }
值得注意的是 由於資料量並不是很大,我們可以在spark-defaults.conf中設定分割槽數,來加快執行速度,如果不設定這個引數分割槽數可能會有200個會產生200個task
spark.sql.shuffle.partitions 10
接下來我們執行程式,先啟動叢集,並開啟historyserver, 然後進入spark目錄,屬於spark-sumbit指令
./bin/spark-submit \
--class com.stanley.scala.objects.WebLog \
/opt/testfile/sparkTest.jar \
/input/2015082818
這是可以通過 webUI看到有向無環圖,一共分為三個階段
再檢視以下日誌,分割槽數也是我們設定的 10個
進入mysql 檢視uvpv表已經存在