Spark日誌分析案例
- SparkCore日誌分析主程式
package com.ibeifeng.bigdata.spark.app.core
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by XuanYu on 2016/7/11.
*/
object LogAnalyzer {
def main(args: Array[String]) {
// step 1: Create SparkConf Object
val conf = new SparkConf()
.setAppName("LogAnalyzer Application" )
.setMaster("local[2]")
// step 2: Create SparkContext Object
val sc = new SparkContext(conf)
/** ====================================================================== */
val logFile = "file:///D:/access_log"//"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/apache.access.log"
// 1: input
val accessLogs = sc.textFile(logFile)
/**
*filter log data
*/
.filter(ApacheAccessLog.isValidateLogLine)
/**
parse log
*/
.map(line => ApacheAccessLog.parseLogLine(line))
/**
*cache ,如果某個RDD反覆的被使用,可以考慮將其進行cache
*/
accessLogs.cache()
println("Access Logs Count : " + accessLogs.count())
// =====================================================================
/**
*The average, min, and max content size of responses returned from the server.
*/
val contentSiezes = accessLogs.map(log => log.contentSize) // RDD[String]
/**
*cache contentSizes
*/
contentSiezes.cache()
// compute
val avgContentSize = contentSiezes.reduce(_ + _) / contentSiezes.count()
val minContentSize = contentSiezes.min()
val maxContextSize = contentSiezes.max()
// unpersist
contentSiezes.unpersist()
println("Content Size Avg: %s, Min: %s , Max: %s".format(
avgContentSize, minContentSize, maxContextSize
))
// =====================================================================
/**
A count of response code's returned.
*/
val responseCodeToCount = accessLogs
//
.map(log => (log.responseCode, 1))
//
.reduceByKey(_ + _)
//
.take(5)
println(s"""Response Code Count: ${responseCodeToCount.mkString("[",",","]")}""")
// =====================================================================
/**
*All IPAddresses that have accessed this server more than N times.
*/
val ipAddresses = accessLogs
//
.map(log => (log.ipAddress, 1))
//
.reduceByKey(_ + _)
//
// .filter(tuple => (tuple._2 > 10 ))
//
.map(tuple => tuple._1)
//
.take(3)
println(s"""IPAddress : \${ipAddresses.mkString("[",",","]")}""")
// =====================================================================
/**
*The top endpoints requested by count.
*/
val topEndpoints = accessLogs
//
.map(log => (log.endPoint, 1))
//
.reduceByKey(_ + _)
// top
// def top(num : scala.Int)(implicit ord : scala.Ordering[T])
.top(3)(OrderingUtils.SecondValueOrdering)
/**
//
.map(tuple => (tuple._2, tuple._1))
.sortByKey(false)
.take(3)
.map(tuple => (tuple._2, tuple._1))
*/
println(s"""Top Endpoints : ${topEndpoints.mkString("[",",","]")}""")
// unpersist
accessLogs.unpersist()
/** ====================================================================== */
// step 3: Stop SparkContext
sc.stop()
}
}
2、日誌分析資料清洗類
package com.ibeifeng.bigdata.spark.app.core
/**
* Created by XuanYu on 2016/7/11.
*/
case class ApacheAccessLog(
ipAddress: String ,
clientIdentd: String ,
userId: String ,
dataTime: String ,
method: String ,
endPoint: String ,
protocol: String ,
responseCode: Int ,
contentSize: Long
)
object ApacheAccessLog{
// regex
// 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
val PARTTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
/**
*
* @param log
* @return
*/
def isValidateLogLine(log: String): Boolean = {
// parse log info
val res = PARTTERN.findFirstMatchIn(log)
// invalidate
if(res.isEmpty){
false
}else{
true
}
}
/**
*
* @param log
* @return
*/
def parseLogLine(log: String): ApacheAccessLog = {
// parse log info
val res = PARTTERN.findFirstMatchIn(log)
// invalidate
if(res.isEmpty){
throw new RuntimeException("Cannot parse log line: " + log)
}
// get value
val m = res.get
ApacheAccessLog(//
m.group(1),m.group(2),m.group(3),//
m.group(4),m.group(5),m.group(6),//
m.group(7),//
m.group(8).toInt, //
m.group(9).toLong)
}
}
3、自定義排序
package com.ibeifeng.bigdata.spark.app.core
/**
* Created by XuanYu on 2016/7/11.
*/
object OrderingUtils {
object SecondValueOrdering extends scala.Ordering[(String, Int)]{
/**
*
* @param x
* @param y
* @return
*/
override def compare(x: (String, Int), y: (String, Int)): Int = {
x._2.compare(y._2)
}
}
}
4、SparkSQL測試案例分析
package com.ibeifeng.bigdata.spark.app.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by q on 2016/7/17.
*/
object SQLApplication {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("SQLApplication")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
/**
* ==========================================================================================
*/
//create DataFrame
val df = sqlContext.read.load("/user/ibeifeng/sparklogexample/users.parquet")
df.show()
/**
* ==========================================================================================
*/
sc.stop()
}
}
相關推薦
大資料學習筆記(spark日誌分析案例)
前提:500w條記錄環境下(可以更多,視計算機效能而定),統計每天最熱門的top3板塊。 1、PV和UV 我們要統計的是最熱門的top3板塊,而熱門如果只是簡單地通過頁面瀏覽量(PV)或者使用者瀏覽量(UV)來決定都顯得比較片面,這裡我們綜合這兩者(0.3PV+
Spark日誌分析案例
SparkCore日誌分析主程式 package com.ibeifeng.bigdata.spark.app.core import org.apache.spark.{SparkContext, SparkConf} /** * Created b
Spark日誌分析項目Demo(9)--常規性能調優
array ack 不一定 集合類型 -s 如果 一次 puts cluster 一 分配更多資源 分配更多資源:性能調優的王道,就是增加和分配更多的資源,性能和速度上的提升,是顯而易見的;基本上,在一定範圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark
大資料 hive 15--hive日誌分析案例
1.1 專案來源 本次實踐的目的就在於通過對該技術論壇網站的tomcat access log日誌進行分析,計算該論壇的一些關鍵指標,供運營者進行決策時參考。 PS:開發該系統的目的是為了獲取一些業務相關的指標,這些指標在第三方工具中無法獲得的; 1.2 資料情況 該論壇資料有兩部分
Spark日誌分析專案Demo(4)--RDD使用,使用者行為統計分析
先說說需求,日誌挖掘 (1)隨機抽取100個session,統計時長(session時間),步長(session訪問頁面個數) (2)統計top10熱門品類 (3)統計top10熱門品類上的top10使用者 下面介紹通過日誌分析使用者行為流程 (1)某
spark 日誌分析
spark 執行日誌分析 1.問題:一直在執行的spark 執行時,發現數據應該690多萬,而只有610多萬,控制檯日誌正常。異常出現異常時,在控制檯中日誌正常。怎樣查詢這個錯誤異常日誌 2.處理:由於資料量比較大計算一次需要30多分鐘,因此在執行採用二分進行原因分析,最後
spark SQL學習(綜合案例-日誌分析)
日誌分析 scala> import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.Row scala> val logRDD = sc.textFile("hdfs://
使用Spark進行搜狗日誌分析實例——統計每個小時的搜索量
360安全衛士 返回 用戶 sogo user 順序 contex 讀取文件 key 1 package sogolog 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkCo
使用Spark進行搜狗日誌分析實例——列出搜索不同關鍵詞超過10個的用戶及其搜索的關鍵詞
log collect pre form 用戶 path space img ack 1 package sogolog 2 3 import org.apache.hadoop.io.{LongWritable, Text} 4 import org.apac
Spark學習筆記(19)—— 遊戲日誌分析
1 資料 0 管理員登入 1 首次登入 2 上線 3 下線 1|2016年2月1日,星期一,10:01:08|10.51.4.168|李明剋星|法師|男|1|0|0/800000000 1|2016年2月1日,星期一,10:01:12|10.117.45.20|風道|道士|男
大資料之電話日誌分析callLog案例(四)
一、修改kafka資料在主題中的貯存時間,預設是7天 ------------------------------------------------- [kafka/conf/server.properties] log.retention.hours=1 二、使用hive進行聚
大資料之電話日誌分析callLog案例(三)
一、查詢使用者最近的通話資訊 -------------------------------------------- 1.實現分析 使用ssm可視介面提供查詢串 -- controller連線 hiveserver2 -- 將命令轉化成hsql語句 -- hive繫結hba
大資料之電話日誌分析callLog案例(二)
一、前端實現 -- 按照時間段查詢通話記錄 ----------------------------------------- 1.完善calllog的dao類calllog.class ----------------------------------------------
使用Spark進行搜狗日誌分析例項——列出搜尋不同關鍵詞超過10個的使用者及其搜尋的關鍵詞
1 package sogolog 2 3 import org.apache.hadoop.io.{LongWritable, Text} 4 import org.apache.hadoop.mapred.TextInputFormat 5 import org.apache.spark
以慕課網日誌分析為例 進入大資料 Spark SQL 的世界
第1章 初探大資料本章將介紹為什麼要學習大資料、如何學好大資料、如何快速轉型大資料崗位、本專案實戰課程的內容安排、本專案實戰課程的前置內容介紹、開發環境介紹。同時為大家介紹專案中涉及的Hadoop、Hive相關的知識1-1 導學1-2 -如何學好大資料1-3 -開發環境介紹1-4 -OOTB映象檔案使用介紹1
Spark 日誌錯誤資訊分析及解決方案:log4j、SLF4j
Spark 日誌錯誤資訊 異常資訊:( 解決了好久的問題 ) 1、log4j錯誤類「org.apache.log4j.Appender」被載入,「org.apache.log4j.ConsoleAppender」不能分配給「org.apache.log4j.
Spark專案學習-慕課網日誌分析-days5-Spark on Yarn
1. 概述 (1) 在Spark中,支援4種執行模式: 1)local:開發時使用 2)standalone:是Spark自帶的,如果一個叢集是Standalone的話,那就需要在多臺機器上同時部署Spa
Spark專案學習-慕課網日誌分析-days4-慕課網日誌分析
一 慕課網日誌分析實戰專案 1)使用者行為日誌概述 2)離線資料處理架構(資料如何採集,如何清洗,需求處理,寫入資料庫,視覺化) 3)專案需求 4)功能實現  
Spark專案學習-慕課網日誌分析-days2-Spark SQL
1.Spark SQL 概述 (1)為什麼需要SQL 1)事實上的標準 2)簡單易學 Hive:類似於sql的Hive QL語言 sql==>mapreduce 特點:基於mapreduce 改進:基於tez spar
Spark專案學習-慕課網日誌分析-days3-External Data Source 外部資料來源
1. External Data Source 外部資料來源 1)每一個spark程式以載入資料開始,以輸出資料結束 2)方便快速的從不同的資料來源(json、parquet/rdbms),經過混合處理,在將處理結果以特定的格式,寫回到