scala spark-core 綜合練習
阿新 • • 發佈:2018-12-17
package day01 import scala.util.matching.Regex /** * 提供一些操作Apache Log的工具類供SparkCore使用 */ case class ApacheAccessLog( ipAddress: String, // IP地址 clientId: String, // 客戶端唯一識別符號 userId: String, // 使用者唯一識別符號 serverTime: String, // 伺服器時間 method: String, // 請求型別/方式 endpoint: String, // 請求的資源 protocol: String, // 請求的協議名稱 responseCode: Int,// 請求返回值:比如:200、401 contentSize: Long // 返回的結果資料大小 ) object ApacheAccessLog { // Apache日誌的正則 val PARTTERN: Regex = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r /** * 驗證一下輸入的資料是否符合給定的日誌正則,如果符合返回true;否則返回false * * @param line * @return */ def isValidateLogLine(line: String): Boolean = { val options = PARTTERN.findFirstMatchIn(line) if (options.isEmpty) { false } else { true } } /** * 解析輸入的日誌資料 * * @param line * @return */ def parseLogLine(line: String): ApacheAccessLog = { if (!isValidateLogLine(line)) { throw new IllegalArgumentException("引數格式異常") } // 從line中獲取匹配的資料 val options = PARTTERN.findFirstMatchIn(line) // 獲取matcher val matcher = options.get // 構建返回值 ApacheAccessLog( matcher.group(1), // 獲取匹配字串中第一個小括號中的值 matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5), matcher.group(6), matcher.group(7), matcher.group(8).toInt, matcher.group(9).toLong ) } }
package day01 import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 1:求返回結果資料的平均值、最大值、最小值 * 2:統計各個返回值出現的資料次數 * 3:獲取訪問次數超過N次的IP地址 * 增加難度:對IP地址進行限制,黑名單的IP地址不做統計 * 4:獲取訪問次數最多的前N個資源=》TopN操作 */ object Access_log { //110.75.173.48 - - [30/May/2013:23:59:58 +0800] "GET /thread-36410-1-9.html HTTP/1.1" 200 68629 def main(args: Array[String]): Unit = { val topN= 2000 val topK =3 val conf = new SparkConf().setAppName("Access_log").setMaster("local[*]") val sc =new SparkContext(conf) //讀取資料 val lines: RDD[String] = sc.textFile(args(0)) //對資料進行匹配 val logsRDD: RDD[ApacheAccessLog] = lines.filter(line =>ApacheAccessLog.isValidateLogLine(line)).map(line=>{ ApacheAccessLog.parseLogLine(line) }) //因為要對資料進行多次處理,所有對資料進行優化 logsRDD.cache()//將資料儲存到記憶體中 /** * 1:求返回結果資料的平均值、最大值、最小值 */ val contentSizeRdd: RDD[Long] = logsRDD.map(line=>(line.contentSize)) contentSizeRdd.cache() val contentSizeMax: Long = contentSizeRdd.max() val contentSizeMin: Long = contentSizeRdd.min() val contentSizeSum: Double = contentSizeRdd.sum() val contentSizeCount: Long = contentSizeRdd.count() val age: Double =1.0 *contentSizeSum /contentSizeCount println(s"平均值: $age, 最大值:$contentSizeMax, 最小值:$contentSizeMin") //釋放記憶體中的資源 contentSizeRdd.unpersist(true) /** * 2:統計各個返回值出現的資料次數 */ val responseCodeRdd: RDD[(Int, Int)] = logsRDD.map(line=>(line.responseCode,1)).reduceByKey(_+_) println("每個狀態返回值得次數:"+"\t\n"+s"${responseCodeRdd.collect().mkString("\t\n")}") /** * 3:獲取訪問次數超過N次的IP地址 * 增加難度:對IP地址進行限制,黑名單的IP地址不做統計 */ val blackIPs=Array("110.75.173.48", "220.181.89.186") //定義廣播變數,這裡方便點,就寫死了 val broadcasted: Broadcast[Array[String]] = sc.broadcast(blackIPs) //對ip地址進行黑名單過濾 val ipAddressRdd: RDD[(String, Int)] = logsRDD.filter(log => (!broadcasted.value.contains(log.ipAddress))).map(line => (line.ipAddress, 1)) .reduceByKey(_ + _).filter(tp => tp._2 > topN) println(s"ipAddress :${ipAddressRdd.collect().mkString(", ")}") /** * 獲取訪問次數最多的前N個資源=》TopN操作 */ val endpointRDD: Array[(String, Int)] = logsRDD.map(line=>(line.endpoint,1)).reduceByKey(_+_).top(topK)(TupleOrdering) println(s"endpoint : ${endpointRDD.mkString(",")}") //釋放記憶體 logsRDD.unpersist(true) //關閉資源 sc.stop() } } //自定義排序規則 object TupleOrdering extends Ordering[(String,Int)]{ override def compare(x: (String, Int), y: (String, Int)): Int = { x._2.compare(y._2) } }