1. 程式人生 > >Spark學習(伍)- DateFrame&Dataset

Spark學習(伍)- DateFrame&Dataset

文章目錄

DataFrame產生背景

DataFrame它不是Spark SQL提出的,而是早起在R、Pandas語言就已經有了的。

  • Spark RDD API vs MapReduce API Threshold
    使用spark和mr開發需要學習對應語言scala和java等;門檻較高
  • R/Pandas one machine
    這些語言僅僅支援單機處理

sparksql裡的DataFrame誕生易於R和Pandas的人無縫對接;降低了很多人的門檻

DataFrame概述

  • A distributed collection of rows organized into named columns(將行組織命名為列的分散式集合)(RDD with schema)
  • 它在概念上等價於關係資料庫中的表或R/Python中的data frame,但是具有更豐富的優化
  • 用於選擇、篩選、聚合和繪製結構化資料的抽象
  • 受到R和panda單機小資料處理應用於分散式大資料的啟發
  • 以前叫做SchemaRDD (Spark < 1.3)

A Dataset is a distributed collection of data:分散式的資料集

A DataFrame is a Dataset organized into named columns.
以列(列名、列的型別、列值)的形式構成的分散式資料集,按照列賦予不同的名稱;比如:

student
id:int
name:string
city:string
在這裡插入圖片描述

DataFrame和RDD的對比

RDD是Spark的核心
RDD彈性的分散式資料集五大特性

  1. 他有一系列的Partition組成的
  2. 每一個運算元作用在每一個partition上
  3. rdd之間是有依賴關係的
  4. 可選項:分割槽器作用在KV格式的RDD上 (1)分割槽器是在shuffle階段起作用(2) GroupByKey, reduceBykey, join, sortByKey等這些運算元會產生shuffle(3)這些運算元必須作用在KV格式的RDD
  5. RDD會提供一系列最佳計算位置,說白了就是暴露每一個partitior的位置這是資料本地化的基礎
    在這裡插入圖片描述
    RDD:
    java/scala ==> jvm
    python ==> python runtime

DataFrame:
java/scala/python ==> Logic Plan

DataFrame基本API操作

Create DataFame
printSchema
show
select
filter

package com.kun.dataframe
import org.apache.spark.sql.SparkSession
/**
 * DataFrame API基本操作
 */
object DataFrameApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

    // 將json檔案載入成一個dataframe
    val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")

    // 輸出dataframe對應的schema資訊
    peopleDF.printSchema()

    // 輸出資料集的前20條記錄
    peopleDF.show()

    //查詢某列所有的資料: select name from table
    peopleDF.select("name").show()

    // 查詢某幾列所有的資料,並對列進行計算: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()

    //根據某一列的值進行過濾: select * from table where age>19
    peopleDF.filter(peopleDF.col("age") > 19).show()

    //根據某一列進行分組,然後再進行聚合操作: select age,count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }
}

DataFrame與RDD互操作方式

Spark SQL支援兩種不同的方法將現有的rdd轉換為資料集。第一種方法使用反射來推斷包含特定型別物件的RDD的模式。這種基於反射的方法可以生成更簡潔的程式碼,並且當您在編寫Spark應用程式時已經瞭解模式時,這種方法可以很好地工作。

建立資料集的第二種方法是通過程式設計介面,該介面允許您構建模式,然後將其應用於現有的RDD。雖然這個方法比較冗長,但它允許在直到執行時才知道列及其型別時構造資料集。

測試檔案infos.txt

1,zhangsan,20
2,lisi,30
3,wangwu,40

package com.kun.dataframe

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * DataFrame和RDD的互操作
 */
object DataFrameRDDApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()

    //inferReflection(spark)

    program(spark)

    spark.stop()
  }

  def program(spark: SparkSession): Unit = {//通過程式設計介面
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD,structType)
    infoDF.printSchema()
    infoDF.show()


    //通過df的api進行操作
    infoDF.filter(infoDF.col("age") > 30).show

    //通過sql的方式進行操作
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  def inferReflection(spark: SparkSession) {//(第一種方式使用反射)
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    //注意:需要匯入隱式轉換
    import spark.implicits._
    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  case class Info(id: Int, name: String, age: Int)

}

DataFrame和RDD互操作的兩種方式:
1)反射:case class 前提:事先需要知道你的欄位、欄位型別
2)程式設計:Row 如果第一種情況不能滿足你的要求(事先不知道列)
3) 選型:優先考慮第一種

DataFrame API操作案例實戰

student.data檔案:

