spark基礎之RDD和DataFrame的轉換方式
一 通過定義Case Class,使用反射推斷Schema
定義Case Class,在RDD的轉換過程中使用Case Class可以隱式轉換成SchemaRDD,然後再註冊成表,然後就可以利用sqlContext或者SparkSession操作了。
我們給出一個電影測試資料film.txt,定一個Case Class(Film),然後將資料檔案讀入後隱式轉換成SchemeRDD:film,並將film在SparkSession中註冊,最後對錶進行查詢
1.1 上傳測試資料
hdfs dfs -put /opt/data/film.txt /user/hadoop
1.2 定義SparkSession,靜態匯入其所有成員
val
session = SparkSession.builder()
.appName("Case Class To Define RDD")
.config("spark.some.config.option",
"some-value")
.master("local[*]")
.getOrCreate()
import session.implicits._
1.3 定義Film類,讀入資料並建立檢視
val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt" )
val filmDF = filmRdd
.map(_.split(","))
.map(fields => Film(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat))
.toDF()
filmDF.createOrReplaceTempView("film")
1.4 查詢分數大於5.0的電影
val results =session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0" )
1.5 對獲取到的Dataset進行對映,因為不知道資料的schema,所以我們需要getAs方法獲取對應的列,並將每一行結果返回,最後列印結果
val filmDS = results.map(film => {
val name = film.getAs[String]("name")
val director = film.getAs[String]("director")
val style = film.getAs[String]("style")
val score = film.getAs[Float]("score")
(name,director,style,score)
})
filmDS.show(10)
二 通過程式設計介面,定義Schema,並應用到RDD上
通過使用createDataFrame定義RDD,通常有三個步驟
# 建立初始RDD
# 構建Row型別的RDD
# 構建該RDD對應的schema
然後呼叫createDataFrame方法
2.1 建立SparkSession,靜態匯入成員
val session = SparkSession.builder()
.appName("Create DataFrame API To Define RDD")
.config("spark.some.config.option", "some-value")
.master("local[*]")
.getOrCreate()
import session.implicits._
2.2HDFS 讀取資料,構建初始RDD
val filmRdd = session.sparkContext.textFile("hdfs://hdfs-cluster/user/hadoop/film.txt")
2.3構建Row型別的RDD
val rowRdd = filmRdd
.map(_.split(","))
.map(fields =>
Row(fields(0),fields(1),fields(2),fields(3),fields(4).trim.toInt,fields(5),fields(6).trim.toFloat)
)
2.4 構建該RDD對應的schema
//
這裡的資料型別必須和資料來源所有型別對應val schema:StructType
= StructType(Array(
StructField("filmid",StringType),
StructField("director",StringType),
StructField("name",StringType),
StructField("release_time",StringType),
StructField("box_office",IntegerType),
StructField("style",StringType),
StructField("score",FloatType)
))
2.5 建立DataFrame,並建立匯或者替換檢視,然後查詢查詢分數大於5.0的電影
val filmDF = session.createDataFrame(rowRdd,schema)
filmDF.createOrReplaceTempView("film")
val results = session.sql("SELECT name,director,style,score FROM film WHERE score > 5.0")
2.6 獲取結果,進行展示
val filmDS = results.map(film => {
val name = film.getAs[String]("name")
val director = film.getAs[String]("director")
val style = film.getAs[String]("style")
val score = film.getAs[Float]("score")
(name,director,style,score)
})
filmDS.show(10)