第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_groupByKey
阿新 • • 發佈:2022-03-24
1. 定義
/* * 1. 定義 * def groupByKey(): RDD[(K, Iterable[V])] * def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] * def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] * * 2. 功能 * 按照相同的 Key 對 Value 進行聚合 * 只能處理 key-value型的Rdd * * 3. 思考 * 1. groupByKey 和 reduceByKey的區別? * 1. 從shuffle 的角度 * 1. groupByKey 和 reduceByKey 都存在shuffle(都要從不同分割槽節點拉取資料) * 但是 reduceByKey 可以在shuffle前對分割槽相同的key進行 預聚合(combine)功能 * 這樣會 減少落盤和傳輸的資料量 * 但是 groupByKey 只能進行分組,而不能預聚合 * 所以 reduceByKey的效能比較高 * 2. 從功能 的角度 * 1. groupByKey : 對相同的key 進行分組 * 2. reduceByKey : 對相同的key 先分組再進行聚合 * * * 4. note * 1. 傳入的分割槽器 是對 分組結果的key處理 * * **/
2. 示例
object groupByKeyTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "x1"), (-2, "x2"), (3, "x3"), (-4, "x4"), (-5, "x5"), (-6, "x6"), (7, "x7")), 2)private val rdd1 = rdd.groupByKey( new Partitioner { override def numPartitions: Int = 2 override def getPartition(key: Any): Int = if (key.asInstanceOf[Int] > 0) 1 else 0 } ) private val rdd2: RDD[(Int, Iterable[(Int, String)])] = rdd.groupBy(_._1) println(s"${rdd1.collect().mkString(",")}") println(s"${rdd2.collect().mkString(",")}") sc.stop() }