1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_groupBy運算元

第五章_Spark核心程式設計_Rdd_groupBy運算元

1. 定義

    /*
    * 1. 定義
    *     def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    *
    * 2. 功能
    *     1. 傳入一個函式,對Rdd元素進行分組判斷,函式結果為key,符合條件元素的迭代器
    *     返回一個RDD 元素型別為二元組 (key,Iterable)
    *
    * 3. note
    *     1. 將資料根據指定的規則進行分組,分割槽個數預設不變,但是資料會被打亂重新組合(這樣的過程我們稱為 Shuffle
) * *
*/
  object groupByTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd = sc.makeRDD(List(1, -2, 3, 14, 1, -10, -100), 2)

    //按 正數、負數 對Rdd進行分組
    private val rdd1: RDD[(String, Iterable[Int])] = rdd.groupBy(
      e 
=> if (e >= 0) "正數" else "負數" ) println(s"當前分割槽數 : ${rdd1.getNumPartitions}") rdd1.saveAsTextFile("Spark_319/src/output/02") println(rdd1.collect().mkString(",")) sc.stop() /* * 執行結果 * (負數,CompactBuffer(-2, -10, -100)),(正數,CompactBuffer(1, 3, 14, 1)) * *
*/ }

需求1 : 將 List("Hello", "hive", "hbase", "Hadoop")根據單詞首寫字母進行分組

  object groupByTest1 extends App {
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2)

    var rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(
      _.substring(0, 1)
    )

    println(rdd1.collect().mkString(","))

    sc.stop()

    /*
    * 執行結果
    *     (負數,CompactBuffer(-2, -10, -100)),(正數,CompactBuffer(1, 3, 14, 1))
    *
    * */


  }

需求2 : 從伺服器日誌資料apache.log中獲取每個時間段訪問量(按小時統計)

  object groupByTest2 extends App {
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd = sc.textFile("Spark_319/src/data/input/apache.log")

    //從日誌行中 獲取小時
    private val rdd1: RDD[String] = rdd.map(_.split(" ")(3).split(":")(1))

    //對小時分組
    private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(
      e => e
    )

    //對小時分組內求和
    private val rdd3: RDD[(String, Int)] = rdd2.map(
      tp => (tp._1, tp._2.size)
    )

    println(rdd3.collect().mkString(","))

    sc.stop()

    /*
    * 執行結果
    *     (14,498),(21,453),(06,366),(20,486),(19,493)...
    *
    * */
  }

需求3 : WordCount

  object groupByTest3 extends App {
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest")

    val sc: SparkContext = new SparkContext(sparkconf)
    //讀取檔案
    val rdd = sc.textFile("Spark_319/src/data/*.txt")

    //拆分單詞
    private val rdd1: RDD[String] = rdd.flatMap(_.split(" "))

    //對單詞分組
    private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e)

    //對分組統計
    private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size))

    rdd3.collect().foreach(println(_))

    sc.stop()

    /*
    * 執行結果
    *     (曹操,6)
          (關羽,3)
          (張飛,3)
          (孫權,3)
          (劉備,3)
          (政黨,3)
          (政府,3)
    *
    * */


  }