大資料學習筆記(spark日誌分析案例)
阿新 • • 發佈:2018-12-21
前提:500w條記錄環境下(可以更多,視計算機效能而定),統計每天最熱門的top3板塊。
1、PV和UV
我們要統計的是最熱門的top3板塊,而熱門如果只是簡單地通過頁面瀏覽量(PV)或者使用者瀏覽量(UV)來決定都顯得比較片面,這裡我們綜合這兩者(0.3PV+0.7UV)來獲取我們的需求。
1.1、PV
PageView:瀏覽量。(有幾次瀏覽就算幾次)
1.2、UV
UserView:使用者量。(同一個使用者同一天瀏覽一個模組多次,只能算一次) 1.3、PV+UV
通過上面的分析已經解釋了PV和UV的含義,以及獲取這兩個值的具體操作思路。下面探討一下,如何在這兩個值的基礎上,求出每天最熱門的top3板塊。
按照前面的操作已經獲得了兩個RDD,PVRDD、UVRDD。在這兩個RDD上使用join連線,在join運算元裡面通過(0.3PV+0.7UV)可以獲得每天的各個模組的一個熱度值。將這個值排序。取前三名,就是我們要求的每天最熱top3板塊了。
2、生成資料
由於沒有獲取大量資料的條件,這裡我們通過程式碼自己製造一部分資料來進行相關操作。我模仿的資料結構是:UUID 使用者id 時間戳 頁面id 模組名(中間用\t製表符分隔)
package com.hpe.data; import java.io.BufferedWriter; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Random; import java.util.UUID; public class MakeLogData { public static void main(String[] args) throws Exception { BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("d:/logdata"))); Random random = new Random(); int logNUm = 5000000; StringBuilder stringBuilder = new StringBuilder(); List<String> channelList = Arrays.asList("spark","hdfs","mr","yarn","hive","scala","python"); for (int i = 0; i < logNUm; i++) { String sessionId = UUID.randomUUID().toString(); int userId = random.nextInt(10000); int year = 2018; int month = random.nextInt(12) + 1; int day = random.nextInt(30) + 1; int hour = random.nextInt(24); int minute = random.nextInt(60); int second = random.nextInt(60); String dateTime = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second; SimpleDateFormat form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long time = form.parse(dateTime).getTime(); int pageId = random.nextInt(100); String channel = channelList.get(i % channelList.size() ); stringBuilder.append(sessionId + "\t" + userId + "\t" + time + "\t" + pageId + "\t" + channel + "\n"); bw.write(stringBuilder.toString()); stringBuilder.delete(0, stringBuilder.length()); } bw.flush(); bw.close(); } }
3、程式碼
package com.hpe.spark.loganalyse import org.apache.spark.SparkConf import org.apache.spark.SparkContext import java.util.Date import java.text.SimpleDateFormat import org.apache.spark.rdd.RDD.rddToPairRDDFunctions import org.apache.spark.rdd.RDD object PVAndUV { def main(args: Array[String]): Unit = { //配置資訊 val conf = new SparkConf(); conf.setAppName("UV + PV") conf.setMaster("local") val sc = new SparkContext(conf) //載入資料 val rdd = sc.textFile("d:/data/logdata") //呼叫方法 val rdd2 = first(rdd) //rdd2.saveAsTextFile("d:/data/Log2") sc.stop() } //封裝方法 def first(rdd:RDD[String]) = { //切割字串 val splitRDD = rdd.map { _.split("\t") } //過濾,去除髒資料 val filterRDD = splitRDD.filter { _.length == 5 } //PV model val reduceRDD = pv(filterRDD) //UV model val reduceRDD2 = uv(filterRDD) //jion 合併兩個RDD val unionRDD = reduceRDD.join(reduceRDD2) //返回 時間_模組 //(時間_模組,(a,b)) val endRDD = unionRDD .map(x =>{ val value = x._2._1 *0.3 + x._2._2 *0.7 (x._1,value) }) .sortBy(_._2,false) .map(x =>{ val day = x._1.split("_")(0) val model = x._1.split("_")(1) (day,model) }) .groupByKey() .map(x => { val list = x._2.take(3) (x._1,list) }).foreach { println } endRDD } //pv操作 def pv(filterRDD:RDD[Array[String]]) = { val mapRDD = filterRDD.map { x => { val time = x(2).toLong val date = new Date(time) val format = new SimpleDateFormat("yyyy-MM-dd") val dateStr = format.format(date) x(2) = dateStr //返回 時間_模組 (x(2) + "_" + x(4),1) } } val reduceRDD=mapRDD.reduceByKey(_+_) reduceRDD } //uv操作 def uv(filterRDD:RDD[Array[String]]) = { val mapRDD2 = filterRDD.map { x => { val time = x(2).toLong val date = new Date(time) val format = new SimpleDateFormat("yyyy-MM-dd") val dateStr = format.format(date) x(2) = dateStr //返回 使用者id_模組_時間 (x(1) + "_" + x(2) + "_" + x(4),null) } } //去重 val disRDD = mapRDD2.distinct() //只需要key,組裝二元組 val tupleRDD = disRDD.map(x =>{ val key = x._1 //key:會員id_時間_板塊id //把會員id切掉 val newKey = key.substring(key.indexOf("_")+1, key.length()) (newKey,1) }) //累加 val reduceRDD2=tupleRDD.reduceByKey(_+_) reduceRDD2 } }