1. 程式人生 > >spark.dataframe的一些常用操作(Scala)

spark.dataframe的一些常用操作(Scala)

前言

說起dataframe,大家一般會首先想起pandas.dataframe。隨著資料科學越來越火熱,大部分同學都使用過python去進行一些資料科學的實踐,也應該會對dataframe的簡單易用頗有好感。
然而pandas只能用於處理單機問題,面對工業級的海量資料處理和計算,就顯得無能為力。
spark作為分散式計算框架,在工業界佔據了比較主流的地位。spark同樣也提供了dataframe供使用者使用。
spark.dataframe目前在網路上資料還不是很多,在此總結一些spark2.1中常用的一些操作,希望能給讀者提供一些幫助。

1.資料的讀入

資料在spark上經常被寫成.parquet的格式,這是一種分片格式,讀入資料的操作一般是這樣:

val path = "/user/xxx/xxx/xxx/"
val df = spark.read.parquet(path)

對於txt檔案的讀入,可以用以下的方法來操作,不過這樣讀入之後,dataframe的每一行會是單獨的名為value的一列,需要進一步解析,這時候需要用到map函式(會在後面提到)

val txtPath = "xxxxxx"
val df = spark.read.text(txtPath)

如果需要自己生成一些資料的話,可以這樣來寫:

 val testDF = Seq(
      ("1","98:0:0:0:0:0:0:19:93:0:0","1"),
      ("2","98:0:0:0:0:0:0:0:0:0:0","1"),
      ("3","0:0:0:0:0:0:0:0:0:0:90","1"),
      ("4","0:0:0:40:0:40:0:12:0:0:0","1"),
      ("5","98:0:0:0:0:0:0:19:93:100:0","0"),
      ("6","0:0:0:100:0:0:0:0:0:0:1","0")
    ).toDF("id","info","flag")

這樣就生成了一個6行3列的spark.dataframe,這種方法,一般用來進行測試操作。

2.dataframe內容的檢視

以上述testDF為例,如果我們想檢視testDF中所有的資料,可以這樣操作:其中num為展現的行數,預設為20,false表示不進行資訊的縮略。

testDF.show(num,false)

如果想看一個dataframe中有那些列,這些列是什麼樣的資料型別,這種時候可以使用:

testDF.printSchema

如果想知道dataframe的行數

testDF.count()

如果想只看特定某些列,要用到select方法

testDF.select("id","flag").show(10,false)

3.dataframe中的過濾

如果想選擇dataframe中某列為某某的資料或大於或小於某某的資料

val newDF = testDF.filter(col("id")==="1")
val new1DF = testDF.filter(col("value")>=12)

對於多個條件,filter方法可以寫在一起

val new2DF = testDF.filter(col("1")===2 && col("2")<=12)

對於dataframe中資料的去重,可以使用distinct方法,和python中的unique是一個意思

val newDF = testDF.distinct()

4.分組操作

分組操作使用的方法是groupBy,並且附帶聚合操作agg
舉個例子,我們有一個很大的資料集,是關於飲食的,每個使用者過去一個月午餐的食量,類似於這樣
date id weight
1 u1 23
1 u2 24
1 u3 88
2 u1 12
2 u2 19
2 u3 74
3 u1 42
3 u2 12
3 u3 19
我們想得到每個使用者的均值食量,我們需要用到groupBy和agg

val newDF = foodDF.groupBy("id").agg(
	mean("weight") as "meanWeight"
)

agg操作就是說在以key進行分組後,對每個分組內的列進行計算,agg中可以同時進行多個操作,生成我們所需要的資料

5.map操作

前面剛剛講過agg操作的思想相當於對列進行操作計算,map正好是相反的,map是對行進行操作的。
比如前文說過的,讀取txt檔案之後,可以txt檔案存在很多列,但只會存在一個列(value)的dataframe,這時候我們就需要用map把這個dataframe解析成我們想要的那個。

val fineDF = notfineDF.map(line=>{
	val colArr = line.getString(0).split("\t")
	val col1 = colArr(0)
	val col2 = colArr(1)
	(col1,col2)
}).toDF("col1","col2")

最後

以上是spark.dataframe的一些常用的基礎操作,通過這些操作的組合可以實現工作中絕大部分需求。
最後安利一下stackoverflow,很多不懂的問題都是在上面找到了答案,感謝那些樂於分享的同學們。