spark api學習之Dataset
-
toDF
toDF方法是將Dataset轉換成Dataframe,當然,如果引入了隱式轉換,則可以將rdd轉換成Dataframe,import sparkSession.implicits._ val sourceRdd = sparkSession.sparkContext.makeRDD(Seq(1, 2, 3, 4)) val sourceDF = sourceRdd.toDF("num")
-
as[U : Encoder]: Dataset[U]
將dataset的行資料轉換型別,並返回新的datasetval sourceDF = sparkSession.sparkContext.makeRDD( Seq(("tom z", 60, 18), ("jim x", 50, 20), ("tom y", 68, 18), ("jim t", 70, 20), ("jerry l", 90, 22) )).toDF("name", "score", "age") val sourceFormatDF=sourceDF.as[(String, String, String)]
-
agg
在Dataset上的聚合函式,官方提供了4種過載方法,即4種用法import sparkSession.implicits._ //建立rdd val sourceRdd = sparkSession.sparkContext.makeRDD( Seq(("tom", 60, 18), ("jim", 50, 20), ("tom", 68, 18), ("jim", 70, 20), ("jerry", 90, 22) )) //建立dataFrame val sourceDF = sourceRdd.toDF("name", "score", "age")
1.agg
sourceDF.agg(("score" -> "max"), ("age" -> "avg")).show()
result:
2.agg(exprs: Map[String, String])sourceDF.groupBy("name").agg(Map("score" -> "min", "score" -> "sum")).show()
result:
3.agg(exprs: java.util.Map[String, String])
和用法二一樣,只是引數是java的map4.agg(expr: Column, exprs: Column*)
sourceDF.groupBy("name").agg(max("score"), count("age")).show()
result:
-
na:DataFrameNaFunctions
返回型別是DataFrameNaFunctions用以處理dataset中缺失的資料import sparkSession.implicits._ val testDF = sparkSession.sparkContext.makeRDD( Seq(("a", null), ("b", "hj"), ("c", null) )).toDF("x", "y")
1.drop:DataFrame
drop用來刪除包含 null or NaN的行。drop可以傳入兩類引數,第一類是how:String,如果傳入的是"any",那麼只要包含null or NaN的行就會被刪除,而"all"則表示如果這個行每一列都是null or NaN才會被刪除,預設是"any"。第二類是cols:Array[String]/Seq[String],用來指定要被處理的列,預設是對所有列進行處理testDF.na.drop().show()
result:
2.fill
fill用來替換 null or NaN。fill可以傳入兩類引數,第一類是value:String/Double,用來替換null or NaN的值;第二類則是cols:Seq[String],用來指定被處理的列,沒有被指定的列的 null or NaN則不會被替換,預設是所有列testDF.na.fill("def").show()
result:
3.replace[T]:DataFrame
用來替換指定的值。replace可以傳入兩類引數,第一類是col:String/Seq[String],表示要替換的列;第二類是replacement:Map[T, T],表示要被替換的目標值和替換值。testDF.na.replace[String]("x",Map("c"->"d")).show()
result:
-
replace
-
schema
返回dataset的schema -
printSchema
將schema以tree的形式輸出到控制檯 -
explain
輸出spark sql的執行計劃 -
dtypes
將列名和型別以陣列的形式返回 -
columns
將列名以陣列的形式返回 -
show
展示dataset的資料。官方有2個地方需要注意的,第一,展示的個數預設是20;第二,展示的字元長度預設也是20,超過20的字元會被省略。當然這兩個引數是可以自定義的,所以官方提供了show的5種過載方法,朋友們可以自行查閱原始碼以進一步瞭解 -
distinct
根據輸入dataframe的各列組合去重,當兩行的每一列資料都同等時,這兩行會被去重 -
checkpoint
給dataset設定檢查點 -
cache
將Dataset持久化,持久化級別是“MEMORY_AND_DISK” -
persist(newLevel: StorageLevel)
可以選擇Dataset持久化的級別 -
unpersist
將持久化的Dataset從磁碟或者記憶體中去掉 -
coalesce
coalesce的作用是合併同一機器上的多個partition,目的是解決多個partition資料量差異很大的case帶來的資源浪費 -
repartition(numPartitions: Int)
資料重新分割槽,可以設定引數來設定返回的分割槽數 -
collect
返回dataset的所有行資料 -
take
返回dataset的前幾行 -
first
返回dataset的第一行 -
head(n: Int)
返回dataset的前幾行 -
drop(col: Column)
刪除掉某列 -
withColumnRenamed(existingName: String, newName: String)
更改列名 -
withColumn(colName: String, col: Column)
給已有的dataset新增新的列 -
explode
將dataset的一列分成多行的操作
例子:import sparkSession.implicits._ val sourceRdd = sparkSession.sparkContext.makeRDD( Seq(("tom z", 60, 18), ("jim x", 50, 20), ("tom y", 68, 18), ("jim t", 70, 20), ("jerry l", 90, 22) )) val sourceDF = sourceRdd.toDF("name", "score", "age") sourceDF.select(explode(split(col("name"), " ")).as("new_name")).show()
result:
-
transform
未完待續。。。。。。。。。。。。。。。。。