1. 程式人生 > >pySpark | pySpark.Dataframe使用的坑 與 經歷

pySpark | pySpark.Dataframe使用的坑 與 經歷

筆者最近在嘗試使用PySpark,發現pyspark.dataframe跟pandas很像,但是資料操作的功能並不強大。由於,pyspark環境非自建,別家工程師也不讓改,導致本來想pyspark環境跑一個隨機森林,用 《Comprehensive Introduction to Apache Spark, RDDs & Dataframes (using PySpark) 》中的案例,也總是報錯…把一些問題進行記錄。

1 利於分析的toPandas()

介於總是不能在別人家pySpark上跑通模型,只能將資料toPandas(),但是toPandas()也會執行慢 執行記憶體不足等問題。

1.1 記憶體不足

報錯: tasks is bigger than spark.driver.maxResultSize  

一般是spark預設會限定記憶體,可以使用以下的方式提高:

set by SparkConf:  conf.set("spark.driver.maxResultSize", "3g")
set by spark-defaults.conf  spark.driver.maxResultSize 3g
set when callingspark-submit    --conf spark.driver.maxResultSize=3g

1.2 執行慢,如何優化效能

1.2.1 Using Arrow to Optimize Conversion

來看看本來執行一段.toDF的code耗時在哪。

from pyspark.sql.functions import rand
df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
pandas_df = df.toPandas()

那麼主要的耗時在:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000
0.000 23.013 23.013 <string>:1(<module>) 1 0.456 0.456 23.013 23.013 dataframe.py:1712(toPandas) 1 0.092 0.092 21.222 21.222 dataframe.py:439(collect) 81 0.000 0.000 20.195 0.249 serializers.py:141(load_stream) 81 0.001 0.000 20.194 0.249 serializers.py:160(_read_with_length) 80 0.000 0.000 20.167 0.252 serializers.py:470(loads) 80 3.280 0.041 20.167 0.252 {cPickle.loads} 4194304 1.024 0.000 16.295 0.000 types.py:1532(<lambda>) 4194304 2.048 0.000 15.270 0.000 types.py:610(fromInternal) 4194304 9.956 0.000 12.552 0.000 types.py:1535(_create_row) 4194304 1.105 0.000 1.807 0.000 types.py:1583(__new__) 1 0.000 0.000 1.335 1.335 frame.py:969(from_records) 1 0.047 0.047 1.321 1.321 frame.py:5693(_to_arrays) 1 0.000 0.000 1.274 1.274 frame.py:5787(_list_to_arrays) 165 0.958 0.006 0.958 0.006 {method 'recv' of '_socket.socket' objects} 4 0.000 0.000 0.935 0.234 java_gateway.py:1150(__call__) 4 0.000 0.000 0.935 0.234 java_gateway.py:885(send_command) 4 0.000 0.000 0.935 0.234 java_gateway.py:1033(send_command) 4 0.000 0.000 0.934 0.234 socket.py:410(readline) 4194304 0.789 0.000 0.789 0.000 types.py:1667(__setattr__) 1 0.000 0.000 0.759 0.759 frame.py:5846(_convert_object_array) 2 0.000 0.000 0.759 0.380 frame.py:5856(convert) 2 0.759 0.380 0.759 0.380 {pandas._libs.lib.maybe_convert_objects} 4194308 0.702 0.000 0.702 0.000 {built-in method __new__ of type object at 0x7fa547e394c0} 4195416 0.671 0.000 0.671 0.000 {isinstance} 4194304 0.586 0.000 0.586 0.000 types.py:1531(_create_row_inbound_converter) 1 0.515 0.515 0.515 0.515 {pandas._libs.lib.to_object_array_tuples}

可以看到很多時間都是花在deserializer上面。
那麼應用了Arrow就不一樣,原文作者的原話:Because Arrow defines a common data format across different language implementations, it is possible to transfer data from Java to Python without any conversions or processing. ,Apache Arrow:一個跨平臺的在記憶體中以列式儲存的資料層,用來加速大資料分析速度。其可以一次性傳入更大塊的資料,pyspark中已經有載入該模組,需要開啟該設定:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

或者也可以在conf/spark-defaults.conf檔案中寫入:spark.sql.execution.arrow.enabled=true
開啟arrow可以看一下效能:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.001    0.001    0.457    0.457 <string>:1(<module>)
        1    0.000    0.000    0.456    0.456 dataframe.py:1712(toPandas)
        1    0.000    0.000    0.442    0.442 dataframe.py:1754(_collectAsArrow)
       53    0.404    0.008    0.404    0.008 {method 'recv' of '_socket.socket' objects}
        4    0.000    0.000    0.389    0.097 java_gateway.py:1150(__call__)
        4    0.000    0.000    0.389    0.097 java_gateway.py:885(send_command)
        4    0.000    0.000    0.389    0.097 java_gateway.py:1033(send_command)
        4    0.000    0.000    0.389    0.097 socket.py:410(readline)
        9    0.000    0.000    0.053    0.006 serializers.py:141(load_stream)
        9    0.000    0.000    0.053    0.006 serializers.py:160(_read_with_length)
       17    0.001    0.000    0.052    0.003 socket.py:340(read)
       48    0.022    0.000    0.022    0.000 {method 'write' of 'cStringIO.StringO' objects}
       13    0.014    0.001    0.014    0.001 {method 'getvalue' of 'cStringIO.StringO' objects}
        1    0.000    0.000    0.013    0.013 {method 'to_pandas' of 'pyarrow.lib.Table' objects}
        1    0.000    0.000    0.013    0.013 pandas_compat.py:107(table_to_blockmanager)
        1    0.013    0.013    0.013    0.013 {pyarrow.lib.table_to_blocks}

比之前快很多,同時serialization and processing的過程全部優化了,只有IO的耗時。來看網路中《PySpark pandas udf》的一次對比:

這裡寫圖片描述

其他,一些限制:
不支援所有的 sparkSQL 資料型別,包括 BinaryType,MapType, ArrayType,TimestampType 和巢狀的 StructType。

1.2.2 重置toPandas()

來自joshlk/faster_toPandas.py的一次嘗試,筆者使用後,發現確實能夠比較快,而且比之前自帶的toPandas()還要更快捷,更能抗壓.

import pandas as pd

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
return df_pand

那麼在code之中有一個分割槽引數n_partitions,分割槽是啥?(來源:知乎:Spark 分割槽?)RDD 內部的資料集合在邏輯上(以及物理上)被劃分成多個小集合,這樣的每一個小集合被稱為分割槽。像是下面這圖中,三個 RDD,每個 RDD 內部都有兩個分割槽。
這裡寫圖片描述
分割槽的個數決定了平行計算的粒度。比如說像是下面圖介個情況,多個分割槽平行計算,能夠充分利用計算資源。