1. 程式人生 > >Spark Streaming--開窗函式over()

Spark Streaming--開窗函式over()

over()開窗函式

  • 在使用聚合函式後,會將多行變成一行,而開窗函式是將一行變成多行;

  • 並且在使用聚合函式後,如果要顯示其他的列必須將列加入到group by中,而使用開窗函式後,可以不使用group by,直接將所有資訊顯示出來。

  • 開窗函式適用於在每一行的最後一列新增聚合函式的結果。

開窗函式作用

  • 為每條資料顯示聚合資訊.(聚合函式() over())

  • 為每條資料提供分組的聚合函式結果(聚合函式() over(partition by 欄位) as 別名) 
    --按照欄位分組,分組後進行計算

  • 與排名函式一起使用(row number() over(order by 欄位) as 別名)

常用分析函式:(最常用的應該是1.2.3 的排序)

  • row_number() over(partition by ... order by ...)

  • rank() over(partition by ... order by ...)

  • dense_rank() over(partition by ... order by ...)

  • count() over(partition by ... order by ...)

  • max() over(partition by ... order by ...)

  • min() over(partition by ... order by ...)

  • sum() over(partition by ... order by ...)

  • avg() over(partition by ... order by ...)

  • first_value() over(partition by ... order by ...)

  • last_value() over(partition by ... order by ...)

  • lag() over(partition by ... order by ...)

  • lead() over(partition by ... order by ...) 
    lag 和lead 可以獲取結果集中,按一定排序所排列的當前行的上下相鄰若干offset 的某個行的某個列(不用結果集的自關聯); 
    lag ,lead 分別是向前,向後; 
    lag 和lead 有三個引數,第一個引數是列名,第二個引數是偏移的offset,第三個引數是超出記錄視窗時的預設值

簡單的開窗函式範例:

package com.jiangnan.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * 姓名  班級  分數
  * 101 1 90
  * 120 1 88
  */
case class Score(name:String,clazz:Int,score: Int)
object OpenFunction extends App {
  val conf = new SparkConf().setAppName("").setMaster("local[2]")
  val spark = SparkSession.builder().config(conf).getOrCreate()

  import spark.implicits._
  //生產資料
  val score = spark.sparkContext.makeRDD(List(
    Score("1001",1,90),
    Score("1002",2,95),
    Score("1003",3,90),
    Score("1004",1,92),
    Score("1005",1,88),
    Score("1005",2,66),
    Score("1005",3,90),
    Score("1005",1,93),
    Score("1005",3,99),
    Score("1005",2,90),
    Score("1005",2,92),
    Score("1005",3,90)
  )).toDF("name","clazz","score")
  println("---------原始資料----------")
  score.show()
  println("---------求每個班級最高成績的學生---原始做法------")
  //建立一個表
  score.createOrReplaceTempView("score")
  println("-----------分組後求出每個班最高分數表結構----------")
  spark.sql("select clazz,max(score) max from score group by clazz").show()
  println("-----------原始做法,最終結果--------------")
  spark.sql("select a.name,b.clazz,b.max from score a,(select clazz,max(score) max from score group by clazz) b where a.score = b.max").show()
  println("------------使用開窗函式後的運算過程-------------")
  spark.sql("select name,clazz,score,rank() over(partition by clazz order by score desc) rank from score").show()

  println("------------使用開窗函式後最終結果-------------")
  spark.sql("select * from (select name,clazz,score,rank() over(partition by clazz order by score desc) rank from score) r where r.rank = 1").show()
  spark.stop()