spark.dataframe的一些常用操作(Scala)
前言
說起dataframe,大家一般會首先想起pandas.dataframe。隨著資料科學越來越火熱,大部分同學都使用過python去進行一些資料科學的實踐,也應該會對dataframe的簡單易用頗有好感。
然而pandas只能用於處理單機問題,面對工業級的海量資料處理和計算,就顯得無能為力。
spark作為分散式計算框架,在工業界佔據了比較主流的地位。spark同樣也提供了dataframe供使用者使用。
spark.dataframe目前在網路上資料還不是很多,在此總結一些spark2.1中常用的一些操作,希望能給讀者提供一些幫助。
1.資料的讀入
資料在spark上經常被寫成.parquet的格式,這是一種分片格式,讀入資料的操作一般是這樣:
val path = "/user/xxx/xxx/xxx/"
val df = spark.read.parquet(path)
對於txt檔案的讀入,可以用以下的方法來操作,不過這樣讀入之後,dataframe的每一行會是單獨的名為value的一列,需要進一步解析,這時候需要用到map函式(會在後面提到)
val txtPath = "xxxxxx"
val df = spark.read.text(txtPath)
如果需要自己生成一些資料的話,可以這樣來寫:
val testDF = Seq( ("1","98:0:0:0:0:0:0:19:93:0:0","1"), ("2","98:0:0:0:0:0:0:0:0:0:0","1"), ("3","0:0:0:0:0:0:0:0:0:0:90","1"), ("4","0:0:0:40:0:40:0:12:0:0:0","1"), ("5","98:0:0:0:0:0:0:19:93:100:0","0"), ("6","0:0:0:100:0:0:0:0:0:0:1","0") ).toDF("id","info","flag")
這樣就生成了一個6行3列的spark.dataframe,這種方法,一般用來進行測試操作。
2.dataframe內容的檢視
以上述testDF為例,如果我們想檢視testDF中所有的資料,可以這樣操作:其中num為展現的行數,預設為20,false表示不進行資訊的縮略。
testDF.show(num,false)
如果想看一個dataframe中有那些列,這些列是什麼樣的資料型別,這種時候可以使用:
testDF.printSchema
如果想知道dataframe的行數
testDF.count()
如果想只看特定某些列,要用到select方法
testDF.select("id","flag").show(10,false)
3.dataframe中的過濾
如果想選擇dataframe中某列為某某的資料或大於或小於某某的資料
val newDF = testDF.filter(col("id")==="1")
val new1DF = testDF.filter(col("value")>=12)
對於多個條件,filter方法可以寫在一起
val new2DF = testDF.filter(col("1")===2 && col("2")<=12)
對於dataframe中資料的去重,可以使用distinct方法,和python中的unique是一個意思
val newDF = testDF.distinct()
4.分組操作
分組操作使用的方法是groupBy,並且附帶聚合操作agg
舉個例子,我們有一個很大的資料集,是關於飲食的,每個使用者過去一個月午餐的食量,類似於這樣
date id weight
1 u1 23
1 u2 24
1 u3 88
2 u1 12
2 u2 19
2 u3 74
3 u1 42
3 u2 12
3 u3 19
我們想得到每個使用者的均值食量,我們需要用到groupBy和agg
val newDF = foodDF.groupBy("id").agg(
mean("weight") as "meanWeight"
)
agg操作就是說在以key進行分組後,對每個分組內的列進行計算,agg中可以同時進行多個操作,生成我們所需要的資料
5.map操作
前面剛剛講過agg操作的思想相當於對列進行操作計算,map正好是相反的,map是對行進行操作的。
比如前文說過的,讀取txt檔案之後,可以txt檔案存在很多列,但只會存在一個列(value)的dataframe,這時候我們就需要用map把這個dataframe解析成我們想要的那個。
val fineDF = notfineDF.map(line=>{
val colArr = line.getString(0).split("\t")
val col1 = colArr(0)
val col2 = colArr(1)
(col1,col2)
}).toDF("col1","col2")
最後
以上是spark.dataframe的一些常用的基礎操作,通過這些操作的組合可以實現工作中絕大部分需求。
最後安利一下stackoverflow,很多不懂的問題都是在上面找到了答案,感謝那些樂於分享的同學們。