
PySpark series: DataFrame basics

DataFrame basics

1. Connecting to local Spark

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('my_first_app_name') \
    .getOrCreate()
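If you want the session to run explicitly against a local master instead of whatever the environment defaults to, the builder also accepts a master URL; a minimal sketch, assuming a local standalone setup:

# optional: pin the session to a local master using all available cores
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName('my_first_app_name') \
    .getOrCreate()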

2. Creating a DataFrame

# create a Spark dataframe from a pandas dataframe
colors = ['white', 'green', 'yellow', 'red', 'brown', 'pink']
color_df = pd.DataFrame(colors, columns=['color'])
color_df['length'] = color_df['color'].apply(len)
color_df = spark.createDataFrame(color_df)
color_df.show()
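For reference, show() should print something along these lines:

# +------+------+
# | color|length|
# +------+------+
# | white|     5|
# | green|     5|
# |yellow|     6|
# |   red|     3|
# | brown|     5|
# |  pink|     4|
# +------+------+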

3. Viewing column types

# view the column types, same as in pandas
color_df.dtypes

# [('color', 'string'), ('length', 'bigint')]
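The schema can also be printed as a tree with printSchema(), which additionally shows nullability; note that the bigint reported by dtypes appears as long here:

color_df.printSchema()
# root
#  |-- color: string (nullable = true)
#  |-- length: long (nullable = true)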

4. Viewing column names

# view the columns, same as in pandas
color_df.columns

# ['color', 'length']

5. Counting rows

# number of rows
color_df.count()

# the pandas equivalent
len(color_df)

6. Renaming columns

# renaming dataframe columns
# pandas
df=df.rename(columns={'a':'aa'})

# spark, method 1:
# rename while creating the dataframe
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],schema=['name','length'])
data.show()
data.printSchema()

# spark, method 2:
# the selectExpr method
color_df2 = color_df.selectExpr('color as color2', 'length as length2')
color_df2.show()

# spark, method 3:
# the withColumnRenamed method
color_df2 = color_df.withColumnRenamed('color', 'color2')\
                    .withColumnRenamed('length', 'length2')
color_df2.show()

# spark, method 4:
# the alias method (renames a column within a select)
color_df.select(color_df.color.alias('color2')).show()
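One more option not covered above: toDF() renames every column positionally in a single call; a minimal sketch:

# spark: rename all columns at once, by position
color_df2 = color_df.toDF('color2', 'length2')
color_df2.show()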

7. Selecting, slicing, and filtering

These are probably the most frequently used and most important DataFrame operations.

# 1. selecting a single column
# several ways to do it; more verbose than pandas, where df['cols'] suffices
# column expressions are used inside operators such as select and filter
color_df.select('length').show()
color_df.select(color_df.length).show()
color_df.select(color_df[0]).show()
color_df.select(color_df['length']).show()
color_df.filter(color_df['length']>=4).show()   # the filter method

# 2. selecting several columns
color_df.select('length','color').show()
# the pandas equivalent is simpler
color_df[['length','color']]

# 3. multi-column selection and slicing
# note: select() with a boolean expression returns the boolean column itself;
# use filter() if the goal is to keep only the matching rows
color_df.select('length','color') \
        .select(color_df['length']>4).show()

# 4. range selection with between
color_df.filter(color_df.length.between(4,5))\
        .select(color_df.color.alias('mid_length')).show()


# 5. combined filters
# one condition uses color_df.length, the other color_df[0]
color_df.filter(color_df.length>4)\
        .filter(color_df[0]!='white').show()

# 6. SQL-like expressions inside filter
color_df.filter("color='green'").show()

color_df.filter("color like 'b%'").show()

# 7. the same SQL style via the where method
color_df.where("color like '%yellow%'").show()

# 8. using SQL syntax directly
# first register the dataframe as a temporary view, then run SQL queries against it
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()
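Once the view is registered, any query against it works the same way; for instance, the combined filter from example 5 expressed in SQL:

# the filter from example 5, rewritten as SQL over the temp view
spark.sql("select color, length from color_df where length > 4 and color != 'white'").show()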

8. Dropping a column

# drop a column
color_df.drop('length').show()

# the pandas equivalent
df.drop(labels=['a'],axis=1)
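Unlike pandas, Spark's drop has no inplace option: it always returns a new dataframe and leaves the original untouched, so keep the result if you need it.

# drop returns a new dataframe; color_df itself is unchanged
df_no_length = color_df.drop('length')
df_no_length.show()
color_df.show()   # still has both columns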

9. Adding a column

# add a constant column with lit
from pyspark.sql.functions import lit
color_df.withColumn('newCol', lit(0)).show()
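withColumn can also derive a new column from existing ones rather than a constant; a small sketch:

# derive a column from an existing one
color_df.withColumn('double_length', color_df.length * 2).show()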

10. Converting to JSON

# convert the dataframe to JSON, much like pandas
color_df.toJSON().first()
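first() returns only the first record as a JSON string; to materialize every row, collect() works too (safe here only because the data is tiny):

# all rows as JSON strings; avoid on large dataframes
color_df.toJSON().collect()
# ['{"color":"white","length":5}', '{"color":"green","length":5}', ...]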

11. Sorting

# sorting in pandas
df.sort_values(by='b')

# sorting in spark
color_df.sort('color',ascending=False).show()

# sorting on multiple columns
color_df.filter(color_df['length']>=4)\
        .sort('length', 'color', ascending=False).show()

# mixed ascending/descending sort
color_df.sort(color_df.length.desc(), color_df.color.asc()).show()

# orderBy sorts as well; take() returns a list of Row objects
color_df.orderBy('length','color').take(4)
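The same mixed sort can be written with the asc()/desc() helpers from pyspark.sql.functions, which avoids referencing the dataframe object inside the expression:

from pyspark.sql.functions import asc, desc

# equivalent to the mixed sort above, using column helpers
color_df.orderBy(desc('length'), asc('color')).show()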

12. Missing values

# 1. build some test data
import numpy as np
import pandas as pd

df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e'])\
    .applymap(lambda x: int(x*10))
df.iloc[2,2]=np.nan

spark_df = spark.createDataFrame(df)
spark_df.show()

# 2. drop rows containing missing values
df2 = spark_df.dropna()
df2.show()

# 3. or equivalently, via the na property
spark_df=spark_df.na.drop()
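Dropping is not the only option; missing values can also be filled, globally or per column. A minimal sketch on the spark_df built in step 1, before the drop:

# fill every missing numeric value with 0
spark_df.fillna(0).show()

# or fill per column, here only column 'c'
spark_df.na.fill({'c': 0}).show()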