pyspark系列--自定義函式
阿新 • • 發佈:2019-01-10
自定義函式
1. 概覽
自定義函式的重點在於定義返回值型別的資料格式,其資料型別基本都是從from pyspark.sql.types import *
匯入,常用的包括:
- StructType():結構體
- StructField():結構體中的元素
- LongType():長整型
- StringType():字串
- IntegerType():一般整型
- FloatType():浮點型
還記得我們在前面的建立spark.dataframe
提到的例子嗎,dataframe的資料結構定義如下:
from pyspark.sql.types import StructType, StructField, LongType, StringType
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])
2. 自定義函式的一般流程
# 1.建立普通的python函式
def toDate(s):
return str(s)+'-'
# 2.註冊自定義函式
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# 根據python的返回值型別定義好spark對應的資料型別
# python函式中返回的是string,對應的pyspark是StringType
toDateUDF=udf(toDate, StringType())
# 使用自定義函式
df1.withColumn('color',toDateUDF('color')).show()
3. 簡單的自定義函式
最簡單的就是通過lambda函式,不需要定義返回值型別,可以直接使用
# 建立udf自定義函式
from pyspark.sql import functions
concat_func = functions.udf(lambda name,age:name+'_'+str(age)) # 簡單的連線兩個字串
# 應用自定義函式
concat_df = spark_df.withColumn("name_age",concat_func(final_data.name, final_data.age))
concat_df.show()
4. 自定義函式進階
對於簡單的自定義函式,用上面的方法是足夠了,但是很多時候,我們面對的問題比較複雜,需要對返回值的格式要求比較特別,就需要定義複雜的返回值型別了。