1. 程式人生 > >SparkSQL中UDF和UDAF

SparkSQL中UDF和UDAF

UDF: User Defined Function,使用者自定義的函式,函式的輸入是一條具體的資料記錄,實現上講就是普通的Scala函式;
UDAF:User Defined Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在聚合操作的基礎上進行自定義操作;

實質上講,例如說UDF會被Spark SQL中的Catalyst封裝成為Expression,最終會通過eval方法來計算輸入的資料Row(此處的Row和DataFrame中的Row沒有任何關係)

不說太多直接上程式碼

1、建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊

val conf = new SparkConf() //建立SparkConf物件
conf.setAppName("SparkSQLUDFUDAF") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱
//conf.setMaster("spark://DaShuJu-040:7077") //此時,程式在Spark叢集
conf.setMaster("local[4]")

2、建立SparkContext物件和SQLContext物件

//建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) //構建SQL上下文

3、模擬實際使用的資料

<span style="font-family: Arial, Helvetica, sans-serif;">val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")</span>

4、基於提供的資料建立DataFrame

val bigDataRDD =  sc.parallelize(bigData)
val bigDataRDDRow = bigDataRDD.map(item => Row(item))
val structType = StructType(Array(StructField("word", StringType, true)))
val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType)

5、註冊成為臨時表

bigDataDF.registerTempTable("bigDataTable")

6、通過SQLContext註冊UDF,在Scala 2.10.x版本UDF函式最多可以接受22個輸入引數

sqlContext.udf.register("computeLength", (input: String) => input.length)
//直接在SQL語句中使用UDF,就像使用SQL自動的內部函式一樣
sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show


7、通過SQLContext註冊UDAF

sqlContext.udf.register("wordCount", new MyUDAF)
sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length" +
" from bigDataTable group by word").show()

8、按照模板實現UDAF
class  MyUDAF extends UserDefinedAggregateFunction {
  // 該方法指定具體輸入資料的型別
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
  //在進行聚合操作的時候所要處理的資料的結果的型別
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
  //指定UDAF函式計算後返回的結果型別
  override def dataType: DataType = IntegerType
  // 確保一致性 一般用true
  override def deterministic: Boolean = true
  //在Aggregate之前每組資料的初始化結果
  override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0}
  // 在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
  // 本地的聚合操作,相當於Hadoop MapReduce模型中的Combiner
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }
  //最後在分散式節點進行Local Reduce完成後需要進行全域性級別的Merge操作
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }
  //返回UDAF最後的計算結果
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}