
Finding the most popular teachers with Spark

Input file contents:

http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi

1. Find the most popular teachers overall, without grouping by subject (essentially a WordCount):

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

 
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse each URL line into a ((subject, teacher), 1) pair
    val word: RDD[((String, String), Int)] = lines.map(line => {
      // the teacher name is the last path segment of the URL
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      // the subject is the first label of the host, e.g. "bigdata" in bigdata.edu360.cn
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate the occurrence counts per (subject, teacher)
    val reduced = word.reduceByKey(_ + _)

    // Sort by count in descending order and take the top 3 overall
    val sorted = reduced.sortBy(_._2, false)
    val list = sorted.take(3)
    println(list.toBuffer)

    // Release resources
    sc.stop()

  }
}



// Output:
//ArrayBuffer(((bigdata,lisi),15), ((javaee,laoyang),9), ((javaee,zhaoliu),6))
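
Note that the key above still includes the subject; the "top 3" is simply taken across all (subject, teacher) pairs. If the subject should be ignored entirely, a minimal variation keyed on the teacher alone could look like the sketch below (it reuses the `lines` RDD from the example above; `topTeachers` is a name introduced here for illustration):

// Sketch: rank by teacher only, dropping the subject from the key
val topTeachers = lines
  .map(line => (line.substring(line.lastIndexOf("/") + 1), 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(3)
println(topTeachers.toBuffer)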

2. Find the most popular teachers in each subject

  Group by subject, then sort within each group.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse each URL line into a ((subject, teacher), 1) pair
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate the counts per (subject, teacher)
    val reduced = word.reduceByKey(_ + _)

    // Group by subject
    val grouped = reduced.groupBy(_._1._1)
    // Sort within each group and take the top 2. The sorting happens on a plain Scala
    // collection inside a task, not on an RDD: groupBy produces a CompactBuffer (an
    // Iterable) per subject, and toList pulls all of that subject's records into memory
    // at once before sorting, so with a very large subject this can cause an OOM
    // (out-of-memory) error.
    val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))

    // Print the result
    sorted.collect().foreach(println)

    // Release resources
    sc.stop()
  }
}

// Output:

//  (javaee,List(((javaee,laoyang),9), ((javaee,zhaoliu),6)))
//  (python,List(((python,laoli),3), ((python,laoliu),1)))
//  (bigdata,List(((bigdata,lisi),15), ((bigdata,wangwu),6)))
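
The OOM risk described above can be avoided by never materializing a whole subject's records: keep only a bounded top-2 list while aggregating. A minimal sketch, assuming the same `reduced` RDD as above (`topPerSubject` is a name introduced here):

// Sketch: keep at most 2 records per subject while aggregating, so no full
// per-subject collection ever has to fit in memory at once
val topPerSubject = reduced
  .map { case ((subject, teacher), count) => (subject, (teacher, count)) }
  .aggregateByKey(List.empty[(String, Int)])(
    (acc, v) => (v :: acc).sortBy(-_._2).take(2), // fold one record into a partial top-2
    (a, b) => (a ++ b).sortBy(-_._2).take(2)      // merge two partial top-2 lists
  )
topPerSubject.collect().foreach(println)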

3. Find the two most popular teachers in each subject

Create an array of subjects, filter each subject's data into its own RDD, then sort and take the top values.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Top 2 most popular teachers per subject, computed with one Spark job per subject
  */
object FavTeacher2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val subjects = Array("javaee","bigdata","python")
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)

    // Instead of sorting globally or grouping, filter each subject's data into its own RDD
    for (sb <- subjects) {
      // Keep only this subject's records
      val filtered = reduced.filter(_._1._1 == sb)
      // Sort within this single subject (RDD sorting uses memory + disk) and take the top 2
      val sorted = filtered.sortBy(_._2, false).take(2)
      println(sorted.toBuffer)
    }

    // Release resources
    sc.stop()
  }
}

// Output:
// ArrayBuffer(((javaee,laoyang),9), ((javaee,zhaoliu),6))
// ArrayBuffer(((bigdata,lisi),15), ((bigdata,wangwu),6))
// ArrayBuffer(((python,laoli),3), ((python,laoliu),1))
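
One drawback of this approach is that the subjects are hard-coded and each loop iteration launches a separate Spark job over the same `reduced` RDD. A small sketch of the usual mitigation, discovering the subjects from the data (as example 4 below does) and caching `reduced` so it is not recomputed for every subject:

// Sketch: derive the subject list from the data instead of hard-coding it,
// and cache the aggregated RDD since the loop runs one job per subject over it
val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
reduced.cache()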

4. Find the two most popular teachers in each subject

  Use a custom partitioner so that all teachers of the same subject end up in the same partition.

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object FavTeacher3 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    // Parse the data
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/") + 1)
      val url = new URL(line).getHost
      val subject = url.substring(0, url.indexOf("."))
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = word.reduceByKey(_ + _)

    // First determine the subjects: compute the distinct subject names on the cluster
    // and collect them back to the driver (one partition will be created per subject)
    val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

    // Build a custom partitioner that partitions by subject, so all records of the
    // same subject are shuffled into the same partition
    val subjectPartitioner = new SubjectPartitioner(subjects)

    // Repartition the aggregated RDD with the custom partitioner
    val sbPartitioner = reduced.partitionBy(subjectPartitioner)
    // After repartitioning, sort within each partition
    // (toList again loads a whole partition into memory, so the OOM caveat from example 2 applies)
    val sorted = sbPartitioner.mapPartitions(_.toList.sortBy(-_._2).iterator)
    sorted.saveAsTextFile("d:/data/out/teacher")

    // Release resources
    sc.stop()
  }
}

// Custom partitioner
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
  // Built when the partitioner is constructed (runs in the constructor body).
  // Maps a subject (String) to its partition index (Int).
  val rules = new mutable.HashMap[String, Int]()

  var index = 0
  // Initialize the rules: one partition index per subject
  for (sb <- subjects) {
    rules += ((sb, index))
    index += 1
  }

  // One partition per subject
  override def numPartitions: Int = subjects.length

  // Compute the partition number for a given key
  override def getPartition(key: Any): Int = {
    // The key is a (subject, teacher) tuple, so cast it and look up the subject
    val tuple = key.asInstanceOf[(String, String)]
    val subject = tuple._1
    rules(subject)
  }
}
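
The mapPartitions step above pulls each whole partition into a List before sorting, so the OOM caveat from example 2 still applies, and it also writes every teacher rather than only the top two asked for in this section. A bounded alternative is sketched below, assuming the `sbPartitioner` RDD and `SubjectPartitioner` above are used unchanged (the name `top2` and the output path are introduced here for illustration):

// Sketch: keep a bounded top-2 per partition instead of materializing the whole
// partition with toList. Because SubjectPartitioner puts exactly one subject in
// each partition, this yields the two most popular teachers per subject.
val top2 = sbPartitioner.mapPartitions { it =>
  // order by count descending; tie-break on the key so distinct records never compare equal
  implicit val ord: Ordering[((String, String), Int)] =
    Ordering.by(t => (-t._2, t._1._1, t._1._2))
  val set = mutable.TreeSet.empty[((String, String), Int)]
  it.foreach { t =>
    set += t
    if (set.size > 2) set -= set.last // drop the record with the smallest count
  }
  set.iterator
}
top2.saveAsTextFile("d:/data/out/teacher_top2")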