Spark: Finding the Most Popular Teacher
阿新 • Published: 2018-12-26
File contents:
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi
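Each line is a URL: the first label of the host is the subject, and the path is the teacher. A quick sketch of the parsing logic used throughout this post, applied to one line:

// For "http://bigdata.edu360.cn/zhangsan":
val line = "http://bigdata.edu360.cn/zhangsan"
val teacher = line.substring(line.lastIndexOf("/") + 1)  // "zhangsan"
val host = new java.net.URL(line).getHost                // "bigdata.edu360.cn"
val subject = host.substring(0, host.indexOf("."))       // "bigdata"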
1. Find the most popular teachers, without considering the subject category (essentially a wordCount)
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse each line into ((subject, teacher), 1)
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate the counts per (subject, teacher)
    val reduced = word.reduceByKey(_ + _)
    // Sort by count, descending
    val sorted = reduced.sortBy(_._2, false)
    val list = sorted.take(3)
    println(list.toBuffer)
  }
}
// Output:
// ArrayBuffer(((bigdata,lisi),15), ((javaee,laoyang),9), ((javaee,zhaoliu),6))
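An aside (not from the original post): sortBy performs a full distributed sort only to keep three records. RDD's built-in top does a bounded per-partition selection instead. A minimal sketch, reusing the reduced RDD from above:

// top(3) keeps only a few candidates per partition and merges them on the
// driver, avoiding the full shuffle-sort that sortBy performs.
val top3 = reduced.top(3)(Ordering.by(_._2))
println(top3.toBuffer)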
2. Find the most popular teacher in each subject
Group by subject, then sort within each group
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    val reduced = word.reduceByKey(_ + _)
    // Group by subject
    val grouped = reduced.groupBy(_._1._1)
    // Sort within each group and take the top two. The sorting here is done
    // with Scala collections, not by Spark: groupBy yields a CompactBuffer
    // (an iterable that extends Seq), which is converted to a List for sorting.
    // Because toList pulls an entire group's data into memory at once, a very
    // large group will cause an OOM (OutOfMemoryError).
    val sorted: RDD[(String, List[((String, String), Int)])] =
      grouped.mapValues(_.toList.sortBy(-_._2).take(2))
    // Print the results
    sorted.collect().foreach(println)
    // Release resources
    sc.stop()
  }
}
// Output:
// (javaee,List(((javaee,laoyang),9), ((javaee,zhaoliu),6)))
// (python,List(((python,laoli),3), ((python,laoliu),1)))
// (bigdata,List(((bigdata,lisi),15), ((bigdata,wangwu),6)))
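The OOM risk called out in the comments can be avoided by maintaining only the top two entries per subject while aggregating, instead of materializing whole groups. A minimal sketch (my addition, not the original post's code), reusing the reduced RDD from above:

// Keep at most two (teacher, count) entries per subject during aggregation,
// so no complete group is ever held in memory.
val top2PerSubject = reduced
  .map { case ((subject, teacher), count) => (subject, (teacher, count)) }
  .aggregateByKey(List.empty[(String, Int)])(
    (acc, v) => (v :: acc).sortBy(-_._2).take(2), // fold one value into a partition's running top-2
    (a, b) => (a ++ b).sortBy(-_._2).take(2)      // merge two partitions' top-2 lists
  )
top2PerSubject.collect().foreach(println)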
3. Find the top two most popular teachers in each subject
Create an array of subjects, filter each subject's data into its own RDD, then sort and take the top entries (see the caching note after the code)
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * Top two most popular teachers per subject.
 */
object FavTeacher2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val subjects = Array("javaee", "bigdata", "python")
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)
    // Filter each subject's data into its own RDD
    for (sb <- subjects) {
      val filtered = reduced.filter(_._1._1 == sb)
      // Sort within a single subject (RDD sorting uses memory + disk)
      val sorted = filtered.sortBy(_._2, false).take(2)
      println(sorted.toBuffer)
    }
  }
}
// Output:
// ArrayBuffer(((javaee,laoyang),9), ((javaee,zhaoliu),6))
// ArrayBuffer(((bigdata,lisi),15), ((bigdata,wangwu),6))
// ArrayBuffer(((python,laoli),3), ((python,laoliu),1))
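One caveat with this approach (my note, not the original author's): each loop iteration launches a separate job, and without caching Spark recomputes reduced from the raw file every time. A small adjustment, assuming the same variables as above:

// Cache the aggregated RDD so the per-subject filter jobs reuse it instead of
// re-reading and re-parsing teacher.log on every loop iteration.
val reduced = word.reduceByKey(_ + _).cache()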
4. Find the top two most popular teachers in each subject
Use a custom partitioner so that all teachers of the same subject end up in the same partition
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

object FavTeacher3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)
    // First compute the subject names on the cluster, then collect them back
    // to the driver (as many subjects as there are, that many partitions)
    val subject: Array[String] = reduced.map(_._1._1).distinct().collect()
    // Build a custom partitioner keyed on subject, so all records for the
    // same subject are shuffled into the same partition
    val subjectPartitioner = new SubjectPartitioner(subject)
    // Repartition the aggregated RDD with the custom partitioner
    val sbPartitioner = reduced.partitionBy(subjectPartitioner)
    // After repartitioning, sort within each partition (one partition = one
    // subject) and keep the top two. Note: toList materializes the whole
    // partition in memory, the same OOM caveat as in approach 2.
    val sorted =
      sbPartitioner.mapPartitions(_.toList.sortBy(-_._2).take(2).iterator)
    sorted.saveAsTextFile("d:/data/out/teacher")
    sc.stop()
  }
}

// Custom partitioner
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
  // Runs when the partitioner is new-ed, i.e. in the constructor body.
  // String is the partition key (subject); Int is that subject's partition index
  val rules = new mutable.HashMap[String, Int]()
  var index = 0
  // Initialize the rules: one partition index per subject
  for (sb <- subjects) {
    rules += ((sb, index))
    index += 1
  }
  // As many subjects as there are, that many partitions
  override def numPartitions: Int = subjects.length
  // Compute the partition number for the given key
  override def getPartition(key: Any): Int = {
    // The key is a (subject, teacher) tuple; cast it and look up the subject
    val tuple = key.asInstanceOf[(String, String)]
    val subject = tuple._1
    rules(subject)
  }
}