1. 程式人生 > >scala spark-core 綜合練習

scala spark-core 綜合練習

package day01

import scala.util.matching.Regex
/**
  * 提供一些操作Apache Log的工具類供SparkCore使用
  */
case class ApacheAccessLog(
                             ipAddress: String, // IP地址
                             clientId: String, // 客戶端唯一識別符號
                             userId: String, // 使用者唯一識別符號
                             serverTime: String, // 伺服器時間
                             method: String, // 請求型別/方式
                             endpoint: String, // 請求的資源
                             protocol: String, // 請求的協議名稱
                             responseCode: Int,// 請求返回值:比如:200、401
                             contentSize: Long // 返回的結果資料大小

                          )

object ApacheAccessLog {
  // Apache日誌的正則
  val PARTTERN: Regex =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
    * 驗證一下輸入的資料是否符合給定的日誌正則,如果符合返回true;否則返回false
    *
    * @param line
    * @return
    */
  def isValidateLogLine(line: String): Boolean = {
    val options = PARTTERN.findFirstMatchIn(line)

    if (options.isEmpty) {
      false
    } else {
      true
    }
  }

  /**
    * 解析輸入的日誌資料
    *
    * @param line
    * @return
    */
  def parseLogLine(line: String): ApacheAccessLog = {
    if (!isValidateLogLine(line)) {
      throw new IllegalArgumentException("引數格式異常")
    }

    // 從line中獲取匹配的資料
    val options = PARTTERN.findFirstMatchIn(line)

    // 獲取matcher
    val matcher = options.get

    // 構建返回值
    ApacheAccessLog(
      matcher.group(1), // 獲取匹配字串中第一個小括號中的值
      matcher.group(2),
      matcher.group(3),
      matcher.group(4),
      matcher.group(5),
      matcher.group(6),
      matcher.group(7),
      matcher.group(8).toInt,
      matcher.group(9).toLong
    )
  }
}
package day01

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1:求返回結果資料的平均值、最大值、最小值
  * 2:統計各個返回值出現的資料次數
  * 3:獲取訪問次數超過N次的IP地址
  * 增加難度:對IP地址進行限制,黑名單的IP地址不做統計
  * 4:獲取訪問次數最多的前N個資源=》TopN操作
  */
object Access_log {
//110.75.173.48 - - [30/May/2013:23:59:58 +0800] "GET /thread-36410-1-9.html HTTP/1.1" 200 68629
def main(args: Array[String]): Unit = {
  val topN= 2000
  val topK =3
  val conf = new SparkConf().setAppName("Access_log").setMaster("local[*]")
  val sc =new SparkContext(conf)

  //讀取資料
  val lines: RDD[String] = sc.textFile(args(0))
  //對資料進行匹配
  val logsRDD: RDD[ApacheAccessLog] = lines.filter(line =>ApacheAccessLog.isValidateLogLine(line)).map(line=>{
    ApacheAccessLog.parseLogLine(line)
  })
    //因為要對資料進行多次處理,所有對資料進行優化
  logsRDD.cache()//將資料儲存到記憶體中
  /**
    * 1:求返回結果資料的平均值、最大值、最小值
    */
  val contentSizeRdd: RDD[Long] = logsRDD.map(line=>(line.contentSize))
  contentSizeRdd.cache()
  val contentSizeMax: Long = contentSizeRdd.max()
  val contentSizeMin: Long = contentSizeRdd.min()
  val contentSizeSum: Double = contentSizeRdd.sum()
  val contentSizeCount: Long = contentSizeRdd.count()
  val age: Double =1.0 *contentSizeSum /contentSizeCount

  println(s"平均值: $age, 最大值:$contentSizeMax, 最小值:$contentSizeMin")
  //釋放記憶體中的資源
  contentSizeRdd.unpersist(true)

  /**
    * 2:統計各個返回值出現的資料次數
    */
  val responseCodeRdd: RDD[(Int, Int)] = logsRDD.map(line=>(line.responseCode,1)).reduceByKey(_+_)
  println("每個狀態返回值得次數:"+"\t\n"+s"${responseCodeRdd.collect().mkString("\t\n")}")

  /**
    * 3:獲取訪問次數超過N次的IP地址
    *  增加難度:對IP地址進行限制,黑名單的IP地址不做統計
    */

    val blackIPs=Array("110.75.173.48", "220.181.89.186")
    //定義廣播變數,這裡方便點,就寫死了
    val broadcasted: Broadcast[Array[String]] = sc.broadcast(blackIPs)
  //對ip地址進行黑名單過濾
  val ipAddressRdd: RDD[(String, Int)] = logsRDD.filter(log => (!broadcasted.value.contains(log.ipAddress))).map(line => (line.ipAddress, 1))
    .reduceByKey(_ + _).filter(tp => tp._2 > topN)
    println(s"ipAddress :${ipAddressRdd.collect().mkString(", ")}")

  /**
    * 獲取訪問次數最多的前N個資源=》TopN操作
    */
  val endpointRDD: Array[(String, Int)] = logsRDD.map(line=>(line.endpoint,1)).reduceByKey(_+_).top(topK)(TupleOrdering)
  println(s"endpoint : ${endpointRDD.mkString(",")}")

  //釋放記憶體
  logsRDD.unpersist(true)
  //關閉資源
  sc.stop()
}
}

//自定義排序規則
object TupleOrdering extends Ordering[(String,Int)]{
  override def compare(x: (String, Int), y: (String, Int)): Int = {
    x._2.compare(y._2)
  }
}