
UDFs in Spark


package big.data.analyse.udfudaf

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by zhen on 2018/11/25.
  */
object SparkUdfUdaf {
  def isAdult(age: Int) = {
    if (age > 18) {
      true
    } else {
      false
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("UdfUdaf")
      .master("local[2]")
      .getOrCreate()
    val userData = Array(
      "2015,11,www.baidu.com",
      "2016,14,www.google.com",
      "2017,13,www.apache.com",
      "2015,21,www.spark.com",
      "2016,32,www.hadoop.com",
      "2017,18,www.solr.com",
      "2017,14,www.hive.com"
    )
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext
    // convert the sample data to an RDD
    val userDataRDD = sc.parallelize(userData)
    val userDataType = userDataRDD.map(line => {
      val Array(age, id, url) = line.split(",")
      Row(age, id.toInt, url)
    })
    val structTypes = StructType(Array(
      StructField("age", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true)
    ))
    // convert the RDD to a DataFrame
    val userDataFrame = sqlContext.createDataFrame(userDataType, structTypes)
    // register a temporary view
    userDataFrame.createOrReplaceTempView("udf")
    // register a UDF (approach 1: inline anonymous function)
    spark.udf.register("getLength", (str: String) => str.length)
    // register a UDF (approach 2: an existing method)
    spark.udf.register("isAdult", isAdult _)
    // run SQL that uses both UDFs
    val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.id)"
    val result = sqlContext.sql(sql)
    // prints each matching Row (fine in local mode, where executors share the driver's console)
    result.foreach(println(_))
  }
}
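
For comparison, the same predicates can also be expressed through the DataFrame API instead of registering the functions for use in SQL text. The sketch below is a minimal example assuming the userDataFrame built above; the names lenUdf and adultUdf are illustrative and not part of the original code.

import org.apache.spark.sql.functions.{col, udf}

// wrap plain Scala functions as column-level UDFs
val lenUdf   = udf((str: String) => str.length)
val adultUdf = udf((age: Int) => age > 18)

// equivalent to: select * from udf where getLength(url)=13 and isAdult(id)
val filtered = userDataFrame.filter(lenUdf(col("url")) === 13 && adultUdf(col("id")))
filtered.show()

Registration via spark.udf.register is only needed when the function has to be callable from an SQL string; a UDF created with functions.udf can be applied directly to Columns.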

Result:

[result screenshot] Given the sample data above, the query should return only the row [2015,21,www.spark.com]: its URL is 13 characters long and its id (21) passes the isAdult check.
