1. 程式人生 > >pyspark系列--自定義函式

pyspark系列--自定義函式

自定義函式

1. 概覽

自定義函式的重點在於定義返回值型別的資料格式,其資料型別基本都是從from pyspark.sql.types import * 匯入,常用的包括:
- StructType():結構體
- StructField():結構體中的元素
- LongType():長整型
- StringType():字串
- IntegerType():一般整型
- FloatType():浮點型

還記得我們在前面的建立spark.dataframe提到的例子嗎,dataframe的資料結構定義如下:

from pyspark.sql.types import
StructType, StructField, LongType, StringType schema = StructType([ StructField("id", LongType(), True), StructField("name", StringType(), True), StructField("age", LongType(), True), StructField("eyeColor", StringType(), True) ])

2. 自定義函式的一般流程

# 1.建立普通的python函式
def toDate(s):
return str(s)+'-' # 2.註冊自定義函式 from pyspark.sql.functions import udf from pyspark.sql.types import StringType # 根據python的返回值型別定義好spark對應的資料型別 # python函式中返回的是string,對應的pyspark是StringType toDateUDF=udf(toDate, StringType()) # 使用自定義函式 df1.withColumn('color',toDateUDF('color')).show()

3. 簡單的自定義函式

最簡單的就是通過lambda函式,不需要定義返回值型別,可以直接使用

# 建立udf自定義函式
from pyspark.sql import functions
concat_func = functions.udf(lambda name,age:name+'_'+str(age))  # 簡單的連線兩個字串

# 應用自定義函式
concat_df = spark_df.withColumn("name_age",concat_func(final_data.name, final_data.age))
concat_df.show()

4. 自定義函式進階

對於簡單的自定義函式,用上面的方法是足夠了,但是很多時候,我們面對的問題比較複雜,需要對返回值的格式要求比較特別,就需要定義複雜的返回值型別了。