1. 程式人生 > 其它 >spark和flink中計算topN的方法

spark和flink中計算topN的方法

一、SPARK

  其中top運算元呼叫的takeOrdered運算元,takeOrdered運算元底層使用的是優先佇列(BoundedPriorityQueue),首先進入的是mapPatition,然後使用reduce將每個分割槽資料進行合併

  • sortBy + take
    val url: URL = Launcher.getClass.getClassLoader.getResource("word.dat")
    val lines: RDD[String] = sc.textFile(url.getPath)
    lines
       .flatMap(_.split("\\s+"))
       .map(_ 
    -> 1) .reduceByKey(_ + _) .sortBy(_._2, false) .take(3) .foreach(println)
  • top
    val url: URL = Launcher.getClass.getClassLoader.getResource("word.dat")
    val lines: RDD[String] = sc.textFile(url.getPath)
    lines
       .flatMap(_.split("\\s+"))
       .map(_ -> 1)
       .reduceByKey(_ + _)
       .top(
    3)(Ordering.by(o => o._2)) .foreach(println)
  • takeOrdered
    val url: URL = Launcher.getClass.getClassLoader.getResource("word.dat")
    val lines: RDD[String] = sc.textFile(url.getPath)
    lines
       .flatMap(_.split("\\s+"))
       .map(_ -> 1)
       .reduceByKey(_ + _)
       .takeOrdered(3)(Ordering.by(o => -o._2))
       .foreach(println)

二、FLINK

  1)使用狀態計算

  2)TreeMap

  3)小頂堆