1. 程式人生 > >Spark Core 和 Spark SQL 實現分組取Top N(基於scala)

Spark Core 和 Spark SQL 實現分組取Top N(基於scala)

分組取Top N在日常需求中很多見:

  1. 每個班級分數前三名同學的名字以及分數
  2. 各省指標數量前三的市的名字 

等等需求,主要思想就是在某一個分割槽(班級,省)中取出該分割槽Top N的資料

測試資料格式:


如上圖,欄位含義為,班級,學生姓名,分數

下面我們通過一個Demo來實現各班級分數前三的學生姓名以及分數

1、通過Spark core 實現:

//讀取測試資料儲存為rdd

val rddtext = sc.textFile("file:///C:/Users/chunyuhe/Desktop/test1.txt")

//將資料轉化為Row形式(為下面Spark SQL 生成臨時表用)

val rowrdd = rddtext.map(m => Row(m.split(" ")(0), m.split(" ")(1), m.split(" ")(2).toInt))
/**
* spark core 實現分組取topN
*/

val classrdd = rddtext.map(x => {

                       //取到各資料並賦值給變數

val classname = x.split(" ")(0)
val name = x.split(" ")(1)

val grade = x.split(" ")(2)

                       //生成一個便於計算的元組

(classname, (name, grade.toInt))

}).groupByKey

                //根據key聚合分組得到


classrdd.foreach(x => println(x))
classrdd.map(m => {

val classname = m._1

                       //如上圖將各班級同學資訊轉化為Array陣列並且安裝成績進行降序排列取前三

                        val top3 = m._2.toArray.sortWith(_._2 > _._2).take(3)

(classname, top3)
}).foreach(m => {
println(m._1 + "班級的前3名的成績為")
m._2.foreach(x => {
println(x)
})

})

輸出結果為:


2、通過Spark sql 實現:

//隱式轉換

import spark.implicits._

import spark.sql

                //生成資料表表結構

val schema = StructType(mutable.ArraySeq(
StructField("classname", StringType, true),
StructField("name", StringType, true),

StructField("grade", IntegerType, true)))

               //將表結構和表資料組合生成表

val tablerow = spark.createDataFrame(rowrdd, schema)

               //將生成的df轉換為一個表並且命名

tablerow.createTempView("testtable")
val tetrow = sql("select * from testtable")

//tetrow.show()

               //運用Spark sql 開窗函式進行計算

               PARTITION BY 為需要開窗欄位

               ORDER BY 為需要排序欄位

val resultrow = sql("""
      select a.classname,a.name,a.grade from (select classname,name,grade,row_number() OVER (PARTITION BY classname ORDER BY grade DESC) rank from testtable) as a where a.rank <= 3
      """)

resultrow.show()

輸出結果:


本文結束,希望能幫到大家,也希望大家批評指正!