1. 程式人生 > >spark mlib 機器學習系列之一:Spark rdd 常見操作

spark mlib 機器學習系列之一:Spark rdd 常見操作

package mlib

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object UsefulRddOpts {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder
            .master("local[*]")
            .appName("UseFulRddOpts")
            .getOrCreate()
        val sc = spark.sparkContext
        showDiffReduceByKeyAndGroupByKey(sc)
    }

    /**
      * 儘量使用reduceByKey,reduceByKey 會在每個partitiongs 上先進性聚合,可以減少資料
      * 的傳輸,可以理解為在Mapper 的的時候進行了資料壓縮。
      * @param
sc spark 上下文環境 */
def showDiffReduceByKeyAndGroupByKey(sc : SparkContext) = { val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"), (3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1) val result01 = data.reduceByKey(_ + ":" + _) val result02 = data.groupByKey() result01.foreach(println) //結果
//(4,nina) //(1,susun:hh:susun) //(3,susum) //(2,sunsun) result02.foreach(println) //結果 //(4,CompactBuffer(nina)) //(1,CompactBuffer(susun, hh, susun)) //(3,CompactBuffer(susum)) //(2,CompactBuffer(sunsun)) } /** * reduce 用法 * @param
sc hello, */
def showUseOfReduce(sc : SparkContext) = { val data = sc.parallelize(Array("123", "456", "789"), 1) val reduce01 = data.reduce((str01, str02) => str01 + str02) val reduce02 = data.reduce(_ + _) println(reduce01 + ":" + reduce02) // 123456789:123456789 } /** * keyBy 的用法,給每個資料分配一個key * @param sc spark 上下文 */ def showUseOfKeyBy(sc : SparkContext) = { val data = sc.parallelize(Array("123dlalda", "hello,world", "hi, man.")) val result = data.keyBy(str => str.length) result.foreach(println) // 結果 //(9,123dlalda) //(8,hi, man.) //(11,hello,world) } /** * groupBy 和groupByKey 的用法 * @param sc spark 上下文環境 */ def showUseOfGroypByAndGroupByKey(sc : SparkContext) = { val data = sc.parallelize(Array(1,2,3,4,5,6,7,8), 1) val result01 = data.groupBy(x => {if(x > 3) "gt3" else "lte3"}) result01.foreach(println) // 結果 //(gt3,CompactBuffer(4, 5, 6, 7, 8)) //(lte3,CompactBuffer(1, 2, 3)) val data02 = sc.parallelize(1 to 9) val result02 = data02.groupBy(x => {if(x % 2 == 0) "double" else "single"}) result02.foreach(println) // 結果 //(double,CompactBuffer(2, 4, 6, 8)) //(single,CompactBuffer(1, 3, 5, 7, 9)) val data03 = sc.parallelize(Array((1, "susun"), (2, "sunsun"), (3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1) val result03 = data03.groupByKey() result03.foreach(println) // 結果 // (4,CompactBuffer(nina)) // (1,CompactBuffer(susun, hh, susun)) // (3,CompactBuffer(susum)) // (2,CompactBuffer(sunsun)) } /** * 展示map 和flatMap 的用法 * @param sc */ def showUseOfMapAndFlatMap(sc : SparkContext) = { val data = sc.parallelize(Array("hello,world", "bilibili,acfun", "hehe,intresting"), 1) // 把每一行的資料進行按照逗號切分成陣列 val map = data.map(x => x.split(",")) map.foreach(x => {println(x(0) +" " + x(1))}) // 類似如此{["hello", "world"], ...,["hehe", "intresting"]} // 把每一行的資料進行按照逗號切分,然後進行扁平化操作, val flatMap = data.flatMap(x => x.split(",")) flatMap.foreach(println) // 結果每行一個單詞 // 例子二 val data01 = sc.parallelize(Array(1, 2, 3, 4, 5)) val result01 = data01.map(x => List(x+1)).collect // List 裡面套了Array(List(2), List(3),...,List(6)) val result02 = data01.flatMap(x => List(x+1)).collect //Array(2,3,4,5,6) } /** * filter 的用法, 注意保留符合filter 條件的內容 * @param sc spark 上下文物件 */ def showUseOfFilter(sc : SparkContext) = { val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"), (3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1) val result = data.filter(item => "hh".equals(item._2)) result.foreach(println) // (1,"hh") } /** * 去重操作 * @param sc spark 上下文物件 */ def showUseOfDistinct(sc : SparkContext) = { val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"), (3, "susum"), (4, "nina"), (1, "hh"), (1, "susun")), 1) val result = data.distinct() result.foreach(println) // 去掉重複內容 } /** * countByKey 的用法 * @param sc spark 上下文物件 */ def showUseOfCountByKey(sc : SparkContext) = { val data = sc.parallelize(Array((1, "susun"), (2, "sunsun"), (3, "susum"), (4, "nina"), (1, "hh"))) val result = data.countByKey() result.foreach(println) // 根據key 進行統計個數 } /** * countByValue, 統計各個Value 的個數 * @param sc spark 上下文 */ def showUseOfCountByValue(sc : SparkContext) = { val data = sc.parallelize(List(0, 1, 2, 3, 0, 3, 4, 5,6, 7), 1) val result = data.countByValue() result.foreach(println) // 根據value 進行統計個數 } /** * repartition 的用法, 分割槽後的rdd 需要用另一個rdd 進行裝載 * @param sc sparkContext */ def showUseOfRepartition(sc : SparkContext) = { val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6)) println(data.partitions.length) val data01 = data.repartition(5) println(data01.partitions.length) val data02 = data.repartition(3) println(data02.partitions.length) // 重新分割槽後的分割槽個數 } /** * def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope * { * coalesce(numPartitions, shuffle = true) * } * 重新分片,重新分割槽, 注意特例repartition * @param sc spark 上下文物件 * @param spark 2.X 後的新的api */ def showUseOfCoalesce(spark : SparkSession, sc : SparkContext) = { // val data = sc.parallelize(Seq(Map("john" -> 19, "Tracy" -> 18, "Lily" -> 20), // Map("susum" -> 50), Map("lili" -> 50, "hehe" -> 9)), 1) // data.aggregate(0)(math.max(_ , _), _ + _) var data = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // var data = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1) // 如果提前制定並行度,無論後面如何操作都是一樣 data.cache() val result01 = data.aggregate(0)(math.max(_, _), _ + _) println(result01) val data02 = data.coalesce(2) val result02 = data02.aggregate(0)(math.max(_, _), _ + _) println(result02) val data03 = data.repartition(2) val result03 = data03.aggregate(0)(math.max(_, _), _ + _ ) println(result03) } /** * 笛卡爾積的用法 * @param sc SparkContext, spark 程式提交入口 */ def showUseOfCartessian(sc : SparkContext) = { // 預設會有多個分割槽 val data01 = sc.parallelize(Array(1, 2, 3, 4), 1) val data02 = sc.parallelize(Array(5, 6), 1) val result = data01.cartesian(data02) result.foreach(println) val result02 = data02.cartesian(data01) result02.foreach(println) } /** * cache 用法 (persist 的一種特例) * @param spark SparkSession 物件,2.0 後的新的api * @param sc spark 上下文 */ def showUseOfCache(spark : SparkSession,sc : SparkContext) = { import spark.implicits._ val data = sc.parallelize(List(1, 2 ,3, 4, 5, 6)).toDF() data.cache() val count = data.count() println("---------------------------------------------------") data.show() } /** * 展示aggregate 函式的作用 * @param sc spark 上下文 */ def showUseOffAggregate(sc : SparkContext) = { // aggregate 函式, // aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U // U 期望的返回值型別,zeroValue, 預設返回的值,分別和 seqOp 以及combOp 進行操作 // 首先對rdd 中各個分割槽中的資料進行sepOp 操作, 得到各個分割槽的結果U, 然後對各個 // 分割槽的結果U 進行combOp 操作。 // 具體看如下例子,如下中第一個data01預設應該至少有三個分割槽,所有結果是20, // 第二個預設分割槽是1, 所以結果是8 val data01 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8)) val result01 = data01.aggregate(0)(math.max(_, _), _ + _) println(result01)//20 // data02 計算流程 // math.max(0, 1) => 1 // math.max(1, 2) => 2 // ... // math.max(7, 8) => 8 val data02 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 1) val result02 = data02.aggregate(0)(math.max(_, _), _ + _) println(result02) // 8 } }