1. 程式人生 > 實用技巧 >PySpark︱DataFrame操作指南:增/刪/改/查/合併/統計與資料處理

PySpark︱DataFrame操作指南:增/刪/改/查/合併/統計與資料處理

筆者最近需要使用pyspark進行資料整理,於是乎給自己整理一份使用指南。pyspark.dataframe跟pandas的差別還是挺大的。


文章目錄


1、-------- 查 --------

— 1.1 行元素查詢操作 —

像SQL那樣列印列表前20元素

show函式內可用int型別指定要列印的行數:

df.show()
df.show(30)
  • 1
  • 2
以樹的形式列印概要
df.printSchema()
  • 1
獲取頭幾行到本地:
list = df.head(3)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.take(5)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
  • 1
  • 2
查詢總行數:
 int_num = df.count()

  • 1
  • 2
取別名
df.select(df.age.alias('age_value'),'name')
  • 1
查詢某列為null的行:
from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a"))
  • 1
  • 2
輸出list型別,list中每個元素是Row類:
list = df.collect()
  • 1

注:此方法將所有資料全部匯入到本地,返回一個Array物件

查詢概況
df.describe().show()
  • 1

以及查詢型別,之前是type,現在是df.printSchema()

root
 |-- user_pin: string (nullable = true)
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- e: string (nullable = true)
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

如上圖所示,只是打印出來。

去重set操作

data.select('columns').distinct().show()
  • 1

跟py中的set一樣,可以distinct()一下去重,同時也可以.count()計算剩餘個數

隨機抽樣

隨機抽樣有兩種方式,一種是在HIVE裡面查數隨機;另一種是在pyspark之中。

HIVE裡面查數隨機

sql = "select * from data order by rand()  limit 2000"
  • 1

pyspark之中

sample = result.sample(False,0.5,0) # randomly select 50% of lines 
  • 1

— 1.2 列元素操作 —

獲取Row元素的所有列名:
r = Row(age=11, name='Alice')
print r.columns    #  ['age', 'name']
  • 1
  • 2
選擇一列或多列:select
df["age"]
df.age
df.select(“name”)
df.select(df[‘name’], df[‘age’]+1)
df.select(df.a, df.b, df.c)    # 選擇a、b、c三列
df.select(df["a"], df["b"], df["c"])    # 選擇a、b、c三列
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
過載的select方法:
jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)
  • 1

會同時顯示id列 + id + 1列

還可以用where按條件選擇
jdbcDF .where("id = 1 or c1 = 'b'" ).show()
  • 1

— 1.3 排序 —

orderBy和sort:按指定欄位排序,預設為升序

train.orderBy(train.Purchase.desc()).show(5)
Output:
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1003160| P00052842|     M|26-35|        17|            C|                         3|             0|                10|                15|              null|   23961|
|1002272| P00052842|     M|26-35|         0|            C|                         1|             0|                10|                15|              null|   23961|
|1001474| P00052842|     M|26-35|         4|            A|                         2|             1|                10|                15|              null|   23961|
|1005848| P00119342|     M|51-55|        20|            A|                         0|             1|                10|                13|              null|   23960|
|1005596| P00117642|     M|36-45|        12|            B|                         1|             0|                10|                16|              null|   23960|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 5 rows
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

按指定欄位排序。加個-表示降序排序

— 1.4 抽樣 —

sample是抽樣函式

t1 = train.sample(False, 0.2, 42)
t2 = train.sample(False, 0.2, 43)
t1.count(),t2.count()
Output:
(109812, 109745)
  • 1
  • 2
  • 3
  • 4
  • 5

withReplacement = True or False代表是否有放回。
fraction = x, where x = .5,代表抽取百分比

— 1.5 按條件篩選when / between —

when(condition, value1).otherwise(value2)聯合使用:
那麼:當滿足條件condition的指賦值為values1,不滿足條件的則賦值為values2.
otherwise表示,不滿足條件的情況下,應該賦值為啥。

demo1

>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                          -1|
|  Bob|                                                           1|
+-----+------------------------------------------------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

demo 2:多個when串聯

df = df.withColumn('mod_val_test1',F.when(df['rand'] <= 0.35,1).when(df['rand'] <= 0.7, 2).otherwise(3))
  • 1

between(lowerBound, upperBound)
篩選出某個範圍內的值,返回的是TRUE or FALSE

