1. 程式人生 > 實用技巧 >spark 之 UDF的兩種方式

spark 之 UDF的兩種方式

詳見:https://www.cnblogs.com/itboys/p/9347403.html

1)如果使用spark.sql("")

=> 內部呼叫hive處理,必須先用 spark.udf.register("函式名", 函式) 註冊,SQL 文字中才能以該名稱呼叫

例如:

import org.apache.spark.sql.functions._
/** Min-max normalization UDF: maps `cdata` into [0, 1] relative to the
  * column's observed range.
  *
  * Guard added: when `maxdata == mindata` (constant column) the original
  * expression divided by zero and yielded NaN/Infinity; we return 0.0
  * instead so downstream aggregations stay finite.
  */
val maxandmin = udf {
  (cdata: Double, maxdata: Double, mindata: Double) =>
    val range = maxdata - mindata
    if (range == 0.0) 0.0 else (cdata - mindata) / range
}
// Register under the same name so it is callable from spark.sql("...") text.
spark.udf.register("maxandmin", maxandmin)

/** Builds the normalized user base-info DataFrame.
  *
  * Joins `dwd_users` with `dwd_locale` (null locale mapped to id 0), cross
  * joins the global min/max of age, timezone and members, then applies the
  * registered `maxandmin` UDF to scale each feature into [0, 1].
  * NOTE(review): `maxandmin(col, max, min)` argument order matches the UDF
  * signature (cdata, maxdata, mindata) — do not "fix" it to (min, max).
  * Assumes `calcage` and `maxandmin` are already registered on this session.
  *
  * @param spark active SparkSession with the dwd_events tables available
  * @return DataFrame of per-user normalized features
  */
def getUserbaseinfo(spark: SparkSession) = {
  // The original paste had lost its newlines, leaving the stripMargin `|`
  // markers mid-line (so stripMargin stripped nothing) and fusing
  // `stripMargin` with `spark.sql(sql)` into invalid Scala. Reconstructed
  // with one margin marker per line; SQL content is unchanged.
  val sql =
    s"""select
       |userid,locale,gender,
       |location,
       |maxandmin(cage,max_age,min_age) age,
       |maxandmin(timezone,max_timezone,min_timezone) timezone,
       |maxandmin(members,max_members,min_members) members
       |from
       |(select userid,
       |case when l.locale is null then 0 else l.localeid end locale,
       |gender,location,
       |calcage(birthyear) cage,min_age,max_age,
       |timezone,min_timezone,max_timezone,
       |members,min_members,max_members
       |from dwd_events.dwd_users u
       |left join dwd_events.dwd_locale l
       |on lower(u.locale)=lower(l.locale)
       |cross join (select min(calcage(birthyear)) min_age
       |,max(calcage(birthyear)) max_age,min(timezone) min_timezone,
       |max(timezone) max_timezone, min(members) min_members,max(members) max_members
       |from dwd_events.dwd_users) b ) c""".stripMargin
  spark.sql(sql)
}

2)如果使用DataFrame API

=> 僅使用udf()就行