1|Burke|1-300-746-8446|[email protected]
2|Kamal|1-668-571-5046|[email protected]
3|Olga|1-956-311-1686|[email protected]
4|Belle|1-246-894-6340|[email protected]
5|Trevor|1-300-527-4967|[email protected]
6|Laurel|1-691-379-9921|[email protected]
7|Sara|1-608-140-1995|[email protected]
8|Kaseem|1-881-586-2689|[email protected]
9|Lev|1-916-367-5608|[email protected]
10|Maya|1-271-683-2698|[email protected]
11|Emi|1-467-270-1337|[email protected]
12|Caleb|1-683-212-0896|[email protected]
13|Florence|1-603-575-2444|[email protected]
14|Anika|1-856-828-7883|[email protected]
15|Tarik|1-398-171-2268|[email protected]
16|Amena|1-878-250-3129|[email protected]
17|Blossom|1-154-406-9596|[email protected]
18|Guy|1-869-521-3230|[email protected]
19|Malachi|1-608-637-2772|[email protected]
20|Edward|1-711-710-6552|[email protected]
21||1-711-710-6552|[email protected]
22||1-711-710-6552|[email protected]
23|NULL|1-711-710-6552|[email protected]
package com.kun.dataframe

import org.apache.spark.sql.SparkSession

/**
 * DataFrame中的其他操作
 */
object DataFrameCase {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/student.data")

    //注意:需要匯入隱式轉換
    import spark.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    //show預設只顯示前20條
    studentDF.show
    studentDF.show(30)
    studentDF.show(30, false)

    studentDF.take(10)
    studentDF.first()
    studentDF.head(3)


    studentDF.select("email").show(30,false)


    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show


    //name以M開頭的人
    studentDF.filter("SUBSTR(name,0,1)='M'").show

    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show

    studentDF.sort("name","id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

    studentDF.select(studentDF("name").as("student_name")).show


    val studentDF2 = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show

    spark.stop()
  }

  case class Student(id: Int, name: String, phone: String, email: String)

}

同樣可以在spark-shell裡測試;速度比idea裡要快
在這裡插入圖片描述
show()
在這裡插入圖片描述
take取幾個
在這裡插入圖片描述
first取第一個;head取前幾個
在這裡插入圖片描述
檢視email 30個
在這裡插入圖片描述
filter過濾name=“”的
在這裡插入圖片描述
filter過濾name=“”和name=null的
在這裡插入圖片描述
filter過濾名字開頭為M的
在這裡插入圖片描述
檢視函式:
在這裡插入圖片描述
filter過濾名字開頭為M的
在這裡插入圖片描述
排序
在這裡插入圖片描述
降序排
在這裡插入圖片描述
按照name和id排序
在這裡插入圖片描述
按照name升序和id降序排序
在這裡插入圖片描述
更改name為student_name
在這裡插入圖片描述
進行jion操作注意是三個=號
在這裡插入圖片描述
在這裡插入圖片描述

DataSet概述及使用

DataSet是資料的分散式集合。Dataset是Spark 1.6中新增的一個新介面,它提供了RDDs(強型別、使用強大lambda函式的能力)和Spark SQL優化執行引擎的優點。可以從JVM物件構造資料集,然後使用功能轉換(對映、平面對映、篩選器等)操作資料集。資料集API在Scala和Java中可用。Python不支援資料集API。但是由於Python的動態特性,資料集API的許多優點已經可用(例如,您可以通過行名自然地訪問行. columnname的欄位)。R的情況類似。

靜態型別(Static-typing)和執行時型別安全(runtime type-safety)
在這裡插入圖片描述

DataFrame = Dataset[Row]
Dataset:強型別 typed case class
DataFrame:弱型別 Row

SQL:
seletc name from person; compile ok, result no
錯誤sql語句編譯的時候是發現不問題的;ok的;但是執行的時候可以發現是有問題的。

DF:
df.seletc(“name”) compile no
錯誤的dateframe方法,編譯的時候就有問題的。
df.select(“nname”) compile ok
錯誤的dateframe列名;編譯的時候是發現不了的,執行的時候是有問題的。

DS:
ds.map(line => line.itemid) compile no
錯誤的dateset列名和方法名;編譯的時候是可以發現的。

package com.kun.dataframe

import org.apache.spark.sql.SparkSession

/**
 * Dataset操作
 */
object DatasetApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()

    //注意:需要匯入隱式轉換
    import spark.implicits._

    val path = "file:///Users/rocky/data/sales.csv"

    //spark如何解析csv檔案?
    val df = spark.read.option("header","true").option("inferSchema","true").csv(path)
    df.show

	//dateframe轉換為dataset
    val ds = df.as[Sales]
    ds.map(line => line.itemId).show


    spark.sql("seletc name from person").show

    //df.seletc("name")
    df.select("nname")

    ds.map(line => line.itemId)

    spark.stop()
  }

  case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}