pyspark中的自定義函式
阿新 • • 發佈:2018-12-12
由於目前的pyspark不支援dataset,因此只能用dataframe+udf或者rdd的方式來對列進行資料處理。這裡介紹一下udf,udf註冊有兩種方法,一種是呼叫register方法,一種是呼叫udf函式。兩者都能在withColumn和apply中使用。兩種方法的區別是:udf註冊後可以使用dataframe的api,而register註冊後可以使用spark sql。
1. Spark DF 和 Pandas DF
首先要區分spark dataframe和pandas dataframe。pandas df會將所有資料儲存在driver node上,一定要慎用。
spark df 與 pandas df 相互轉化效能優化,需要開啟配置,預設為關閉。
spark.sql.execution.arrow.enabled true
import numpy as np
import pandas as pd
//初始化 pandas DF
pdf = pd.DataFrame(np.random.rand(100000, 3))
// pdf -> sdf
%time df = spark.createDataFrame(pdf)
// sdf -> pdf
%time result_pdf = df.select("*").toPandas()
2. Spark UDF
UDF是使用者自定義函式的縮寫。下面展示一下幾種UDF使用方法:
from pyspark.sql.functions import udf df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v")) # 使用register註冊的方法 def f1(v): return v+1 f1_udf = spark.udf.register('f1',f1) df.withColumn('v2',f1_udf(df.v)).show() # 使用udf回撥的方法 @udf('double') #這裡也可以簡單的使用@udf def f2(v): return v+1 df.withColumn('v2',f2(df.v)).show() # 使用udf函式註冊 def f3(v): return v+1 f3_udf = udf(f3,'double') df.withColumn('v2',f3_udf(df.v)).show() # 使用register註冊的可以使用spark sql df.createOrReplaceTempView("df") spark.sql('select v,f1(v) v2 from df').show() # 使用udf註冊的可以使用dataframe api df.select(['v',f2('v').alias('v2')]).show()
3. Pandas UDF
Pandas UDF構建在 Apache Arrow 之上。Apache Arrow是一個跨平臺的在記憶體中以列式儲存的資料層,減少了大量java和python之間序列化和反序列化的工作。
from pyspark.sql.functions import pandas_udf, PandasUDFType df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v")) #使用回撥函式@pandas_udf(schema,return type)來註冊 @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def normalize(pdf): v = pdf.v return pdf.assign(v=(v - v.mean()) / v.std()) df.groupby("id").apply(normalize).show() #使用pandas_udf函式直接註冊 def plus_one(a): return a + 1 plus_one_pd_udf = pandas_udf(plus_one, returnType=LongType())