
Chapter 6: DataFrame & Dataset

6-1 Chapter Outline

6-2 Background of the DataFrame

The DataFrame was not invented by Spark SQL; R and Python's Pandas library had data frames much earlier. Spark adopted the concept so that users of those tools get a familiar API for distributed processing.

6-3 DataFrame Overview
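In Spark, a DataFrame is a distributed collection of data organized into named columns. Conceptually it is equivalent to a table in a relational database, or to a data frame in R/Pandas, but with richer optimizations under the hood (the Catalyst optimizer).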

6-4 DataFrame vs. RDD

RDD:

java/scala ==> JVM

python ==> Python runtime

An RDD program runs in each language's own runtime, so RDD performance differs between Scala/Java and Python.

DataFrame:

java/scala/python ==> Logical Plan

No matter which language is used, DataFrame operations compile to the same logical plan, so performance is essentially the same across languages; see the sketch below.
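A minimal sketch of that point, using a tiny hypothetical DataFrame (names and data made up for illustration): every DataFrame operation is recorded into a logical plan that the Catalyst optimizer rewrites before execution, and explain(true) prints those plans.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("PlanDemo").master("local[2]").getOrCreate()
import spark.implicits._

// A tiny in-memory DataFrame (hypothetical data)
val df = Seq((1, "a", 20), (2, "b", 35)).toDF("id", "name", "age")

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan
df.filter(df.col("age") > 30).explain(true)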

6-5 Basic DataFrame API Operations

Reference code:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/DataFrameApp.scala

package com.imooc.spark

import org.apache.spark.sql.SparkSession

/**
 * Basic DataFrame API operations
 */
object DataFrameApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

    // Load a JSON file into a DataFrame
    val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")

    // Print the DataFrame's schema
    peopleDF.printSchema()

    // Show the first 20 rows of the dataset
    peopleDF.show()

    // Select every value of one column: select name from table
    peopleDF.select("name").show()

    // Select several columns and derive a new one: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()

    // Filter rows on a column's value: select * from table where age > 19
    peopleDF.filter(peopleDF.col("age") > 19).show()

    // Group by a column, then aggregate: select age, count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }
}
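For reference: assuming people.json is a copy of the sample file bundled with the Spark distribution (examples/src/main/resources/people.json), it holds three JSON records:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

With that input, printSchema() infers age as a nullable long and name as a nullable string.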

 

6-6 DataFrame and RDD Interoperation, Method One

Source URL:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/DataFrameRDDApp.scala

Source code:

package com.imooc.spark

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Interoperating between DataFrame and RDD
 */
object DataFrameRDDApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()

    //inferReflection(spark)
    program(spark)

    spark.stop()
  }

  // Method two: build the schema programmatically with Row and StructType
  def program(spark: SparkSession): Unit = {

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()

    // Operate through the DataFrame API
    infoDF.filter(infoDF.col("age") > 30).show

    // Operate through SQL
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  // Method one: infer the schema by reflection from a case class
  def inferReflection(spark: SparkSession) {

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    // Note: the implicit conversions must be imported for toDF()
    import spark.implicits._

    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    infoDF.show()
    infoDF.filter(infoDF.col("age") > 30).show

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  case class Info(id: Int, name: String, age: Int)
}
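Both methods parse the same comma-separated input, one id,name,age record per line. A hypothetical infos.txt (made up for illustration):

1,zhangsan,20
2,lisi,30
3,wangwu,40

With this input, both the age > 30 filter and the SQL query return only the third row.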

 

6-7 DataFrame and RDD Interoperation, Method Two

Source URL:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/DataFrameRDDApp.scala

The source file is the same DataFrameRDDApp.scala already listed in full under 6-6; it contains both approaches. Method one (inferReflection) infers the schema by reflection from the Info case class, which is concise but requires the columns to be known at compile time. Method two (program) builds the schema programmatically with Row and StructType, which works when the structure is only known at runtime.

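The reverse direction, DataFrame to RDD, is also part of the interoperation story. A minimal sketch, assuming the infoDF built in the 6-6 listing:

// DataFrame ==> RDD: .rdd yields an RDD[Row]
val rowRDD = infoDF.rdd

// Row fields can be read back by name
rowRDD.map(row => row.getAs[String]("name")).collect().foreach(println)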

 


6-8 DataFrame API Operations in Practice

A student-information statistics case.

Data file: student.data

Source URL:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/DataFrameCase.scala

package com.imooc.spark

import org.apache.spark.sql.SparkSession

/**
 * Common DataFrame operations
 */
object DataFrameCase {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DataFrameCase").master("local[2]").getOrCreate()

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/student.data")

    // Note: the implicit conversions must be imported for toDF()
    import spark.implicits._

    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    // show() displays only the first 20 rows by default
    studentDF.show
    studentDF.show(30)
    // Pass truncate = false to show full column values
    studentDF.show(30, false)

    studentDF.take(10)
    studentDF.first()
    studentDF.head(3)

    studentDF.select("email").show(30, false)

    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show

    // People whose name starts with M
    studentDF.filter("SUBSTR(name,0,1)='M'").show

    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show
    studentDF.sort("name", "id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

    studentDF.select(studentDF("name").as("student_name")).show

    val studentDF2 = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    // Join the two DataFrames on id
    studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show

    spark.stop()
  }

  case class Student(id: Int, name: String, phone: String, email: String)
}
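Since the code splits each line on "|", student.data is expected to hold pipe-delimited records in the form id|name|phone|email. A hypothetical sample (made up for illustration):

1|Mary|13800000001|mary@example.com
2|NULL|13800000002|unknown@example.com
3||13800000003|noname@example.com

This is also why the filters check both name='' and name='NULL': a missing name may be encoded either as an empty string or as the literal text "NULL".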

 

6-9 Dataset Overview and Usage

Source URL:

https://gitee.com/sag888/big_data/blob/master/%E4%BB%A5%E6%85%95%E8%AF%BE%E7%BD%91%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E4%B8%BA%E4%BE%8B%20%E8%BF%9B%E5%85%A5%E5%A4%A7%E6%95%B0%E6%8D%AE%20Spark%20SQL%20%E7%9A%84%E4%B8%96%E7%95%8C/project/p1867y/ImoocSparkSQLProject/src/main/scala/com/imooc/spark/DatasetApp.scala

package com.imooc.spark

import org.apache.spark.sql.SparkSession

/**
 * Dataset operations
 */
object DatasetApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()

    // Note: the implicit conversions must be imported for as[Sales] and the typed API
    import spark.implicits._

    val path = "file:///Users/rocky/data/sales.csv"

    // How does Spark parse a CSV file? Take the header row as column names and infer column types
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    df.show

    // Convert the untyped DataFrame into a strongly typed Dataset[Sales]
    val ds = df.as[Sales]
    ds.map(line => line.itemId).show

    // When are mistakes caught?
    // SQL: a typo in the statement ("seletc") or an unknown table fails only at runtime
    //spark.sql("seletc name from person").show

    // DataFrame: a misspelled method name fails at compile time...
    //df.seletc("name")
    // ...but a misspelled column name ("nname") still fails only at runtime
    //df.select("nname")

    // Dataset: a misspelled field name (e.g. line.itemid) would already fail at compile time
    ds.map(line => line.itemId)

    spark.stop()
  }

  case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
}
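To round out the comparison, a minimal sketch of typed Dataset operations, assuming the Sales case class and the ds value from the listing above. The compiler checks the field names and types used inside the lambdas, which is the main thing a Dataset adds over a DataFrame:

// sale is statically typed as Sales, so field access is checked at compile time
ds.filter(sale => sale.amountPaid > 100.0).show
ds.map(sale => (sale.itemId, sale.amountPaid * 0.9)).show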