第五章_Spark核心程式設計_Rdd_groupBy運算元
阿新 • • 發佈:2022-03-23
1. 定義
/* * 1. 定義 * def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] * * 2. 功能 * 1. 傳入一個函式,對Rdd元素進行分組判斷,函式結果為key,符合條件元素的迭代器 * 返回一個RDD 元素型別為二元組 (key,Iterable) * * 3. note * 1. 將資料根據指定的規則進行分組,分割槽個數預設不變,但是資料會被打亂重新組合(這樣的過程我們稱為 Shuffle) * * */
object groupByTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.makeRDD(List(1, -2, 3, 14, 1, -10, -100), 2) //按 正數、負數 對Rdd進行分組 private val rdd1: RDD[(String, Iterable[Int])] = rdd.groupBy( e=> if (e >= 0) "正數" else "負數" ) println(s"當前分割槽數 : ${rdd1.getNumPartitions}") rdd1.saveAsTextFile("Spark_319/src/output/02") println(rdd1.collect().mkString(",")) sc.stop() /* * 執行結果 * (負數,CompactBuffer(-2, -10, -100)),(正數,CompactBuffer(1, 3, 14, 1)) * **/ }
需求1 : 將 List("Hello", "hive", "hbase", "Hadoop")根據單詞首寫字母進行分組
object groupByTest1 extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2) var rdd1: RDD[(String, Iterable[String])] = rdd.groupBy( _.substring(0, 1) ) println(rdd1.collect().mkString(",")) sc.stop() /* * 執行結果 * (負數,CompactBuffer(-2, -10, -100)),(正數,CompactBuffer(1, 3, 14, 1)) * * */ }
需求2 : 從伺服器日誌資料apache.log中獲取每個時間段訪問量(按小時統計)
object groupByTest2 extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.textFile("Spark_319/src/data/input/apache.log") //從日誌行中 獲取小時 private val rdd1: RDD[String] = rdd.map(_.split(" ")(3).split(":")(1)) //對小時分組 private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy( e => e ) //對小時分組內求和 private val rdd3: RDD[(String, Int)] = rdd2.map( tp => (tp._1, tp._2.size) ) println(rdd3.collect().mkString(",")) sc.stop() /* * 執行結果 * (14,498),(21,453),(06,366),(20,486),(19,493)... * * */ }
需求3 : WordCount
object groupByTest3 extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest") val sc: SparkContext = new SparkContext(sparkconf) //讀取檔案 val rdd = sc.textFile("Spark_319/src/data/*.txt") //拆分單詞 private val rdd1: RDD[String] = rdd.flatMap(_.split(" ")) //對單詞分組 private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e) //對分組統計 private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size)) rdd3.collect().foreach(println(_)) sc.stop() /* * 執行結果 * (曹操,6) (關羽,3) (張飛,3) (孫權,3) (劉備,3) (政黨,3) (政府,3) * * */ }