1. 程式人生 > >python實現Spark(Hive) SQL中UDF的使用

python實現Spark(Hive) SQL中UDF的使用

相對於使用MapReduce或者Spark Application的方式進行資料分析,使用Hive SQL或Spark SQL能為我們省去不少的程式碼工作量,而Hive SQL或Spark SQL本身內建的各類UDF也為我們的資料處理提供了不少便利的工具,當這些內建的UDF不能滿足於我們的需要時,Hive SQL或Spark SQL還為我們提供了自定義UDF的相關介面,方便我們根據自己的需求進行擴充套件。在Hive的世界裡使用自定義UDF的過程是比較複雜的。我們需要根據需求使用Java語言開發相應的UDF(UDAF、UDTF),然後將UDF的程式碼及其依賴編譯打包為Jar,使用方法有兩種:(1)臨時函式在一次會話(Session)中使用如下語句建立臨時函式:ADD JAR /run/jar/udf_test.jar;CREATE TEMPORARY FUNCTION my_add AS 'com.hive.udf.Add';這種方式有一個缺點:每一次會話過程中使用函式時都需要建立,而且僅在當前會話中有效。(2)永久函式這個特性需要高版本的Hive支援,它的好處是可以將UDF Jar存放至HDFS,函式僅需要建立一次即可以永久使用,如下:CREATE FUNCTION func.ipToLocationBySina AS 'com.sina.dip.hive.function.IPToLocationBySina' USING JAR 'hdfs://dip.cdh5.dev:8020/user/hdfs/func/location.jar';雖然永久函式相對於臨時函式有一定優勢,但Java語言的開發門檻很大程度上妨礙了UDF在實際資料分析過程中使用,畢竟我們的資料分析師多數是以Python、SQL為主要分析工具的,每一次UDF的開發都需要工程師的參與,開發效率與應用效果都是不是很好(可能需要頻繁更新UDF的問題),PySpark的出現確很好地解決了這個問題:它可以非常方便地將一個普通的Python函式註冊為一個UDF。為了說明如何在Spark(Hive) SQL中的使用Python UDF,我們首先模擬一張資料表,為了簡單起見,該表僅有一行一列資料:我們模擬了一張資料表temp_table,該表僅有一列,其中列名稱為col,列型別為字串且不允許包含Null,輸出結果:我們在表temp_table的基礎之上演示UDF的使用方法:首先我們定義一個普通的Python函式:func_string,為了簡單起見它沒有任何引數,僅僅返回一個簡單的字串;然後我們通過HiveContext registerFunction即可以將函式func_string註冊為UDF,registerFunction接收兩個引數:UDF名稱、UDF關聯的Python函式;最後我們可以在Spark(Hive) SQL中使用這個UDF,輸出結果:我們需要注意的是,HiveContext registerFunction實際上有三個引數:
name:UDF名稱;f:UDF關聯的Python函式;returnType:UDF(Python函式)返回值型別,預設為StringType()。上述示例中因為我們的UDF函式的返回值型別為字串,因此使用Hive registerFunction註冊UDF時省略了引數returnType,即returnType預設值為StringType(),如果UDF(Python函式)的返回值型別不為字串,則需要顯式為其指定returnType。我們以型別IntegerType、ArrayType、StructType、MapType為例演示需要顯式指定returnType的情況。(1)IntegerType
(2)ArrayType注意:ArrayType(陣列)必須確保元素型別的一致性,如指定UDF返回值型別為ArrayType(IntegerType()),則函式func_array的返回值型別必須為list或tuple,其中的元素型別必須為int。(3)StructType注意:StructType必須確保函式的返回值型別為tuple,而且使用HiveContext registerFunction註冊UDF時需要依次為其中的元素指定名稱各型別,如上述示例中每一個元素的名稱為first,型別為IntegerType;第二個元素的名稱為second,型別為FloatType;第三個元素的名稱為third,型別為StringType。(4)MapType
注意:MapType必須確保函式的返回值型別為dict,而且所有的“key”應保持型別一致,“value”也就保持型別一致。