>>> df.select(df.name, df.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice|                       true|
|  Bob|                      false|
+-----+---------------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

選擇dataframe中間的特定行數
而我使用的dataframe前兩種方法都沒法解決。特點如下:

特定列中的內容為字串,並非數值,不能直接比較大小。
所選取資料為中間行,如第10~20行,不能用函式直接選取。
最終的解決方法如下:

首先新增行索引,然後選擇特定區間內的行索引,從而選取特定中間行。
第一步,新增行索引。

from pyspark.sql.functions import monotonically_increasing_id
 
dfWithIndex = df.withColumn(“id”,monotonically_increasing_id())
  • 1
  • 2
  • 3

第二步,篩選特定行。

dfWithIndex.select(dfWithIndex.name, dfWithIndex.id.between(50, 100)).show()
  • 1

2、-------- 增、改 --------

— 2.1 新建資料 —

有這麼兩種常規的新建資料方式:createDataFrame.toDF()

sqlContext.createDataFrame(pd.dataframe())
  • 1

是把pandasdataframe轉化為spark.dataframe格式,所以可以作為兩者的格式轉化

from pyspark.sql import Row
row = Row("spe_id", "InOther")
x = ['x1','x2']
y = ['y1','y2']
new_df = sc.parallelize([row(x[i], y[i]) for i in range(2)]).toDF()
  • 1
  • 2
  • 3
  • 4
  • 5

Row代表的是該資料集的列名。

— 2.2 新增資料列 withColumn—

withColumn是通過新增或替換與現有列有相同的名字的列,返回一個新的DataFrame

result3.withColumn('label', 0)
  • 1

或者案例

train.withColumn('Purchase_new', train.Purchase /2.0).select('Purchase','Purchase_new').show(5)
Output:
+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
+--------+------------+
only showing top 5 rows
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

**報錯:**AssertionError: col should be Column,一定要指定某現有列

有兩種方式可以實現:

一種方式通過functions
from pyspark.sql import functions
result3 = result3.withColumn('label',  functions.lit(0))
  • 1
  • 2

但是!! 如何新增一個特別List??(參考:王強的知乎回覆)
python中的list不能直接新增到dataframe中,需要先將list轉為新的dataframe,然後新的dataframe和老的dataframe進行join操作, 下面的例子會先新建一個dataframe,然後將list轉為dataframe,然後將兩者join起來。

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("id", monotonically_increasing_id())
df.show()
+---+---+-----+---+
| x1| x2|   x3| id|
+---+---+-----+---+
|  1|  a| 23.0|  0|
|  3|  B|-23.0|  1|
+---+---+-----+---+
from pyspark.sql import Row
l = ['jerry', 'tom']
row = Row("pid", "name")
new_df = sc.parallelize([row(i, l[i]) for i in range(0,len(l))]).toDF()
new_df.show()
+---+-----+
|pid| name|
+---+-----+
|  0|jerry|
|  1|  tom|
+---+-----+
join_df = df.join(new_df, df.id==new_df.pid)
join_df.show()
+---+---+-----+---+---+-----+
| x1| x2|   x3| id|pid| name|
+---+---+-----+---+---+-----+
|  1|  a| 23.0|  0|  0|jerry|
|  3|  B|-23.0|  1|  1|  tom|
+---+---+-----+---+---+-----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

#####**坑啊!!!**其中,monotonically_increasing_id()生成的ID保證是單調遞增和唯一的,但不是連續的。
所以,有可能,單調到1-140000,到了第144848個,就變成一長串:8845648744563,所以千萬要注意!!

另一種方式通過另一個已有變數:
result3 = result3.withColumn('label',  df.result*0 )
  • 1
修改原有df[“xx”]列的所有值:
df = df.withColumn(“xx”, 1)
  • 1
修改列的型別(型別投射):
df = df.withColumn("year2", df["year1"].cast("Int"))
  • 1
修改列名
jdbcDF.withColumnRenamed( "id" , "idx" )
  • 1

— 2.3 過濾資料—

#####過濾資料(filter和where方法相同):

df = df.filter(df['age']>21)
df = df.where(df['age']>21)
  • 1
  • 2

多個條件jdbcDF .filter(“id = 1 or c1 = ‘b’” ).show()

#####對null或nan資料進行過濾:

from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # 把a列裡面資料為null的篩選出來(代表python的None型別)
df = df.filter(isnan("a"))  # 把a列裡面資料為nan的篩選出來(Not a Number,非數字資料)
  • 1
  • 2
  • 3
新增-isin()

參考:
PySpark:使用isin過濾返回空資料框
[pyspark 實踐彙總2](https://blog.csdn.net/yepeng2007fei/article/details/78874306)

有兩個資料集,從data_1中抽取出data_2中的相同的元素

可行的方式:
df_ori_part = df_ori[df_ori['user_pin'].isin(list(df_1['user_pin']))]
df_ori_part = df_ori.filter(df_ori['user_pin'].isin(list(df_1['user_pin'])) == True )

不可行:
df_ori_part = df_ori.filter(~df_ori['user_pin'].isin(list(df_1['user_pin'])) )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3、-------- 合併 join / union --------

3.1 橫向拼接rbind

result3 = result1.union(result2)
jdbcDF.unionALL(jdbcDF.limit(1)) # unionALL
  • 1
  • 2

— 3.2 Join根據條件 —

單欄位Join

合併2個表的join方法:

 df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")
  • 1

其中,方法可以為:inner,outer,left_outer,right_outer,leftsemi.
其中注意,一般需要改為:left_outer

多欄位join
joinDF1.join(joinDF2, Seq("id", "name"))
  • 1
混合欄位
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
  • 1

跟pandas 裡面的left_on,right_on

— 3.2 求並集、交集 —

來看一個例子,先構造兩個dataframe:

sentenceDataFrame = spark.createDataFrame((
      (1, "asf"),
      (2, "2143"),
      (3, "rfds")
    )).toDF("label", "sentence")
sentenceDataFrame.show()

sentenceDataFrame1 = spark.createDataFrame((
      (1, "asf"),
      (2, "2143"),
      (4, "f8934y")
    )).toDF("label", "sentence")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
# 差集
newDF = sentenceDataFrame1.select("sentence").subtract(sentenceDataFrame.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|  f8934y|
+--------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
# 交集
newDF = sentenceDataFrame1.select("sentence").intersect(sentenceDataFrame.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|     asf|
|    2143|
+--------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
# 並集
newDF = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|     asf|
|    2143|
|  f8934y|
|     asf|
|    2143|
|    rfds|
+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
# 並集 + 去重
newDF = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence")).distinct()
newDF.show()

+--------+
|sentence|
+--------+
|    rfds|
|     asf|
|    2143|
|  f8934y|
+--------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

— 3.3 分割:行轉列 —

有時候需要根據某個欄位內容進行分割,然後生成多行,這時可以使用explode方法
  下面程式碼中,根據c3欄位中的空格將欄位內容進行分割,分割的內容儲存在新的欄位c3_中,如下所示

jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
  • 1


4 -------- 統計 --------

— 4.1 頻數統計與篩選 ----

jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()
  • 1

根據c4欄位,統計該欄位值出現頻率在30%以上的內容

— 4.2 分組統計—

交叉分析
train.crosstab('Age', 'Gender').show()
Output:
+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
groupBy方法整合:
train.groupby('Age').agg({'Purchase': 'mean'}).show()
Output:
+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|51-55|9534.808030960236|
|46-50|9208.625697468327|
| 0-17|8933.464640444974|
|36-45|9331.350694917874|
|26-35|9252.690632869888|
|  55+|9336.280459449405|
|18-25|9169.663606261289|
+-----+-----------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

另外一些demo:

df['x1'].groupby(df['x2']).count().reset_index(name='x1')
  • 1

分組彙總

train.groupby('Age').count().show()
Output:
+-----+------+
|  Age| count|
+-----+------+
|51-55| 38501|
|46-50| 45701|
| 0-17| 15102|
|36-45|110013|
|26-35|219587|
|  55+| 21504|
|18-25| 99660|
+-----+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

應用多個函式:

from pyspark.sql import functions
df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
  • 1
  • 2
整合後GroupedData型別可用的方法(均返回DataFrame型別):
avg(*cols)     ——   計算每組中一列或多列的平均值
count()          ——   計算每組中一共有多少行,返回DataFrame有2列,一列為分組的組名,另一列為行總數
max(*cols)    ——   計算每組中一列或多列的最大值
mean(*cols)  ——  計算每組中一列或多列的平均值
min(*cols)     ——  計算每組中一列或多列的最小值
sum(*cols)    ——   計算每組中一列或多列的總和
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

— 4.3 apply 函式 —

將df的每一列應用函式f:

df.foreach(f) 或者 df.rdd.foreach(f)
  • 1

將df的每一塊應用函式f:

df.foreachPartition(f) 或者 df.rdd.foreachPartition(f)
  • 1

---- 4.4 【Map和Reduce應用】返回型別seqRDDs ----

map函式應用
可以參考:Spark Python API函式學習:pyspark API(1)

train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
Output:
[(Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000002), 1)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其中map在spark2.0就移除了,所以只能由rdd.呼叫。

data.select('col').rdd.map(lambda l: 1 if l in ['a','b'] else 0 ).collect()

print(x.collect()) 
print(y.collect())
 
[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

還有一種方式mapPartitions

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

data.rdd.mapPartitions(_map_to_pandas).collect()
  • 1
  • 2
  • 3
  • 4
  • 5

返回的是list。

udf 函式應用

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import datetime

# 定義一個 udf 函式 
def today(day):
    if day==None:
        return datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d')
    else:
        return day

# 返回型別為字串型別
udfday = udf(today, StringType())
# 使用
df.withColumn('day', udfday(df.day))

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

有點類似apply,定義一個 udf 方法, 用來返回今天的日期(yyyy-MM-dd):


-------- 5、刪除 --------

df.drop('age').collect()
df.drop(df.age).collect()
  • 1
  • 2

dropna函式:

df = df.na.drop()  # 扔掉任何列包含na的行
df = df.dropna(subset=['col_name1', 'col_name2'])  # 扔掉col1或col2中任一一列包含na的行

  • 1
  • 2
  • 3

ex:

train.dropna().count()
Output:
166821
  • 1
  • 2
  • 3

填充NA包括fillna

train.fillna(-1).show(2)
Output:
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

-------- 6、去重 --------

6.1 distinct:返回一個不包含重複記錄的DataFrame

返回當前DataFrame中不重複的Row記錄。該方法和接下來的dropDuplicates()方法不傳入指定欄位時的結果相同。
  示例:

jdbcDF.distinct()
  • 1
6.2 dropDuplicates:根據指定欄位去重

根據指定欄位去重。類似於select distinct a, b操作
示例:


train.select('Age','Gender').dropDuplicates().show()
Output:
+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|51-55|     M|
|26-35|     F|
|26-35|     M|
|36-45|     F|
|36-45|     M|
|46-50|     F|
|46-50|     M|
|  55+|     F|
|  55+|     M|
|18-25|     F|
| 0-17|     F|
|18-25|     M|
| 0-17|     M|
+-----+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

-------- 7、 格式轉換 --------

pandas-spark.dataframe互轉

Pandas和Spark的DataFrame兩者互相轉換:

pandas_df = spark_df.toPandas()	
spark_df = sqlContext.createDataFrame(pandas_df)
  • 1
  • 2

轉化為pandas,但是該資料要讀入記憶體,如果資料量大的話,很難跑得動

兩者的異同:

  • Pyspark DataFrame是在分散式節點上執行一些資料操作,而pandas是不可能的;
  • Pyspark DataFrame的資料反映比較緩慢,沒有Pandas那麼及時反映;
  • Pyspark DataFrame的資料框是不可變的,不能任意新增列,只能通過合併進行;
  • pandas比Pyspark DataFrame有更多方便的操作以及很強大
轉化為RDD

與Spark RDD的相互轉換:

rdd_df = df.rdd	
df = rdd_df.toDF()
  • 1
  • 2

-------- 8、SQL操作 --------

DataFrame註冊成SQL的表:

df.createOrReplaceTempView("TBL1")
  • 1

進行SQL查詢(返回DataFrame):

conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()

df = ss.sql(“SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″)
  • 1
  • 2
  • 3
  • 4

-------- 9、讀寫csv --------

在Python中,我們也可以使用SQLContext類中 load/save函式來讀取和儲存CSV檔案:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)	 
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv",header="true")
  • 1
  • 2
  • 3
  • 4

其中,header代表是否顯示錶頭。
其中主函式:

save(path=None, format=None, mode=None, partitionBy=None, **options)[source]
  • 1

Parameters:

  • path – the path in a Hadoop supported file system

  • format – the format used to save

  • mode –

    • specifies the behavior of the save operation when data already
      exists.

    • append: Append contents of this DataFrame to existing data.

    • overwrite: Overwrite existing data.

    • ignore: Silently ignore this operation if data already exists.

    • error (default case): Throw an exception if data already exists.

  • partitionBy – names of partitioning columns

  • options – all other string options


延伸一:去除兩個表重複的內容

場景是要,依據B表與A表共有的內容,需要去除這部分共有的。
使用的邏輯是merge兩張表,然後把匹配到的刪除即可。

from pyspark.sql import functions
def LeftDeleteRight(test_left,test_right,left_col = 'user_pin',right_col = 'user_pin'):
    print('right data process ...')
    columns_right = test_right.columns
    test_right = test_right.withColumn('user_pin_right', test_right[right_col])
    test_right = test_right.withColumn('notDelete',  functions.lit(0))
    # 刪除其餘的
    for col in columns_right:
        test_right = test_right.drop(col)
    # 合併
    print('rbind left and right data ...')
    test_left = test_left.join(test_right, test_left[left_col] == test_right['user_pin_right'], "left")
    test_left = test_left.fillna(1)
    test_left = test_left.where('notDelete =1')
    # 去掉多餘的欄位
    for col in ['user_pin_right','notDelete']:
        test_left = test_left.drop(col)
    return test_left

%time  test_left = LeftDeleteRight(test_b,test_a,left_col = 'user_pin',right_col = 'user_pin')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

延伸二:報錯

Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in

  • 1
  • 2

解決方案

這裡遇到的問題主要是因為資料來源資料量過大,而機器的記憶體無法滿足需求,導致長時間執行超時斷開的情況,資料無法有效進行互動計算,因此有必要增加記憶體