pyspark dataframe列的合併與拆分
阿新 • • 發佈:2018-12-04
使用Spark SQL在對資料進行處理的過程中,可能會遇到對一列資料拆分為多列,或者把多列資料合併為一列。這裡記錄一下目前想到的對DataFrame列資料進行合併和拆分的幾種方法。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local") \ .appName("dataframe_split") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() sc = spark.sparkContext df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True) df.show(3)
原始資料如下所示
-
dataframe列資料的分割
from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()
-
dataframe列資料的拆分
zipWithIndex:給每個元素生成一個索引
排序首先基於分割槽索引,然後是每個分割槽內的專案順序.因此,第一個分割槽中的第一個item索引為0,最後一個分割槽中的最後一個item的索引最大.當RDD包含多個分割槽時此方法需要觸發spark作業.
first_row = df.first() numAttrs = len(first_row['score'].split(" ")) print("新增列的個數", numAttrs) attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect() print("列名:", attrs) for name, index in attrs: df_split = df_split.withColumn(name, df_split['s'].getItem(index)) df_split.show()
-
dataframe將一行分成多行
df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()
-
dataframe列資料的合併
列的合併有兩個函式:一個不新增分隔符concat(),一個新增分隔符concat_ws()
concat
df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()
caoncat_ws
df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()
-
dataframe多行轉多列
pivot: 旋轉當前[[dataframe]]列並執行指定的聚合
#DataFrame 資料格式:每個使用者對每部電影的評分 userID 使用者ID,movieID 電影ID,rating評分
df=spark.sparkContext.parallelize([[15,399,2], \
[15,1401,5], \
[15,1608,4], \
[15,20,4], \
[18,100,3], \
[18,1401,3], \
[18,399,1]])\
.toDF(["userID","movieID","rating"])
#pivot 多行轉多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#結果
resultDF.show()
參考文獻: