Spark MLlib Machine Learning Series, Part 1: Common Spark RDD Operations
阿新 · Published: 2019-01-25
package mlib
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object UsefulRddOpts {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[*]")
.appName("UseFulRddOpts")
.getOrCreate()
val sc = spark.sparkContext
showDiffReduceByKeyAndGroupByKey(sc)
}
/**
* Prefer reduceByKey over groupByKey: reduceByKey first aggregates within each
* partition, which cuts down the amount of data shuffled across the network. You can
* think of it as compressing the data on the mapper side before the shuffle.
* @param sc the SparkContext
*/
def showDiffReduceByKeyAndGroupByKey(sc : SparkContext) = {
val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1)
val result01 = data.reduceByKey(_ + ":" + _)
val result02 = data.groupByKey()
result01.foreach(println)
// Result:
//(4,nina)
//(1,susun:hh:susun)
//(3,susum)
//(2,sunsun)
result02.foreach(println)
// Result:
//(4,CompactBuffer(nina))
//(1,CompactBuffer(susun, hh, susun))
//(3,CompactBuffer(susum))
//(2,CompactBuffer(sunsun))
}
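/**
* A small sketch added for illustration (not in the original post): the same per-key
* concatenation done with groupByKey + mapValues. It produces the same output as the
* reduceByKey call above, but ships every value across the shuffle instead of
* combining values on the map side first.
*/
def showGroupByKeyEquivalent(sc : SparkContext) = {
val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1)
val viaGroupByKey = data.groupByKey().mapValues(_.mkString(":"))
viaGroupByKey.foreach(println)
//(4,nina)
//(1,susun:hh:susun)
//(3,susum)
//(2,sunsun)
}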
/**
* Usage of reduce
* @param sc the SparkContext
*/
def showUseOfReduce(sc : SparkContext) = {
val data = sc.parallelize(Array("123", "456", "789"), 1)
val reduce01 = data.reduce((str01, str02) => str01 + str02)
val reduce02 = data.reduce(_ + _)
println(reduce01 + ":" + reduce02) // 123456789:123456789
}
/**
* Usage of keyBy: assigns a key to each element
* @param sc the SparkContext
*/
def showUseOfKeyBy(sc : SparkContext) = {
val data = sc.parallelize(Array("123dlalda", "hello,world", "hi, man."))
val result = data.keyBy(str => str.length)
result.foreach(println)
// Result:
//(9,123dlalda)
//(8,hi, man.)
//(11,hello,world)
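// Added note: keyBy(f) is just shorthand for map(x => (f(x), x)); the line below
// (not in the original post) produces the same pairs.
val sameAsKeyBy = data.map(str => (str.length, str))
sameAsKeyBy.foreach(println)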
}
/**
* Usage of groupBy and groupByKey
* @param sc the SparkContext
*/
def showUseOfGroypByAndGroupByKey(sc : SparkContext) = {
val data = sc.parallelize(Array(1,2,3,4,5,6,7,8), 1)
val result01 = data.groupBy(x => {if(x > 3) "gt3" else "lte3"})
result01.foreach(println)
// Result:
//(gt3,CompactBuffer(4, 5, 6, 7, 8))
//(lte3,CompactBuffer(1, 2, 3))
val data02 = sc.parallelize(1 to 9)
val result02 = data02.groupBy(x => {if(x % 2 == 0) "double" else "single"})
result02.foreach(println)
// Result:
//(double,CompactBuffer(2, 4, 6, 8))
//(single,CompactBuffer(1, 3, 5, 7, 9))
val data03 = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1)
val result03 = data03.groupByKey()
result03.foreach(println)
// Result:
// (4,CompactBuffer(nina))
// (1,CompactBuffer(susun, hh, susun))
// (3,CompactBuffer(susum))
// (2,CompactBuffer(sunsun))
}
/**
* Demonstrates the usage of map and flatMap
* @param sc the SparkContext
*/
def showUseOfMapAndFlatMap(sc : SparkContext) = {
val data = sc.parallelize(Array("hello,world", "bilibili,acfun", "hehe,intresting"), 1)
// Split each line on commas into an array
val map = data.map(x => x.split(","))
map.foreach(x => {println(x(0) +" " + x(1))})
// Something like {["hello", "world"], ..., ["hehe", "intresting"]}
// Split each line on commas, then flatten the results
val flatMap = data.flatMap(x => x.split(","))
flatMap.foreach(println) // Result: one word per line
// Example 2
val data01 = sc.parallelize(Array(1, 2, 3, 4, 5))
val result01 = data01.map(x => List(x+1)).collect // Array(List(2), List(3), ..., List(6)): each element stays wrapped in a List
val result02 = data01.flatMap(x => List(x+1)).collect // Array(2, 3, 4, 5, 6)
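// Added for illustration: flatMap can also emit several elements per input,
// which map cannot do without nesting.
val result03 = data01.flatMap(x => List(x, x * 10)).collect // Array(1, 10, 2, 20, 3, 30, 4, 40, 5, 50)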
}
/**
* Usage of filter; note that it keeps the elements that satisfy the predicate
* @param sc the SparkContext
*/
def showUseOfFilter(sc : SparkContext) = {
val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1)
val result = data.filter(item => "hh".equals(item._2))
result.foreach(println) // (1,hh)
}
/**
* Deduplication with distinct
* @param sc the SparkContext
*/
def showUseOfDistinct(sc : SparkContext) = {
val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1)
val result = data.distinct()
result.foreach(println) // prints the data with duplicates removed
}
/**
* Usage of countByKey
* @param sc the SparkContext
*/
def showUseOfCountByKey(sc : SparkContext) = {
val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"),
(3, "susum"), (4, "nina"), (1, "hh")))
val result = data.countByKey()
result.foreach(println) // counts the number of elements per key
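// Added note: countByKey is roughly equivalent to counting with reduceByKey and
// collecting the result to the driver, as in the (illustrative) lines below.
val sameAsCountByKey = data.mapValues(_ => 1L).reduceByKey(_ + _).collectAsMap()
println(sameAsCountByKey) // e.g. Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1), ordering may differ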
}
/**
* countByValue: counts the occurrences of each value
* @param sc the SparkContext
*/
def showUseOfCountByValue(sc : SparkContext) = {
val data = sc.parallelize(List(0, 1, 2, 3, 0, 3, 4, 5, 6, 7), 1)
val result = data.countByValue()
result.foreach(println) // counts the occurrences of each value
}
/**
* Usage of repartition; since RDDs are immutable, the repartitioned result must be assigned to a new RDD
* @param sc the SparkContext
*/
def showUseOfRepartition(sc : SparkContext) = {
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
println(data.partitions.length)
val data01 = data.repartition(5)
println(data01.partitions.length)
val data02 = data.repartition(3)
println(data02.partitions.length) // number of partitions after repartitioning
}
/**
* def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
*   coalesce(numPartitions, shuffle = true)
* }
* coalesce redistributes the data into a new number of partitions; note that
* repartition is just the special case coalesce(numPartitions, shuffle = true).
* @param spark the SparkSession, the post-2.x API entry point
* @param sc the SparkContext
*/
def showUseOfCoalesce(spark : SparkSession, sc : SparkContext) = {
// val data = sc.parallelize(Seq(Map("john" -> 19, "Tracy" -> 18, "Lily" -> 20),
// Map("susum" -> 50), Map("lili" -> 50, "hehe" -> 9)), 1)
// data.aggregate(0)(math.max(_ , _), _ + _)
var data = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// var data = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1) // if the parallelism is fixed up front, the later operations all behave the same way
data.cache()
val result01 = data.aggregate(0)(math.max(_, _), _ + _)
println(result01)
val data02 = data.coalesce(2)
val result02 = data02.aggregate(0)(math.max(_, _), _ + _)
println(result02)
val data03 = data.repartition(2)
val result03 = data03.aggregate(0)(math.max(_, _), _ + _ )
println(result03)
}
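/**
* A minimal sketch added for illustration (not in the original post): without a
* shuffle, coalesce can only reduce the number of partitions; to increase them you
* need coalesce(n, shuffle = true), which is exactly what repartition does.
*/
def showCoalesceVsRepartition(sc : SparkContext) = {
val data = sc.parallelize(1 to 10, 2)
println(data.coalesce(5).partitions.length) // still 2: coalesce without shuffle cannot add partitions
println(data.coalesce(5, shuffle = true).partitions.length) // 5
println(data.repartition(5).partitions.length) // 5
}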
/**
* Usage of cartesian (the Cartesian product)
* @param sc the SparkContext, the entry point for submitting the Spark job
*/
def showUseOfCartessian(sc : SparkContext) = {
// by default there would be several partitions; here each RDD is forced into a single one
val data01 = sc.parallelize(Array(1, 2, 3, 4), 1)
val data02 = sc.parallelize(Array(5, 6), 1)
val result = data01.cartesian(data02)
result.foreach(println)
val result02 = data02.cartesian(data01)
result02.foreach(println)
}
/**
* Usage of cache (a special case of persist)
* @param spark the SparkSession, the post-2.0 API entry point
* @param sc the SparkContext
*/
def showUseOfCache(spark : SparkSession, sc : SparkContext) = {
import spark.implicits._
val data = sc.parallelize(List(1, 2, 3, 4, 5, 6)).toDF()
data.cache()
val count = data.count()
println("---------------------------------------------------")
data.show()
}
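/**
* A small sketch added for illustration (not in the original post): for an RDD,
* cache() is simply persist() with the default StorageLevel.MEMORY_ONLY.
*/
def showCacheIsPersist(sc : SparkContext) = {
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 100)
rdd.persist(StorageLevel.MEMORY_ONLY) // same effect as rdd.cache()
println(rdd.getStorageLevel) // e.g. StorageLevel(memory, deserialized, 1 replicas)
}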
/**
* Demonstrates how the aggregate function works
* @param sc the SparkContext
*/
def showUseOffAggregate(sc : SparkContext) = {
// aggregate:
// aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
// U is the result type; zeroValue is the initial value fed into both seqOp and combOp.
// seqOp is first applied to the data within each partition, producing one result U per
// partition, and combOp then merges those per-partition results.
// In the example below, data01 uses the default parallelism (several partitions on
// local[*]), so its result depends on how the elements are split across partitions;
// with partitions [1,2], [3,4], [5,6], [7,8] the per-partition maxima sum to 20.
// data02 has a single partition, so its result is simply 8.
val data01 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
val result01 = data01.aggregate(0)(math.max(_, _), _ + _)
println(result01) // 20
// how data02 is computed:
// math.max(0, 1) => 1
// math.max(1, 2) => 2
// ...
// math.max(7, 8) => 8
val data02 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 1)
val result02 = data02.aggregate(0)(math.max(_, _), _ + _)
println(result02) // 8
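// Added for illustration: pinning the partition count makes the result deterministic.
// With 4 partitions holding [1,2], [3,4], [5,6], [7,8], the per-partition maxima are
// 2, 4, 6, 8, and combOp sums them to 20.
val data03 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 4)
val result03 = data03.aggregate(0)(math.max(_, _), _ + _)
println(result03) // 20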
}
}