Spark-分組TOPN演算法
阿新 • • 發佈:2019-01-10
該資料集都為:“http://bigdata.edu360.cn/laozhou” 這個樣子,需求是找到每個學科下最受歡迎的老師
方法一:
/** * 資料放到scala 集合裡面進行操作 */ object GroupFavTeacher_1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FavTeacher").setMaster("local") val sc = new SparkContext(conf) //指定以後從哪裡讀取資料 val lines = sc.textFile(args(0)) //整理資料 val subject_teacherAndOne = lines.map(line => { //val line = "http://bigdata.edu360.cn/laoyu" val conSubject = line.split("/")(2) val subject =conSubject.split("[.]")(0) val teacher = line.split("/")(3) ((subject, teacher),1) }) //聚合,將學科和老師聯合當做key val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_) //分組排序(按學科進行分組) val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) //經過分組後,一個分割槽內可能有多個學科的資料,一個學科就是一個迭代器 //將每一個組拿出來進行操作v //為什麼可以呼叫sacla的sortby方法呢?因為一個學科的資料已經在一個scala集合裡面了 val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3)) val resulted = sorted.collect() //收集 println(resulted.toBuffer) sc.stop() } }
- 方法二:
/** * 先過濾再統計計算 */ object GroupFavTeacher_2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("GroupFavTeacher_2").setMaster("local") val sc = new SparkContext(conf) //val topN = args(1).toInt val subjects = Array("bigdata", "javaee", "php") //指定以後從哪裡讀取資料 val lines = sc.textFile(args(0)) //整理資料 val subject_teacherAndOne = lines.map(line => { //val line = "http://bigdata.edu360.cn/laozhang" val conSubject = line.split("/")(2) val subject =conSubject.split("[.]")(0) val teacher = line.split("/")(3) ((subject, teacher),1) }) //聚合,將學科和老師聯合當做key val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_) //分組排序(按學科進行分組) val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) //scala的集合排序是在記憶體中進行的,但是記憶體有可能不夠用 //可以呼叫RDD的sortby方法,記憶體+磁碟進行排序 for(sb <- subjects) { //該RDD中對應的資料僅有一個學科的資料(因為過濾過了) val filted = grouped.filter(_._1 == sb) //現在呼叫的是RDD的sortBy方法,(take是一個action,會觸發任務提交) val filtedResulted = filted.sortBy(_._2, false).take(3) println(filtedResulted.toBuffer) } sc.stop() } }
- 方法三:
/** *自定義分割槽器(k,v) * / object GroupFavTeacher3 { def main(args: Array[String]): Unit = { val topN = args(1).toInt val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]") val sc = new SparkContext(conf) //指定以後從哪裡讀取資料 val lines: RDD[String] = sc.textFile(args(0)) //整理資料 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //聚合,將學科和老師聯合當做key val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_) //計算有多少學科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() //自定義一個分割槽器,並且按照指定的分割槽器進行分割槽 val sbPatitioner = new SubjectParitioner(subjects); //partitionBy按照指定的分割槽規則進行分割槽 //呼叫partitionBy時RDD的Key是(String, String) val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner) //一次拿出一個分割槽(可以操作一個分割槽中的資料了) val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => { //將迭代器轉換成list,然後排序,在轉換成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) // val r: Array[((String, String), Int)] = sorted.collect() println(r.toBuffer) sc.stop() } } //自定義分割槽器 //思想就是把每一種給一個編號,每一個編號下的分割槽都是該學科的資料 class SubjectParitioner(sbs: Array[String]) extends Partitioner { //相當於主構造器(new的時候會執行一次) //用於存放規則的一個map val rules = new mutable.HashMap[String, Int]() var i = 0 for(sb <- sbs) { //rules(sb) = i rules.put(sb, i) i += 1 } //返回分割槽的數量(下一個RDD有多少分割槽) override def numPartitions: Int = sbs.length //根據傳入的key計算分割槽標號 //key是一個元組(String, String) override def getPartition(key: Any): Int = { //獲取學科名稱 val subject = key.asInstanceOf[(String, String)]._1 //根據規則計算分割槽編號 rules(subject) } }