在spark dataFrame 中使用 pandas dataframe
阿新 • • 發佈:2018-12-13
文章目錄
背景
項 | pandas | spark |
---|---|---|
工作方式 | 單機,無法處理大量資料 | 分散式,能處理大量資料 |
儲存方式 | 單機快取 | 可以呼叫 persist/cache 分散式快取 |
是否可變 | 是 | 否 |
index索引 | 自動建立 | 無索引 |
行結構 | Pandas.Series | Pyspark.sql.Row |
列結構 | Pandas.Series | Pyspark.sql.Column |
允許列重名 | 否 | 是 |
pandas dataFrame 無法支援大量資料的計算,可以嘗試 spark df 來解決這個問題。
xgboost 預測
優化前
import xgboost as xgb
import pandas as pd
import numpy as np
# 載入模型
bst = xgb.Booster()
bst.load_model("xxx.model")
# 變數列表
var_list=[...]
df.rdd.map(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).write.toDF()
# 計算分數
def cal_xgb_score(x,var_list,ntree_limit=50):
feature_count = len(var_list)
x1 = pd.DataFrame(np.array(x).reshape(1,feature_count),columns=var_list)
# 資料變化操作
y1 = transformFun(x1)
test_x = xgb.DMatrix(y1.drop(['mobile','mobile_md5'],xais=1),missing=float('nan'))
y1['score'] = bst.predict(test_x,ntree_limit=ntree_limit)
y2 = y1[['mobile','mobile_md5','score']]
return {'mobile': str(y2['mobile'][0]),'mobille_md5':str(y2['mobile_md5'][0]),'score':float(y2['score'][0])}
每條資料都轉化為 pd,增加了額外開銷。
優化後
def cal_xgb_score(x,var_list,ntree_limit=50):
feature_count = len(var_list)
//將 iterator 轉為list
x1 = pd.DataFrame(list(x),columns=var_list)
...
//將 pdf 轉為字典
return y1[['mobile','mobile_md5','score']].to_dict(orient='record')
toPandas
優化前
df.toPandas()
優化後
import pandas as pd
def _map_to_pandas(rdds):
return [pd.DataFrame(list(rdds))]
def toPandas(df, n_partitions=None):
if n_partitions is not None: df = df.repartition(n_partitions)
df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
df_pand = pd.concat(df_pand)
df_pand.columns = df.columns
return df_pand
# 98列,22W行,型別 array/string/Long/Int,分割槽 200
df = spark.sql("...").sample(False,0.002)
df.cache()
df.count()
# 原生的 toPandas 方法
%timeit df.toPandas()
# 分散式的 toPandas
%timeit toPandas(df)
#使用 apache arrow,spark 版本2.3以上
spark.sql("set spark.sql.execution.arrow.enabled=true")
%timeit df.toPandas()
效果
xgboost 預測
單條資料處理速度從 120 record / min 提高到 3278 record / min
tips: 如果一個分割槽資料量過大將會導致 executor oom
spark dataframe 轉 pandas dataframe
type | cost (seconds) |
---|---|
native toPandas | 12 |
distributed toPandas | 5.91 |
arrow toPandas | 2.52 |
toPandas 返回的資料歸根結底還是快取在 driver 的記憶體中的,不建議返回過大的資料。