1. 程式人生 > >Spark日誌分析案例

Spark日誌分析案例

  1. SparkCore日誌分析主程式
package com.ibeifeng.bigdata.spark.app.core
 import org.apache.spark.{SparkContext, SparkConf}
/**
 * Created by XuanYu on 2016/7/11.
 */
object LogAnalyzer {
  def main(args: Array[String]) {
    // step 1: Create SparkConf Object
    val conf = new SparkConf()
      .setAppName("LogAnalyzer Application"
) .setMaster("local[2]") // step 2: Create SparkContext Object val sc = new SparkContext(conf) /** ====================================================================== */ val logFile = "file:///D:/access_log"//"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/apache.access.log" // 1: input val
accessLogs = sc.textFile(logFile) /** *filter log data */ .filter(ApacheAccessLog.isValidateLogLine) /** parse log */ .map(line => ApacheAccessLog.parseLogLine(line)) /** *cache ,如果某個RDD反覆的被使用,可以考慮將其進行cache */ accessLogs.cache() println("Access Logs Count : "
+ accessLogs.count()) // ===================================================================== /** *The average, min, and max content size of responses returned from the server. */ val contentSiezes = accessLogs.map(log => log.contentSize) // RDD[String] /** *cache contentSizes */ contentSiezes.cache() // compute val avgContentSize = contentSiezes.reduce(_ + _) / contentSiezes.count() val minContentSize = contentSiezes.min() val maxContextSize = contentSiezes.max() // unpersist contentSiezes.unpersist() println("Content Size Avg: %s, Min: %s , Max: %s".format( avgContentSize, minContentSize, maxContextSize )) // ===================================================================== /** A count of response code's returned. */ val responseCodeToCount = accessLogs // .map(log => (log.responseCode, 1)) // .reduceByKey(_ + _) // .take(5) println(s"""Response Code Count: ${responseCodeToCount.mkString("[",",","]")}""") // ===================================================================== /** *All IPAddresses that have accessed this server more than N times. */ val ipAddresses = accessLogs // .map(log => (log.ipAddress, 1)) // .reduceByKey(_ + _) // // .filter(tuple => (tuple._2 > 10 )) // .map(tuple => tuple._1) // .take(3) println(s"""IPAddress : \${ipAddresses.mkString("[",",","]")}""") // ===================================================================== /** *The top endpoints requested by count. */ val topEndpoints = accessLogs // .map(log => (log.endPoint, 1)) // .reduceByKey(_ + _) // top // def top(num : scala.Int)(implicit ord : scala.Ordering[T]) .top(3)(OrderingUtils.SecondValueOrdering) /** // .map(tuple => (tuple._2, tuple._1)) .sortByKey(false) .take(3) .map(tuple => (tuple._2, tuple._1)) */ println(s"""Top Endpoints : ${topEndpoints.mkString("[",",","]")}""") // unpersist accessLogs.unpersist() /** ====================================================================== */ // step 3: Stop SparkContext sc.stop() } }

2、日誌分析資料清洗類

package com.ibeifeng.bigdata.spark.app.core

/**
 * Created by XuanYu on 2016/7/11.
 */
case class ApacheAccessLog(
  ipAddress: String ,
  clientIdentd: String ,
  userId: String ,
  dataTime: String ,
  method: String ,
  endPoint: String ,
  protocol: String ,
  responseCode: Int ,
  contentSize: Long
)

object ApacheAccessLog{

  // regex
  // 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
  val PARTTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
   *
   * @param log
   * @return
   */
  def isValidateLogLine(log: String): Boolean = {
    // parse log info
    val res = PARTTERN.findFirstMatchIn(log)

    // invalidate
    if(res.isEmpty){
      false
    }else{
      true
    }
  }
  /**
   *
   * @param log
   * @return
   */
  def parseLogLine(log: String): ApacheAccessLog = {
    // parse log info
    val res = PARTTERN.findFirstMatchIn(log)

    // invalidate
    if(res.isEmpty){
      throw new RuntimeException("Cannot parse log line: " + log)
    }

    // get value
    val m = res.get

    ApacheAccessLog(//
      m.group(1),m.group(2),m.group(3),//
      m.group(4),m.group(5),m.group(6),//
      m.group(7),//
      m.group(8).toInt, //
      m.group(9).toLong)
  }
}

3、自定義排序

package com.ibeifeng.bigdata.spark.app.core
/**
 * Created by XuanYu on 2016/7/11.
 */
object OrderingUtils {
  object SecondValueOrdering extends scala.Ordering[(String, Int)]{
    /**
     *
     * @param x
     * @param y
     * @return
     */
    override def compare(x: (String, Int), y: (String, Int)): Int = {
      x._2.compare(y._2)
    }
  }
}

4、SparkSQL測試案例分析

package com.ibeifeng.bigdata.spark.app.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Created by q on 2016/7/17.
 */
object  SQLApplication {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SQLApplication")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    /**
     * ==========================================================================================
     */
    //create DataFrame
    val df = sqlContext.read.load("/user/ibeifeng/sparklogexample/users.parquet")
    df.show()
    /**
     * ==========================================================================================
     */
    sc.stop()
  }
}

相關推薦

大資料學習筆記(spark日誌分析案例)

前提:500w條記錄環境下(可以更多,視計算機效能而定),統計每天最熱門的top3板塊。 1、PV和UV 我們要統計的是最熱門的top3板塊,而熱門如果只是簡單地通過頁面瀏覽量(PV)或者使用者瀏覽量(UV)來決定都顯得比較片面,這裡我們綜合這兩者(0.3PV+

Spark日誌分析案例

SparkCore日誌分析主程式 package com.ibeifeng.bigdata.spark.app.core import org.apache.spark.{SparkContext, SparkConf} /** * Created b

Spark日誌分析項目Demo(9)--常規性能調優

array ack 不一定 集合類型 -s 如果 一次 puts cluster 一 分配更多資源 分配更多資源:性能調優的王道,就是增加和分配更多的資源,性能和速度上的提升,是顯而易見的;基本上,在一定範圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark

大資料 hive 15--hive日誌分析案例

1.1 專案來源 本次實踐的目的就在於通過對該技術論壇網站的tomcat access log日誌進行分析,計算該論壇的一些關鍵指標,供運營者進行決策時參考。 PS:開發該系統的目的是為了獲取一些業務相關的指標,這些指標在第三方工具中無法獲得的; 1.2 資料情況 該論壇資料有兩部分

Spark日誌分析專案Demo(4)--RDD使用,使用者行為統計分析

先說說需求,日誌挖掘 (1)隨機抽取100個session,統計時長(session時間),步長(session訪問頁面個數) (2)統計top10熱門品類 (3)統計top10熱門品類上的top10使用者 下面介紹通過日誌分析使用者行為流程 (1)某

spark 日誌分析

spark 執行日誌分析 1.問題:一直在執行的spark 執行時,發現數據應該690多萬,而只有610多萬,控制檯日誌正常。異常出現異常時,在控制檯中日誌正常。怎樣查詢這個錯誤異常日誌 2.處理:由於資料量比較大計算一次需要30多分鐘,因此在執行採用二分進行原因分析,最後

spark SQL學習(綜合案例-日誌分析

  日誌分析 scala> import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.Row scala> val logRDD = sc.textFile("hdfs://

使用Spark進行搜狗日誌分析實例——統計每個小時的搜索量

360安全衛士 返回 用戶 sogo user 順序 contex 讀取文件 key 1 package sogolog 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkCo

使用Spark進行搜狗日誌分析實例——列出搜索不同關鍵詞超過10個的用戶及其搜索的關鍵詞

log collect pre form 用戶 path space img ack 1 package sogolog 2 3 import org.apache.hadoop.io.{LongWritable, Text} 4 import org.apac

Spark學習筆記(19)—— 遊戲日誌分析

1 資料 0 管理員登入 1 首次登入 2 上線 3 下線 1|2016年2月1日,星期一,10:01:08|10.51.4.168|李明剋星|法師|男|1|0|0/800000000 1|2016年2月1日,星期一,10:01:12|10.117.45.20|風道|道士|男

大資料之電話日誌分析callLog案例(四)

一、修改kafka資料在主題中的貯存時間,預設是7天 ------------------------------------------------- [kafka/conf/server.properties] log.retention.hours=1 二、使用hive進行聚

大資料之電話日誌分析callLog案例(三)

一、查詢使用者最近的通話資訊 -------------------------------------------- 1.實現分析 使用ssm可視介面提供查詢串 -- controller連線 hiveserver2 -- 將命令轉化成hsql語句 -- hive繫結hba

大資料之電話日誌分析callLog案例(二)

一、前端實現 -- 按照時間段查詢通話記錄 ----------------------------------------- 1.完善calllog的dao類calllog.class ----------------------------------------------

使用Spark進行搜狗日誌分析例項——列出搜尋不同關鍵詞超過10個的使用者及其搜尋的關鍵詞

1 package sogolog 2 3 import org.apache.hadoop.io.{LongWritable, Text} 4 import org.apache.hadoop.mapred.TextInputFormat 5 import org.apache.spark

以慕課網日誌分析為例 進入大資料 Spark SQL 的世界

第1章 初探大資料本章將介紹為什麼要學習大資料、如何學好大資料、如何快速轉型大資料崗位、本專案實戰課程的內容安排、本專案實戰課程的前置內容介紹、開發環境介紹。同時為大家介紹專案中涉及的Hadoop、Hive相關的知識1-1 導學1-2 -如何學好大資料1-3 -開發環境介紹1-4 -OOTB映象檔案使用介紹1

Spark 日誌錯誤資訊分析及解決方案:log4j、SLF4j

Spark 日誌錯誤資訊 異常資訊:( 解決了好久的問題 ) 1、log4j錯誤類「org.apache.log4j.Appender」被載入,「org.apache.log4j.ConsoleAppender」不能分配給「org.apache.log4j.

Spark專案學習-慕課網日誌分析-days5-Spark on Yarn

1. 概述     (1) 在Spark中,支援4種執行模式:     1)local:開發時使用     2)standalone:是Spark自帶的,如果一個叢集是Standalone的話,那就需要在多臺機器上同時部署Spa

Spark專案學習-慕課網日誌分析-days4-慕課網日誌分析

一 慕課網日誌分析實戰專案     1)使用者行為日誌概述     2)離線資料處理架構(資料如何採集,如何清洗,需求處理,寫入資料庫,視覺化)     3)專案需求     4)功能實現  

Spark專案學習-慕課網日誌分析-days2-Spark SQL

1.Spark SQL 概述 (1)為什麼需要SQL 1)事實上的標準 2)簡單易學 Hive:類似於sql的Hive QL語言 sql==>mapreduce    特點:基於mapreduce    改進:基於tez spar

Spark專案學習-慕課網日誌分析-days3-External Data Source 外部資料來源

1. External Data Source 外部資料來源     1)每一個spark程式以載入資料開始,以輸出資料結束     2)方便快速的從不同的資料來源(json、parquet/rdbms),經過混合處理,在將處理結果以特定的格式,寫回到