1. 程式人生 > >在spark dataFrame 中使用 pandas dataframe

在spark dataFrame 中使用 pandas dataframe

文章目錄

背景

pandas spark
工作方式 單機,無法處理大量資料 分散式,能處理大量資料
儲存方式 單機快取 可以呼叫 persist/cache 分散式快取
是否可變
index索引 自動建立 無索引
行結構 Pandas.Series Pyspark.sql.Row
列結構 Pandas.Series Pyspark.sql.Column
允許列重名

pandas dataFrame 無法支援大量資料的計算,可以嘗試 spark df 來解決這個問題。

xgboost 預測

優化前

import xgboost as xgb
import pandas as pd
import numpy as np

# 載入模型
bst = xgb.Booster()
bst.load_model("xxx.model")

# 變數列表
var_list=[...]
df.rdd.map(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).write.toDF()

# 計算分數
def cal_xgb_score(x,var_list,ntree_limit=50):
    feature_count =
len(var_list) x1 = pd.DataFrame(np.array(x).reshape(1,feature_count),columns=var_list) # 資料變化操作 y1 = transformFun(x1) test_x = xgb.DMatrix(y1.drop(['mobile','mobile_md5'],xais=1),missing=float('nan')) y1['score'] = bst.predict(test_x,ntree_limit=ntree_limit) y2 = y1[['mobile','mobile_md5','score']] return {'mobile': str(y2['mobile'][0]),'mobille_md5':str(y2['mobile_md5'][0]),'score':float(y2['score'][0])}

每條資料都轉化為 pd,增加了額外開銷。

優化後

def cal_xgb_score(x,var_list,ntree_limit=50):
    feature_count = len(var_list)
    //將 iterator 轉為list 
    x1 = pd.DataFrame(list(x),columns=var_list)
    ...
    //將 pdf 轉為字典
    return y1[['mobile','mobile_md5','score']].to_dict(orient='record')

toPandas

優化前

df.toPandas()

優化後

import pandas as pd
def _map_to_pandas(rdds):
    return [pd.DataFrame(list(rdds))]
    
def toPandas(df, n_partitions=None):
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

# 98列,22W行,型別 array/string/Long/Int,分割槽 200
df = spark.sql("...").sample(False,0.002)

df.cache()
df.count()

# 原生的 toPandas 方法
%timeit df.toPandas()

# 分散式的 toPandas
%timeit toPandas(df)

#使用 apache arrow,spark 版本2.3以上
spark.sql("set spark.sql.execution.arrow.enabled=true")
%timeit df.toPandas()

效果

xgboost 預測

單條資料處理速度從 120 record / min 提高到 3278 record / min

tips: 如果一個分割槽資料量過大將會導致 executor oom

spark dataframe 轉 pandas dataframe

type cost (seconds)
native toPandas 12
distributed toPandas 5.91
arrow toPandas 2.52

toPandas 返回的資料歸根結底還是快取在 driver 的記憶體中的,不建議返回過大的資料。