Chapter 6 DataFrame & Dataset
6-1 -課程目錄
6-2 -DataFrame產生背景
The DataFrame was not invented by Spark SQL; it appeared much earlier in R and in Python's Pandas library.
6-3 DataFrame Overview
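In Spark SQL, a DataFrame is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database, but with richer optimizations under the hood.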
6-4 DataFrame vs. RDD
RDD:
  java/scala ==> JVM
  python ==> Python runtime
An RDD program is bound to the runtime of the language it is written in, so performance differs across languages.
DataFrame:
  java/scala/python ==> Logical Plan
A DataFrame program, whatever the source language, is compiled into the same logical plan, so every language benefits from the same optimized execution.
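Because all DataFrame operations converge on one logical plan, you can inspect what Catalyst generates for a query. A minimal sketch, assuming an active SparkSession named spark and the same people.json path used in the next section:
// Print the parsed/analyzed/optimized logical plans and the physical plan
val df = spark.read.format("json").load("file:///Users/rocky/data/people.json")
df.select("name").explain(true)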
6-5 Basic DataFrame API Operations
Reference code:
package com.imooc.spark
import org.apache.spark.sql.SparkSession
/**
 * Basic DataFrame API operations
 */
object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
    // Load a JSON file into a DataFrame
    val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")
    // Print the DataFrame's schema
    peopleDF.printSchema()
    // Show the first 20 rows of the dataset
    peopleDF.show()
    // Select all values of one column: select name from table
    peopleDF.select("name").show()
    // Select several columns and compute on one of them: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
    // Filter rows on a column value: select * from table where age > 19
    peopleDF.filter(peopleDF.col("age") > 19).show()
    // Group by a column, then aggregate: select age, count(1) from table group by age
    peopleDF.groupBy("age").count().show()
    spark.stop()
  }
}
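For reference, the loader above expects a JSON-lines file. Spark's bundled sample (examples/src/main/resources/people.json) has this shape; if your people.json matches it, the age > 19 filter returns only Andy:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}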
6-6 DataFrame and RDD Interoperation, Approach 1
Source code address:
Source code:
package com.imooc.spark
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Interoperating between DataFrame and RDD
 */
object DataFrameRDDApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()
    //inferReflection(spark)
    program(spark)
    spark.stop()
  }
  def program(spark: SparkSession): Unit = {
    // RDD ==> DataFrame, programmatically: build an RDD of Rows plus an explicit StructType schema
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
    val structType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()
    // Query through the DataFrame API
    infoDF.filter(infoDF.col("age") > 30).show
    // Query through SQL
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }
  def inferReflection(spark: SparkSession): Unit = {
    // RDD ==> DataFrame, by reflection: map each line to a case class and call toDF()
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
    // Note: the implicit conversions must be imported for toDF()
    import spark.implicits._
    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()
    infoDF.show()
    infoDF.filter(infoDF.col("age") > 30).show
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }
  case class Info(id: Int, name: String, age: Int)
}
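For reference, infos.txt must be comma-separated with the fields id,name,age, matching the split call and the Info case class; a few hypothetical lines:
1,zhangsan,20
2,lisi,30
3,wangwu,40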
6-7 DataFrame and RDD Interoperation, Approach 2
Source code address:
Source code: the listing is the same DataFrameRDDApp shown in 6-6; inferReflection demonstrates the reflection-based approach, while program demonstrates the programmatic, StructType-based one.
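In practice, the reflection approach is the most concise when the schema is known ahead of time; the programmatic approach fits when the columns are only determined at runtime. A related shortcut, sketched here under the assumption of an active SparkSession named spark and the same infos.txt, converts an RDD of tuples and names the columns directly in the toDF call:
import spark.implicits._
// Convert an RDD of tuples into a DataFrame, supplying column names explicitly
val infoDF = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")
  .map(_.split(","))
  .map(line => (line(0).toInt, line(1), line(2).toInt))
  .toDF("id", "name", "age")
infoDF.printSchema()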
6-8 DataFrame API Operations in Practice
A student-information statistics case.
Data file: student.data
Source code address:
package com.imooc.spark
import org.apache.spark.sql.SparkSession
/**
 * DataFrame operations on a real dataset
 */
object DataFrameCase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameCase").master("local[2]").getOrCreate()
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/student.data")
    // Note: the implicit conversions must be imported for toDF()
    import spark.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    // show() displays only the first 20 rows by default
    studentDF.show
    studentDF.show(30)
    // Pass truncate = false to display full column values
    studentDF.show(30, false)
    studentDF.take(10)
    studentDF.first()
    studentDF.head(3)
    studentDF.select("email").show(30, false)
    // Rows whose name is empty, or is the literal string "NULL"
    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show
    // Names starting with 'M'
    studentDF.filter("SUBSTR(name,0,1)='M'").show
    // Sorting by one or more columns, ascending and descending
    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show
    studentDF.sort("name", "id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show
    // Renaming a column in the output
    studentDF.select(studentDF("name").as("student_name")).show
    // Joining two DataFrames on id
    val studentDF2 = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
    studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show
    spark.stop()
  }
  case class Student(id: Int, name: String, phone: String, email: String)
}
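For reference, student.data is pipe-delimited with the fields id|name|phone|email, matching the split call and the Student case class. The course's actual file is not reproduced here; two hypothetical rows:
1|Anne|1-234-567-8901|anne@example.com
2|Mike|1-234-567-8902|mike@example.com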
6-9 Dataset Overview and Usage
Source code address:
package com.imooc.spark
import org.apache.spark.sql.SparkSession
/**
 * Dataset operations
 */
object DatasetApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()
    // Note: the implicit conversions must be imported for as[T] and map
    import spark.implicits._
    val path = "file:///Users/rocky/data/sales.csv"
    // How Spark parses a CSV file: header uses the first row as column names,
    // inferSchema samples the data to derive column types
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    df.show
    // Convert the DataFrame into a strongly typed Dataset[Sales]
    val ds = df.as[Sales]
    ds.map(line => line.itemId).show
    // Error-detection comparison (the misspellings below are deliberate):
    // SQL: a typo such as "seletc" is only caught at runtime
    //spark.sql("seletc name from person").show
    // DataFrame: a misspelled method name fails at compile time,
    // but a misspelled column name ("nname") is only caught at runtime
    //df.seletc("name")
    //df.select("nname")
    // Dataset: a misspelled field name fails at compile time
    ds.map(line => line.itemId)
    spark.stop()
  }
  case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
}
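For reference, the reader above expects sales.csv to begin with a header row naming the Sales fields; a minimal hypothetical file:
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0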