1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_groupByKey

第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_groupByKey

1. 定義

  /*
  * 1. 定義
  *     def groupByKey(): RDD[(K, Iterable[V])]
  *     def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  *     def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  *
  * 2. 功能
  *     按照相同的 Key 對 Value 進行聚合
  *     只能處理 key-value型的Rdd
  *
  * 3. 思考
  *    1. groupByKey 和 reduceByKey的區別?
  *       1. 從shuffle 的角度
  *           1. groupByKey 和 reduceByKey 都存在shuffle(都要從不同分割槽節點拉取資料)
  *              但是 reduceByKey 可以在shuffle前對分割槽相同的key進行 預聚合(combine)功能
  *                  這樣會 減少落盤和傳輸的資料量
  *              但是 groupByKey 只能進行分組,而不能預聚合
  *              所以 reduceByKey的效能比較高
  *       2. 從功能 的角度
  *           1. groupByKey : 對相同的key 進行分組
  *           2. reduceByKey : 對相同的key 先分組再進行聚合
  *
  *
  * 4. note
  *   1. 傳入的分割槽器 是對 分組結果的key處理
  *
  *
  * 
*/

2. 示例

  object groupByKeyTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "x1"), (-2, "x2"), (3, "x3"), (-4, "x4"), (-5, "x5"), (-6, "x6"), (7, "x7")), 2)

    
private val rdd1 = rdd.groupByKey( new Partitioner { override def numPartitions: Int = 2 override def getPartition(key: Any): Int = if (key.asInstanceOf[Int] > 0) 1 else 0 } ) private val rdd2: RDD[(Int, Iterable[(Int, String)])] = rdd.groupBy(_._1) println(s
"${rdd1.collect().mkString(",")}") println(s"${rdd2.collect().mkString(",")}") sc.stop() }