Spark 2.0 Introduction: Migrating from the RDD API to the Dataset API
Posted by 阿新 on 2019-02-12
Migrating from RDD to Dataset
The Dataset API combines the advantages of RDDs and DataFrames, and many of its methods deliberately mirror the RDD API, even though the two implementations are very different. As a result, most programs written against the RDD API can be migrated to the Dataset API with little effort. The snippets below show how to migrate RDD-based code to Datasets.
1. Loading a file
RDD
val rdd = sparkContext.textFile("src/main/resources/data.txt")
Dataset
val ds = sparkSession.read.textFile("src/main/resources/data.txt")
Note that read.text returns an untyped DataFrame (Dataset[Row]), while read.textFile returns the Dataset[String] that the later snippets call String methods on.
2. Counting rows
RDD
rdd.count()
Dataset
ds.count()
3. WordCount example
RDD
val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPair = wordsRDD.map(word => (word, 1))
val wordCount = wordsPair.reduceByKey(_ + _)
Dataset
import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCountDs = wordsPairDs.count()
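The two word-count strategies look different (reduceByKey on pairs vs. groupByKey followed by count), but they produce the same result. As a minimal sketch of the semantics, using plain Scala collections rather than Spark (object and method names are hypothetical):

```scala
object WordCountSketch {
  // RDD style: map each word to (word, 1), group by the key, and sum the
  // values within each group, which is what reduceByKey(_ + _) does per key.
  def reduceByKeyStyle(words: List[String]): Map[String, Int] =
    words
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // Dataset style: group the elements by themselves and count each group,
  // mirroring groupByKey(value => value) followed by count().
  def groupByKeyStyle(words: List[String]): Map[String, Int] =
    words
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

Both yield the same per-word counts; the Dataset version simply moves the aggregation into the grouped handle instead of pre-building (word, 1) pairs.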
4. Caching
RDD
rdd.cache()
Dataset
ds.cache()
5. Filtering
RDD
val filteredRDD = wordsRDD.filter(value => value == "hello")
Dataset
val filteredDS = wordsDs.filter(value => value == "hello")
6. mapPartitions
RDD
val mapPartitionsRDD = rdd.mapPartitions(iterator =>
List(iterator.count(value => true)).iterator)
Dataset
val mapPartitionsDs = ds.mapPartitions(iterator =>
List(iterator.count(value => true)).iterator)
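With mapPartitions, the function receives each partition's contents as an iterator and emits one value per partition, here the element count. As a plain-Scala sketch of that contract (hypothetical helper, not Spark code), modeling partitions as a list of lists:

```scala
object MapPartitionsSketch {
  // Each inner list stands in for one partition; the function sees it only
  // through an iterator and emits a single count, exactly like the snippet's
  // List(iterator.count(value => true)).iterator per partition.
  def countPerPartition(partitions: List[List[String]]): List[Int] =
    partitions.map { partition =>
      val iterator = partition.iterator
      iterator.count(value => true) // counts every element in this partition
    }
}
```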
7. reduceByKey
RDD
val reduceCountByRDD = wordsPair.reduceByKey(_+_)
Dataset
val reduceCountByDs = wordsPairDs.mapGroups((key, values) => (key, values.length))
8. Converting between RDD and Dataset
Dataset to RDD
val dsToRDD = ds.rdd
RDD to Dataset
Converting an RDD into a DataFrame takes a bit more work, because a schema has to be supplied. The following shows how to convert an RDD[String] into a Dataset[String]:
val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value", StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD, dfschema)
val rDDToDataSet = rddToDF.as[String]
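The round trip here is: wrap each string in a Row matching the "value" schema, then unwrap it again with as[String]. A minimal sketch of that shape in plain Scala, with a one-field case class standing in for Row (hypothetical types, not Spark):

```scala
object RowRoundTripSketch {
  // Stand-in for a Row under the schema StructType(StructField("value", StringType)).
  final case class ValueRow(value: String)

  // Mirrors rdd.map(value => Row(value)): wrap each string in a row.
  def toRows(strings: List[String]): List[ValueRow] =
    strings.map(s => ValueRow(s))

  // Mirrors rddToDF.as[String]: project the single column back out.
  def asStrings(rows: List[ValueRow]): List[String] =
    rows.map(_.value)
}
```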
9. Double-based operations
RDD
val doubleRDD = sparkContext.makeRDD(List(1.0, 5.0, 8.9, 9.0))
val rddSum = doubleRDD.sum()
val rddMean = doubleRDD.mean()
Dataset
val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
val schema = StructType(Array(StructField("value", DoubleType)))
val doubleDS = sparkSession.createDataFrame(rowRDD, schema)
import org.apache.spark.sql.functions._
doubleDS.agg(sum("value"))
doubleDS.agg(mean("value"))
10. The reduce API
RDD
val rddReduce = doubleRDD.reduce((a, b) => a + b)
Dataset
val dsReduce = doubleDS.reduce((row1, row2) => Row(row1.getDouble(0) + row2.getDouble(0)))
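Sections 9 and 10 boil down to folding a collection of doubles: sum is a reduce with +, and mean divides that sum by the element count. A plain-Scala sketch of those semantics (hypothetical helpers, not Spark code):

```scala
object DoubleOpsSketch {
  // Mirrors doubleRDD.sum() and doubleRDD.reduce((a, b) => a + b):
  // both collapse the collection with pairwise addition.
  def sum(values: List[Double]): Double =
    values.reduce((a, b) => a + b)

  // Mirrors doubleRDD.mean(): the sum divided by the number of elements.
  def mean(values: List[Double]): Double =
    sum(values) / values.size
}
```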
The snippets above show how to convert programs written with the RDD API into programs written with the Dataset API. They do not cover every RDD API, but they should give you enough to migrate the rest on your own.
Complete code
package com.iteblog.spark
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* RDD API to Dataset API
* http://www.iteblog.com
*/
object RDDToDataSet {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val sparkContext = sparkSession.sparkContext

    // read data from a text file; read.textFile yields a Dataset[String]
    val rdd = sparkContext.textFile("src/main/resources/data.txt")
    val ds = sparkSession.read.textFile("src/main/resources/data.txt")

    // count
    println("count")
    println(rdd.count())
    println(ds.count())

    // wordcount
    println("wordcount")
    val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
    val wordsPair = wordsRDD.map(word => (word, 1))
    val wordCount = wordsPair.reduceByKey(_ + _)
    println(wordCount.collect.toList)

    import sparkSession.implicits._
    val wordsDs = ds.flatMap(value => value.split("\\s+"))
    val wordsPairDs = wordsDs.groupByKey(value => value)
    val wordCountDs = wordsPairDs.count()
    wordCountDs.show()

    // cache
    rdd.cache()
    ds.cache()

    // filter
    val filteredRDD = wordsRDD.filter(value => value == "hello")
    println(filteredRDD.collect().toList)
    val filteredDS = wordsDs.filter(value => value == "hello")
    filteredDS.show()

    // map partitions
    val mapPartitionsRDD = rdd.mapPartitions(iterator =>
      List(iterator.count(value => true)).iterator)
    println(s"the count of each partition is ${mapPartitionsRDD.collect().toList}")
    val mapPartitionsDs = ds.mapPartitions(iterator =>
      List(iterator.count(value => true)).iterator)
    mapPartitionsDs.show()

    // converting to each other
    val dsToRDD = ds.rdd
    println(dsToRDD.collect().toList)
    val rddStringToRowRDD = rdd.map(value => Row(value))
    val dfschema = StructType(Array(StructField("value", StringType)))
    val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD, dfschema)
    val rDDToDataSet = rddToDF.as[String]
    rDDToDataSet.show()

    // double-based operations
    val doubleRDD = sparkContext.makeRDD(List(1.0, 5.0, 8.9, 9.0))
    val rddSum = doubleRDD.sum()
    val rddMean = doubleRDD.mean()
    println(s"sum is $rddSum")
    println(s"mean is $rddMean")
    val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
    val schema = StructType(Array(StructField("value", DoubleType)))
    val doubleDS = sparkSession.createDataFrame(rowRDD, schema)
    import org.apache.spark.sql.functions._
    doubleDS.agg(sum("value")).show()
    doubleDS.agg(mean("value")).show()

    // reduceByKey API
    val reduceCountByRDD = wordsPair.reduceByKey(_ + _)
    val reduceCountByDs = wordsPairDs.mapGroups((key, values) => (key, values.length))
    println(reduceCountByRDD.collect().toList)
    println(reduceCountByDs.collect().toList)

    // reduce function
    val rddReduce = doubleRDD.reduce((a, b) => a + b)
    val dsReduce = doubleDS.reduce((row1, row2) =>
      Row(row1.getDouble(0) + row2.getDouble(0)))
    println("rdd reduce is " + rddReduce + ", dataset reduce is " + dsReduce)
  }
}