Spark與Pandas中DataFrame對比(詳細)
阿新 • • 發佈:2019-01-24
Pandas | Spark | |
工作方式 |
單機single machine tool,沒有並行機制parallelism 不支援Hadoop,處理大量資料有瓶頸 |
分散式平行計算框架,內建並行機制parallelism,所有的資料和操作自動並行分佈在各個叢集結點上。以處理in-memory資料的方式處理distributed資料。 支援Hadoop,能處理大量資料 |
延遲機制 | not lazy-evaluated | lazy-evaluated |
記憶體快取 | 單機快取 | persist() or cache()將轉換的RDDs儲存在記憶體 |
DataFrame可變性 | Pandas中DataFrame是可變的 | Spark中RDDs是不可變的,因此DataFrame也是不可變的 |
建立 | 從spark_df轉換:pandas_df = spark_df.toPandas() |
從pandas_df轉換:spark_df = SQLContext.createDataFrame(pandas_df) 另外,createDataFrame支援從list轉換spark_df,其中list元素可以為tuple,dict,rdd |
list,dict,ndarray轉換 | 已有的RDDs轉換 | |
CSV資料集讀取 | 結構化資料檔案讀取 | |
HDF5讀取 | JSON資料集讀取 | |
EXCEL讀取 | Hive表讀取 | |
外部資料庫讀取 | ||
index索引 | 自動建立 | 沒有index索引,若需要需要額外建立該列 |
行結構 | Series結構,屬於Pandas DataFrame結構 | Row結構,屬於Spark DataFrame結構 |
列結構 | Series結構,屬於Pandas DataFrame結構 | Column結構,屬於Spark DataFrame結構,如:DataFrame[name: string] |
列名稱 | 不允許重名 |
允許重名 修改列名採用alias方法 |
列新增 | df[“xx”] = 0 |
df.withColumn(“xx”, 0).show() 會報錯 from pyspark.sql import functions df.withColumn(“xx”, functions.lit(0)).show() |
列修改 | 原來有df[“xx”]列,df[“xx”] = 1 | 原來有df[“xx”]列,df.withColumn(“xx”, 1).show() |
顯示 |
df 不輸出具體內容,輸出具體內容用show方法 輸出形式:DataFrame[age: bigint, name: string] |
|
df 輸出具體內容 | df.show() 輸出具體內容 | |
沒有樹結構輸出形式 | 以樹的形式列印概要:df.printSchema() | |
df.collect() | ||
排序 | df.sort_index() 按軸進行排序 | |
df.sort() 在列中按值進行排序 | df.sort() 在列中按值進行排序 | |
選擇或切片 | df.name 輸出具體內容 |
df[] 不輸出具體內容,輸出具體內容用show方法 df[“name”] 不輸出具體內容,輸出具體內容用show方法 |
df[] 輸出具體內容, df[“name”] 輸出具體內容 |
df.select() 選擇一列或多列 df.select(“name”) 切片 df.select(df[‘name’], df[‘age’]+1) |
|
df[0] df.ix[0] |
df.first() | |
df.head(2) | df.head(2)或者df.take(2) | |
df.tail(2) | ||
切片 df.ix[:3]或者df.ix[:”xx”]或者df[:”xx”] | ||
df.loc[] 通過標籤進行選擇 | ||
df.iloc[] 通過位置進行選擇 | ||
過濾 | df[df[‘age’]>21] | df.filter(df[‘age’]>21) 或者 df.where(df[‘age’]>21) |
整合 |
df.groupby(“age”) df.groupby(“A”).avg(“B”) |
df.groupBy(“age”) df.groupBy(“A”).avg(“B”).show() 應用單個函式 from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() 應用多個函式 |
統計 | df.count() 輸出每一列的非空行數 | df.count() 輸出總行數 |
df.describe() 描述某些列的count, mean, std, min, 25%, 50%, 75%, max | df.describe() 描述某些列的count, mean, stddev, min, max | |
合併 | Pandas下有concat方法,支援軸向合併 | |
Pandas下有merge方法,支援多列合併 同名列自動新增字尾,對應鍵僅保留一份副本 |
Spark下有join方法即df.join() 同名列不自動新增字尾,只有鍵值完全匹配才保留一份副本 |
|
df.join() 支援多列合併 | ||
df.append() 支援多行合併 | ||
缺失資料處理 | 對缺失資料自動新增NaNs | 不自動新增NaNs,且不丟擲錯誤 |
fillna函式:df.fillna() | fillna函式:df.na.fill() | |
dropna函式:df.dropna() | dropna函式:df.na.drop() | |
SQL語句 |
import sqlite3 pd.read_sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19″) |
表格註冊:把DataFrame結構註冊成SQL語句使用型別 df.registerTempTable(“people”) 或者 sqlContext.registerDataFrameAsTable(df, “people”) sqlContext.sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19″) |
功能註冊:把函式註冊成SQL語句使用型別 sqlContext.registerFunction(“stringLengthString”, lambda x: len(x)) sqlContext.sql(“SELECT stringLengthString(‘test’)”) |
||
兩者互相轉換 | pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
函式應用 | df.apply(f)將df的每一列應用函式f |
df.foreach(f) 或者 df.rdd.foreach(f) 將df的每一列應用函式f df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) 將df的每一塊應用函式f |
map-reduce操作 | map(func, list),reduce(func, list) 返回型別seq | df.map(func),df.reduce(func) 返回型別seqRDDs |
diff操作 | 有diff操作,處理時間序列資料(Pandas會對比當前行與上一行) | 沒有diff操作(Spark的上下行是相互獨立,分散式儲存的) |
原文連結:http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/