Spark之UDF
阿新 • • 發佈:2018-11-25
gist spl name parallel reg dataframe rdd build etl
package big.data.analyse.udfudaf

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by zhen on 2018/11/25.
  *
  * Small Spark SQL example: builds a DataFrame from in-memory CSV-like strings,
  * registers two scalar UDFs (one from a lambda, one from a method) and uses
  * both in the WHERE clause of a SQL query.
  */
object SparkUdfUdaf {

  /**
    * Returns whether the given age counts as adult.
    *
    * The comparison is strict, so exactly 18 yields `false`.
    *
    * @param age the age to test
    * @return `true` when `age` is greater than 18, `false` otherwise
    */
  def isAdult(age: Int): Boolean = age > 18

  /**
    * Entry point: runs the UDF example on a local two-thread Spark master and
    * prints every matching row to standard output.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UdfUdaf")
      .master("local[2]")
      .getOrCreate()

    try {
      // Each record has three comma-separated fields.
      val userData = Array(
        "2015,11,www.baidu.com",
        "2016,14,www.google.com",
        "2017,13,www.apache.com",
        "2015,21,www.spark.com",
        "2016,32,www.hadoop.com",
        "2017,18,www.solr.com",
        "2017,14,www.hive.com"
      )

      // Convert the local array to an RDD.
      val userDataRDD = spark.sparkContext.parallelize(userData)

      // Split every record into a Row matching the schema below; the second
      // field is parsed as an Int.
      val userRows = userDataRDD.map { line =>
        val Array(age, id, url) = line.split(",")
        Row(age, id.toInt, url)
      }

      // NOTE(review): the first field (e.g. "2015") is stored under the column
      // name "age" and the second under "id", while isAdult is applied to "id".
      // The column names look mismatched with the data; confirm before renaming.
      val schema = StructType(Array(
        StructField("age", StringType, true),
        StructField("id", IntegerType, true),
        StructField("url", StringType, true)
      ))

      // Convert the RDD to a DataFrame.
      val userDataFrame = spark.createDataFrame(userRows, schema)

      // Register a temporary view so the data can be queried with SQL.
      userDataFrame.createOrReplaceTempView("udf")

      // Register a UDF (approach 1): anonymous function.
      spark.udf.register("getLength", (str: String) => str.length)

      // Register a UDF (approach 2): existing method converted to a function.
      spark.udf.register("isAdult", isAdult _)

      // Run the SQL query.
      val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.id)"
      val result = spark.sql(sql)

      // Collect to the driver before printing: Dataset.foreach runs on the
      // executors, so its output reaches the driver console only in local mode.
      result.collect().foreach(println)
    } finally {
      // Release the session's resources even if the job fails.
      spark.stop()
    }
  }
}
結果:
Spark之UDF