1. 程式人生 > >spark api學習之Dataset

spark api學習之Dataset

  • toDF
    toDF方法是將Dataset轉換成Dataframe,當然,如果引入了隱式轉換,則可以將rdd轉換成Dataframe,

      import sparkSession.implicits._
      val sourceRdd = sparkSession.sparkContext.makeRDD(Seq(1, 2, 3, 4))
      val sourceDF = sourceRdd.toDF("num")
    
  • as[U : Encoder]: Dataset[U]
    將dataset的行資料轉換型別,並返回新的dataset

    val sourceDF = sparkSession.sparkContext.makeRDD(
      Seq(("tom z", 60, 18),
        ("jim x", 50, 20),
        ("tom y", 68, 18),
        ("jim t", 70, 20),
        ("jerry l", 90, 22)
      )).toDF("name", "score", "age")
    val sourceFormatDF=sourceDF.as[(String, String, String)]
    
  • agg
    在Dataset上的聚合函式,官方提供了4種過載方法,即4種用法

    import sparkSession.implicits._
    //建立rdd
    val sourceRdd = sparkSession.sparkContext.makeRDD(
     Seq(("tom", 60, 18),
      ("jim", 50, 20),
      ("tom", 68, 18),
      ("jim", 70, 20),
      ("jerry", 90, 22)
     ))
    //建立dataFrame
    val sourceDF = sourceRdd.toDF("name", "score", "age")
    

    1.agg

    (aggExpr: (String, String), aggExprs: (String, String)*)

    sourceDF.agg(("score" -> "max"), ("age" -> "avg")).show()
    

    result:
    在這裡插入圖片描述
    2.agg(exprs: Map[String, String])

    sourceDF.groupBy("name").agg(Map("score" -> "min", "score" -> "sum")).show()
    

    result:
    在這裡插入圖片描述
    3.agg(exprs: java.util.Map[String, String])
    和用法二一樣,只是引數是java的map

    4.agg(expr: Column, exprs: Column*)

    sourceDF.groupBy("name").agg(max("score"), count("age")).show()
    

    result:
    在這裡插入圖片描述

  • na:DataFrameNaFunctions
    返回型別是DataFrameNaFunctions用以處理dataset中缺失的資料

    import sparkSession.implicits._
    val testDF = sparkSession.sparkContext.makeRDD(
      Seq(("a", null),
        ("b", "hj"),
        ("c", null)
      )).toDF("x", "y")
    

    1.drop:DataFrame
    drop用來刪除包含 null or NaN的行。drop可以傳入兩類引數,第一類是how:String,如果傳入的是"any",那麼只要包含null or NaN的行就會被刪除,而"all"則表示如果這個行每一列都是null or NaN才會被刪除,預設是"any"。第二類是cols:Array[String]/Seq[String],用來指定要被處理的列,預設是對所有列進行處理

     testDF.na.drop().show()
    

    result:
    在這裡插入圖片描述
    2.fill
    fill用來替換 null or NaN。fill可以傳入兩類引數,第一類是value:String/Double,用來替換null or NaN的值;第二類則是cols:Seq[String],用來指定被處理的列,沒有被指定的列的 null or NaN則不會被替換,預設是所有列

    testDF.na.fill("def").show()
    

    result:
    在這裡插入圖片描述
    3.replace[T]:DataFrame
    用來替換指定的值。replace可以傳入兩類引數,第一類是col:String/Seq[String],表示要替換的列;第二類是replacement:Map[T, T],表示要被替換的目標值和替換值。

    testDF.na.replace[String]("x",Map("c"->"d")).show()
    

    result:
    在這裡插入圖片描述

  • replace

  • schema
    返回dataset的schema

  • printSchema
    將schema以tree的形式輸出到控制檯

  • explain
    輸出spark sql的執行計劃

  • dtypes
    將列名和型別以陣列的形式返回

  • columns
    將列名以陣列的形式返回

  • show
    展示dataset的資料。官方有2個地方需要注意的,第一,展示的個數預設是20;第二,展示的字元長度預設也是20,超過20的字元會被省略。當然這兩個引數是可以自定義的,所以官方提供了show的5種過載方法,朋友們可以自行查閱原始碼以進一步瞭解

  • distinct
    根據輸入dataframe的各列組合去重,當兩行的每一列資料都同等時,這兩行會被去重

  • checkpoint
    給dataset設定檢查點

  • cache
    將Dataset持久化,持久化級別是“MEMORY_AND_DISK”

  • persist(newLevel: StorageLevel)
    可以選擇Dataset持久化的級別

  • unpersist
    將持久化的Dataset從磁碟或者記憶體中去掉

  • coalesce
    coalesce的作用是合併同一機器上的多個partition,目的是解決多個partition資料量差異很大的case帶來的資源浪費

  • repartition(numPartitions: Int)
    資料重新分割槽,可以設定引數來設定返回的分割槽數

  • collect
    返回dataset的所有行資料

  • take
    返回dataset的前幾行

  • first
    返回dataset的第一行

  • head(n: Int)
    返回dataset的前幾行

  • drop(col: Column)
    刪除掉某列

  • withColumnRenamed(existingName: String, newName: String)
    更改列名

  • withColumn(colName: String, col: Column)
    給已有的dataset新增新的列

  • explode
    將dataset的一列分成多行的操作
    例子:

    import sparkSession.implicits._
    val sourceRdd = sparkSession.sparkContext.makeRDD(
      Seq(("tom z", 60, 18),
        ("jim x", 50, 20),
        ("tom y", 68, 18),
        ("jim t", 70, 20),
        ("jerry l", 90, 22)
      ))
    val sourceDF = sourceRdd.toDF("name", "score", "age")
    sourceDF.select(explode(split(col("name"), " ")).as("new_name")).show()
    

    result:
    在這裡插入圖片描述

  • transform

    未完待續。。。。。。。。。。。。。。。。。