1. 程式人生 > >Spark-分組TOPN演算法

Spark-分組TOPN演算法

該資料集都為:“http://bigdata.edu360.cn/laozhou” 這個樣子,需求是找到每個學科下最受歡迎的老師
方法一:

/**
  * 資料放到scala 集合裡面進行操作
  */
object GroupFavTeacher_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local")
    val sc = new SparkContext(conf)
    //指定以後從哪裡讀取資料
    val lines = sc.textFile(args(0))
    //整理資料
    val subject_teacherAndOne = lines.map(line => {
      //val line = "http://bigdata.edu360.cn/laoyu"
      val conSubject = line.split("/")(2)
      val subject =conSubject.split("[.]")(0)
      val teacher = line.split("/")(3)
      ((subject, teacher),1)
    })
    //聚合,將學科和老師聯合當做key
    val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
    //分組排序(按學科進行分組)
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
    //經過分組後,一個分割槽內可能有多個學科的資料,一個學科就是一個迭代器
    //將每一個組拿出來進行操作v
    //為什麼可以呼叫sacla的sortby方法呢?因為一個學科的資料已經在一個scala集合裡面了
    val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3))
    val resulted = sorted.collect()
    //收集
    println(resulted.toBuffer)
    sc.stop()
  }
}

  • 方法二:
/**
  * 先過濾再統計計算
  */
object GroupFavTeacher_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupFavTeacher_2").setMaster("local")
    val sc = new SparkContext(conf)
    //val topN = args(1).toInt
    val subjects = Array("bigdata", "javaee", "php")
    //指定以後從哪裡讀取資料
    val lines = sc.textFile(args(0))
    //整理資料
    val subject_teacherAndOne = lines.map(line => {
      //val line = "http://bigdata.edu360.cn/laozhang"
      val conSubject = line.split("/")(2)
      val subject =conSubject.split("[.]")(0)
      val teacher = line.split("/")(3)
      ((subject, teacher),1)
    })
    //聚合,將學科和老師聯合當做key
    val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
    //分組排序(按學科進行分組)
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

    //scala的集合排序是在記憶體中進行的,但是記憶體有可能不夠用
    //可以呼叫RDD的sortby方法,記憶體+磁碟進行排序
    for(sb <- subjects) {
      //該RDD中對應的資料僅有一個學科的資料(因為過濾過了)
      val filted = grouped.filter(_._1 == sb)
      //現在呼叫的是RDD的sortBy方法,(take是一個action,會觸發任務提交)
      val filtedResulted = filted.sortBy(_._2, false).take(3)
      println(filtedResulted.toBuffer)
    }
    sc.stop()
  }
}
  • 方法三:
/**
*自定義分割槽器(k,v)
*
/
object GroupFavTeacher3 {

  def main(args: Array[String]): Unit = {

    val topN = args(1).toInt

    val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]")
    val sc = new SparkContext(conf)

    //指定以後從哪裡讀取資料
    val lines: RDD[String] = sc.textFile(args(0))
    //整理資料
    val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      val teacher = line.substring(index + 1)
      val httpHost = line.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })

    //聚合,將學科和老師聯合當做key
    val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)

    //計算有多少學科
    val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

    //自定義一個分割槽器,並且按照指定的分割槽器進行分割槽
    val sbPatitioner = new SubjectParitioner(subjects);

    //partitionBy按照指定的分割槽規則進行分割槽
    //呼叫partitionBy時RDD的Key是(String, String)
    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)

    //一次拿出一個分割槽(可以操作一個分割槽中的資料了)
    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
      //將迭代器轉換成list,然後排序,在轉換成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).iterator
    })

    //
    val r: Array[((String, String), Int)] = sorted.collect()
    println(r.toBuffer)
    sc.stop()
  }
}

//自定義分割槽器
//思想就是把每一種給一個編號,每一個編號下的分割槽都是該學科的資料
class SubjectParitioner(sbs: Array[String]) extends Partitioner {

  //相當於主構造器(new的時候會執行一次)
  //用於存放規則的一個map
  val rules = new mutable.HashMap[String, Int]()
  var i = 0
  for(sb <- sbs) {
    //rules(sb) = i
    rules.put(sb, i)
    i += 1
  }

  //返回分割槽的數量(下一個RDD有多少分割槽)
  override def numPartitions: Int = sbs.length

  //根據傳入的key計算分割槽標號
  //key是一個元組(String, String)
  override def getPartition(key: Any): Int = {
    //獲取學科名稱
    val subject = key.asInstanceOf[(String, String)]._1
    //根據規則計算分割槽編號
    rules(subject)
  }
}