SparkSQL中UDF和UDAF
阿新 • • 發佈:2019-01-27
UDF: User Defined Function,使用者自定義的函式,函式的輸入是一條具體的資料記錄,實現上講就是普通的Scala函式;
UDAF:User Defined Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在聚合操作的基礎上進行自定義操作;
8、按照模板實現UDAF
UDAF:User Defined Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在聚合操作的基礎上進行自定義操作;
實質上講,例如說UDF會被Spark SQL中的Catalyst封裝成為Expression,最終會通過eval方法來計算輸入的資料Row(此處的Row和DataFrame中的Row沒有任何關係)
不說太多直接上程式碼
1、建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊
val conf = new SparkConf() //建立SparkConf物件 conf.setAppName("SparkSQLUDFUDAF") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱 //conf.setMaster("spark://DaShuJu-040:7077") //此時,程式在Spark叢集 conf.setMaster("local[4]")
2、建立SparkContext物件和SQLContext物件
//建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) //構建SQL上下文
3、模擬實際使用的資料
<span style="font-family: Arial, Helvetica, sans-serif;">val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")</span>
4、基於提供的資料建立DataFrame
val bigDataRDD = sc.parallelize(bigData)
val bigDataRDDRow = bigDataRDD.map(item => Row(item))
val structType = StructType(Array(StructField("word", StringType, true)))
val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType)
5、註冊成為臨時表
bigDataDF.registerTempTable("bigDataTable")
6、通過SQLContext註冊UDF,在Scala 2.10.x版本UDF函式最多可以接受22個輸入引數
sqlContext.udf.register("computeLength", (input: String) => input.length)
//直接在SQL語句中使用UDF,就像使用SQL自動的內部函式一樣
sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show
7、通過SQLContext註冊UDAF
sqlContext.udf.register("wordCount", new MyUDAF)
sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length" +
" from bigDataTable group by word").show()
8、按照模板實現UDAF
class MyUDAF extends UserDefinedAggregateFunction {
// 該方法指定具體輸入資料的型別
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
//在進行聚合操作的時候所要處理的資料的結果的型別
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
//指定UDAF函式計算後返回的結果型別
override def dataType: DataType = IntegerType
// 確保一致性 一般用true
override def deterministic: Boolean = true
//在Aggregate之前每組資料的初始化結果
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0}
// 在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
// 本地的聚合操作,相當於Hadoop MapReduce模型中的Combiner
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + 1
}
//最後在分散式節點進行Local Reduce完成後需要進行全域性級別的Merge操作
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}
//返回UDAF最後的計算結果
override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}