Spark 2.4.0 Programming Guide -- Spark SQL UDF and UDAF
阿新 · Published: 2018-12-26
More resources
Video
- Spark 2.4.0 Programming Guide -- Spark SQL UDF and UDAF (Bilibili video): https://www.bilibili.com/video/av38193405/?p=4
Documentation
Prerequisites
- Java installed (Java 1.8.0_191 is used here)
- Scala installed (Scala 2.11.12 is used here)
- Hadoop installed (Hadoop 3.1.1 is used here)
- Spark installed (Spark 2.4.0 is used here)
Skills covered
- Understanding UDFs (user-defined functions)
- Understanding UDAFs (user-defined aggregate functions)
- UDF example (computing the character length of a column)
- UDF example (converting a column to upper case)
- UDAF example (counting the total number of rows)
- UDAF example (computing the total salary)
- UDAF example (computing the average salary)
- UDAF example (computing the maximum salary per gender)
- Official docs: http://spark.apache.org/docs/2.4.0/sql-getting-started.html#aggregations
UDF
User-defined functions (UDFs) are a key feature of most SQL environments and are used to extend a system's built-in functionality. UDFs let developers expose new functions in a higher-level language such as SQL while abstracting away their lower-level language implementations. Apache Spark is no exception, and offers a variety of options for integrating UDFs into Spark SQL workflows.
- User-defined functions (UDFs)
- A UDF transforms a single row of a table, producing one output value for each input row
## Examples
- Obtaining a SparkSession
BaseSparkSession
package com.opensource.bigdata.spark.standalone.base

import org.apache.spark.sql.SparkSession

/**
* Obtains a SparkSession.
* Usage: extend BaseSparkSession, then
* local mode: val spark = sparkSession(true)
* cluster mode: val spark = sparkSession()
*/
class BaseSparkSession {
var appName = "sparkSession"
var master = "spark://standalone.com:7077" //本地模式:local standalone:spark://master:7077
def sparkSession(): SparkSession = {
val spark = SparkSession.builder
.master(master)
.appName(appName)
.config("spark.eventLog.enabled","true")
.config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
.config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
.getOrCreate()
spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
//import spark.implicits._
spark
}
def sparkSession(isLocal:Boolean = false): SparkSession = {
if(isLocal){
master = "local"
val spark = SparkSession.builder
.master(master)
.appName(appName)
.getOrCreate()
//spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
//import spark.implicits._
spark
}else{
val spark = SparkSession.builder
.master(master)
.appName(appName)
.config("spark.eventLog.enabled","true")
.config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
.config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
.getOrCreate()
// spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
//import spark.implicits._
spark
}
}
/**
* Returns the path of the current project
* @return
*/
def getProjectPath:String=System.getProperty("user.dir")
}
UDF (computing column character length)
- For each row in the dataset, compute the character length of a specific column
- The function is invoked by name directly in the column list of a spark.sql query
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession

/**
* Custom anonymous function
* Purpose: a function that returns the character length of a column value
*/
object Run extends BaseSparkSession{
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
spark.udf.register("strLength",(str: String) => str.length())
ds.createOrReplaceTempView("employees")
spark.sql("select name,salary,strLength(name) as name_Length from employees").show()
// +-------+------+-----------+
// | name|salary|name_Length|
// +-------+------+-----------+
// |Michael| 3000| 7|
// | Andy| 4500| 4|
// | Justin| 3500| 6|
// | Berta| 4000| 5|
// +-------+------+-----------+
spark.stop()
}
}
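A UDF registered with spark.udf.register can also be called from the DataFrame API. A minimal sketch, assuming the same `spark` session and `ds` Dataset as in the example above:

```scala
// Sketch: reuse the SQL-registered "strLength" UDF from the DataFrame API via callUDF
import org.apache.spark.sql.functions.callUDF
import spark.implicits._

ds.select($"name", callUDF("strLength", $"name").as("name_length")).show()
// Expected to produce the same lengths as the spark.sql query above.
```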
UDF (converting a column to upper case)
- For each row in the dataset, convert a specific column to upper case
- Called through dataSet.withColumn on a Column
- The Column is transformed by a udf function
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
/**
* Custom anonymous function
* Purpose: a function that converts a column value to upper case
*/
object Run extends BaseSparkSession{
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
import org.apache.spark.sql.functions._
val strUpper = udf((str: String) => str.toUpperCase())
import spark.implicits._
ds.withColumn("toUpperCase", strUpper($"name")).show
// +-------+------+-----------+
// | name|salary|toUpperCase|
// +-------+------+-----------+
// |Michael| 3000| MICHAEL|
// | Andy| 4500| ANDY|
// | Justin| 3500| JUSTIN|
// | Berta| 4000| BERTA|
// +-------+------+-----------+
spark.stop()
}
}
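A note on nulls: str.toUpperCase would throw a NullPointerException for null values in the column. A sketch of a null-safe variant (assuming the input column may contain nulls; Spark maps None back to SQL NULL):

```scala
// Null-safe sketch of the same upper-case UDF
import org.apache.spark.sql.functions.udf

val strUpperSafe = udf((str: String) => Option(str).map(_.toUpperCase))
// ds.withColumn("toUpperCase", strUpperSafe($"name")).show()
```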
UDAF
- UDAF (user-defined aggregate function)
- Processes multiple rows at once and returns a single result; typically used together with a GROUP BY clause (like COUNT or SUM)
count
- Count the total number of rows
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_01_spark_udaf_count
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
* initialize(): sets the initial buffer values, i.e. the values used when there is no data
* update(): processes each input row and accumulates its contribution into the buffer
* merge(): merges the buffers of the individual partitions
* evaluate(): computes the final result from the merged buffer
*/
object Run2 extends BaseSparkSession{
object CustomerCount extends UserDefinedAggregateFunction{
// Data type of the aggregate function's input argument
def inputSchema: StructType = {
StructType(StructField("inputColumn",StringType) :: Nil)
}
// Data type of the intermediate buffer
def bufferSchema: StructType = {
StructType(StructField("sum",LongType) :: Nil)
}
// Data type of the final result
def dataType: DataType = LongType
def deterministic: Boolean = true
// Initial value; returned if the Dataset contains no data
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
}
/**
* Called for every row of the current partition; the running result is stored in buffer
* @param buffer aggregation buffer for the current partition
* @param input current input row
*/
def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)){
buffer(0) = buffer.getLong(0) + 1
}
}
/**
* Merges the buffers of two partitions
* @param buffer1 buffer of partition one
* @param buffer2 buffer of partition two
*/
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) // row count
}
// Compute the final result
def evaluate(buffer: Row): Long = buffer.getLong(0)
}
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
spark.udf.register("customerCount",CustomerCount)
val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
df.createOrReplaceTempView("employees")
val sqlDF = spark.sql("select customerCount(name) as average_salary from employees ")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
sqlDF.show()
// +--------------+
// |customer_count|
// +--------------+
// |             4|
// +--------------+
spark.stop()
}
}
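As a quick cross-check (sketch, assuming the same `spark` session and `employees` view as above), the built-in count aggregate should return the same value:

```scala
// Built-in equivalent of CustomerCount
spark.sql("select count(name) as customer_count from employees").show()
// or via the DataFrame API:
// df.agg(org.apache.spark.sql.functions.count("name")).show()
```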
sum
- Compute the total salary
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_03_spark_udaf_sum
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
* initialize(): sets the initial buffer values, i.e. the values used when there is no data
* update(): processes each input row and accumulates its contribution into the buffer
* merge(): merges the buffers of the individual partitions
* evaluate(): computes the final result from the merged buffer
*/
object Run extends BaseSparkSession{
object CustomerSum extends UserDefinedAggregateFunction{
// Data type of the aggregate function's input argument
def inputSchema: StructType = {
StructType(StructField("inputColumn",LongType) :: Nil)
}
// Data type of the intermediate buffer
def bufferSchema: StructType = {
StructType(StructField("sum",LongType) :: Nil)
}
// Data type of the final result
def dataType: DataType = LongType
def deterministic: Boolean = true
// Initial value; returned if the Dataset contains no data
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
}
/**
* Called for every row of the current partition; the running sum is stored in buffer
* @param buffer aggregation buffer for the current partition
* @param input current input row
*/
def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)){
buffer(0) = buffer.getLong(0) + input.getLong(0)
}
}
/**
* Merges the buffers of two partitions
* @param buffer1 buffer of partition one
* @param buffer2 buffer of partition two
*/
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
}
// Compute the final result
def evaluate(buffer: Row): Long = buffer.getLong(0)
}
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
spark.udf.register("customerSum",CustomerSum)
val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
df.createOrReplaceTempView("employees")
val sqlDF = spark.sql("select customerSum(salary) as average_salary from employees ")
df.show
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
sqlDF.show()
// +------------+
// |total_salary|
// +------------+
// |       15000|
// +------------+
spark.stop()
}
}
average
- Compute the average salary
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_04_spark_udaf_average
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
object Run extends BaseSparkSession{
object MyAverage extends UserDefinedAggregateFunction{
// Data type of the aggregate function's input argument
def inputSchema: StructType = {
StructType(StructField("inputColumn",LongType) :: Nil)
}
// Data type of the intermediate buffer
def bufferSchema: StructType = {
StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
}
// Data type of the final result
def dataType: DataType = DoubleType
def deterministic: Boolean = true
// Initial values; returned if the Dataset contains no data
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
/**
* Called for every row of the current partition; the running sum and count are stored in buffer
* @param buffer aggregation buffer for the current partition
* @param input current input row
*/
def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)){
buffer(0) = buffer.getLong(0) + input.getLong(0) // salary
buffer(1) = buffer.getLong(1) + 1 // count
}
}
/**
* Merges the buffers of two partitions
* @param buffer1 buffer of partition one
* @param buffer2 buffer of partition two
*/
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
buffer1(0) = buffer1.getLong(0) +buffer2.getLong(0) // salary
buffer1(1) = buffer1.getLong(1) +buffer2.getLong(1) // count
}
// Compute the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
spark.udf.register("MyAverage",MyAverage)
val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
df.createOrReplaceTempView("employees")
val sqlDF = spark.sql("select MyAverage(salary) as average_salary from employees ")
sqlDF.show()
spark.stop()
}
}
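The Spark 2.4.0 documentation also shows a type-safe variant of this average based on org.apache.spark.sql.expressions.Aggregator, which works on strongly typed Datasets. A sketch adapted to the employees.json data used above:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverageTyped extends Aggregator[Employee, Average, Double] {
  // Zero value for this aggregation; must satisfy b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine one input row with the running buffer
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate buffers
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Compute the final result from the merged buffer
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Encoders for the intermediate and output types
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage (assumes the same spark session and employees.json path as above):
// import spark.implicits._
// val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json").as[Employee]
// ds.select(MyAverageTyped.toColumn.name("average_salary")).show()
```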
group by max
- Compute the highest salary per gender group
- i.e. the highest salary among men and among women, respectively
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_05_spark_udaf_groupby_max
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
* initialize(): sets the initial buffer values, i.e. the values used when there is no data
* update(): processes each input row and accumulates its contribution into the buffer
* merge(): merges the buffers of the individual partitions
* evaluate(): computes the final result from the merged buffer
*/
object Run extends BaseSparkSession{
object CustomerMax extends UserDefinedAggregateFunction{
// Data type of the aggregate function's input argument
def inputSchema: StructType = {
StructType(StructField("inputColumn",LongType) :: Nil)
}
// Data type of the intermediate buffer
def bufferSchema: StructType = {
StructType(StructField("max",LongType) :: Nil)
}
// Data type of the final result
def dataType: DataType = LongType
def deterministic: Boolean = true
// Initial value; returned if the Dataset contains no data (starting from 0 assumes non-negative salaries)
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
}
/**
* Called for every row of the current partition; the running maximum is stored in buffer
* @param buffer aggregation buffer for the current partition
* @param input current input row
*/
def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)){
if(input.getLong(0) > buffer.getLong(0)){
buffer(0) = input.getLong(0)
}
}
}
/**
* Merges the buffers of two partitions by keeping the larger value
* @param buffer1 buffer of partition one
* @param buffer2 buffer of partition two
*/
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit={
if( buffer2.getLong(0) > buffer1.getLong(0)) buffer1(0) = buffer2.getLong(0)
}
// Compute the final result
def evaluate(buffer: Row): Long = buffer.getLong(0)
}
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
spark.udf.register("customerMax",CustomerMax)
val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employeesCN.json")
df.createOrReplaceTempView("employees")
val sqlDF = spark.sql("select gender,customerMax(salary) as average_salary from employees group by gender ")
df.show
// +------+----+------+
// |gender|name|salary|
// +------+----+------+
// | 男|小王| 30000|
// | 女|小麗| 50000|
// | 男|小軍| 80000|
// | 女|小李| 90000|
// +------+----+------+
sqlDF.show()
// +------+----------+
// |gender|max_salary|
// +------+----------+
// |     男|     80000|
// |     女|     90000|
// +------+----------+
spark.stop()
}
}
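As a quick cross-check (sketch, assuming the same `spark` session and `employees` view as above), the built-in max aggregate should give the same per-gender result:

```scala
// Built-in equivalent of customerMax with GROUP BY
spark.sql("select gender, max(salary) as max_salary from employees group by gender").show()
// or via the DataFrame API:
// df.groupBy("gender").agg(org.apache.spark.sql.functions.max("salary")).show()
```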
Other supported extensions
- Spark SQL supports integrating existing Hive UDF, UDAF and UDTF implementations (written in Java or Scala).
- UDTFs (user-defined table functions) can return multiple columns and multiple rows.
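As a rough sketch of how a Hive function can be registered (the class name and jar path below are hypothetical placeholders, not from this article), a Hive-enabled session can load it with CREATE TEMPORARY FUNCTION:

```scala
// Sketch: 'com.example.hive.MyUpperUDF' and the jar path are placeholder values
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hiveUdfExample")
  .enableHiveSupport() // needed so Spark SQL can load Hive UDF/UDAF/UDTF classes
  .getOrCreate()

// Register the Hive UDF packaged in a jar, then call it from Spark SQL
spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.hive.MyUpperUDF' " +
  "USING JAR 'hdfs:///path/to/hive-udfs.jar'")
spark.sql("SELECT my_upper(name) FROM employees").show()
```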
end