1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_join&leftOuterJoin&rightOuterJoin&fullOuterJoin

第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_join&leftOuterJoin&rightOuterJoin&fullOuterJoin

1. join

  /*
   * join
   *
   * 1. Definition
   *     def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
   *     def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
   * 2. Purpose
   *     Joins an RDD of (K, V) with an RDD of (K, W) and returns an RDD of
   *     (K, (V, W)) in which the values that share the same key are paired.
   *     Keys present on only one side do not appear in the result.
   */
  object joinTest extends App {

    // NOTE(review): Spark's quick-start guide recommends a main() method over
    // extending scala.App; App is kept here so the object's members stay unchanged.
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("joinTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    // Left side: (id, person), split into 2 partitions. Key 7 has no match in rdd2.
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
    // Right side: (id, kingdom), split into 3 partitions. Key 6 has no match in rdd1.
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

    private val joinRdd: RDD[(Int, (String, String))] = rdd1.join(rdd2)

    // Run the action inside try/finally so the context is stopped even if the job fails.
    try {
      joinRdd.collect().foreach(println(_))
    } finally {
      sc.stop()
    }

    /* Output observed for this data:
      (3,(關羽,蜀國))
      (4,(曹操,魏國))
      (1,(劉備,蜀國))
      (5,(趙雲,蜀國))
      (2,(張飛,蜀國))
    */
  }

2. leftOuterJoin

  /*
   * leftOuterJoin
   *
   * 1. Definition
   *     def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
   *     def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
   * 2. Purpose
   *     Left-outer-joins an RDD of (K, V) with an RDD of (K, W) and returns an
   *     RDD of (K, (V, Option[W])). Every key of the left RDD is kept; the
   *     right-hand value is Some(w) when the key also exists in the other RDD
   *     and None otherwise.
   */
  object leftOuterJoinTest extends App {

    // NOTE(review): Spark's quick-start guide recommends a main() method over
    // extending scala.App; App is kept here so the object's members stay unchanged.
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("leftOuterJoinTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    // Left side: (id, person), split into 2 partitions. Key 7 has no match in rdd2.
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
    // Right side: (id, kingdom), split into 3 partitions. Key 6 has no match in rdd1.
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

    private val joinRdd: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)

    // Run the action inside try/finally so the context is stopped even if the job fails.
    try {
      joinRdd.collect().foreach(println(_))
    } finally {
      sc.stop()
    }

    /* Output observed for this data:
      (3,(關羽,Some(蜀國)))
      (4,(曹操,Some(魏國)))
      (1,(劉備,Some(蜀國)))
      (7,(孫權,None))
      (5,(趙雲,Some(蜀國)))
      (2,(張飛,Some(蜀國)))
    */
  }

3. rightOuterJoin

  /*
   * rightOuterJoin
   *
   * 1. Definition
   *     def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
   *     def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
   * 2. Purpose
   *     Right-outer-joins an RDD of (K, V) with an RDD of (K, W) and returns an
   *     RDD of (K, (Option[V], W)). Every key of the other (right) RDD is kept;
   *     the left-hand value is Some(v) when the key also exists in this RDD
   *     and None otherwise.
   */
  object rightOuterJoinTest extends App {

    // NOTE(review): Spark's quick-start guide recommends a main() method over
    // extending scala.App; App is kept here so the object's members stay unchanged.
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("rightOuterJoinTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    // Left side: (id, person), split into 2 partitions. Key 7 has no match in rdd2.
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
    // Right side: (id, kingdom), split into 3 partitions. Key 6 has no match in rdd1.
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

    private val joinRdd: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)

    // Run the action inside try/finally so the context is stopped even if the job fails.
    try {
      joinRdd.collect().foreach(println(_))
    } finally {
      sc.stop()
    }

    /* Output observed for this data:
      (6,(None,吳國))
      (3,(Some(關羽),蜀國))
      (4,(Some(曹操),魏國))
      (1,(Some(劉備),蜀國))
      (5,(Some(趙雲),蜀國))
      (2,(Some(張飛),蜀國))
    */
  }

4. fullOuterJoin

  /*
   * fullOuterJoin
   *
   * 1. Definition
   *     def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
   *     def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
   * 2. Purpose
   *     Full-outer-joins an RDD of (K, V) with an RDD of (K, W) and returns an
   *     RDD of (K, (Option[V], Option[W])). Keys from both RDDs are kept; the
   *     side on which a key is missing is reported as None.
   */
  object fullOuterJoinTest extends App {

    // NOTE(review): Spark's quick-start guide recommends a main() method over
    // extending scala.App; App is kept here so the object's members stay unchanged.
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("fullOuterJoinTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    // Left side: (id, person), split into 2 partitions. Key 7 has no match in rdd2.
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
    // Right side: (id, kingdom), split into 3 partitions. Key 6 has no match in rdd1.
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

    // Explicit result type, matching the other join examples in this file.
    private val joinRdd: RDD[(Int, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)

    // Run the action inside try/finally so the context is stopped even if the job fails.
    try {
      joinRdd.collect().foreach(println(_))
    } finally {
      sc.stop()
    }

    /* Output observed for this data:
      (6,(None,Some(吳國)))
      (3,(Some(關羽),Some(蜀國)))
      (4,(Some(曹操),Some(魏國)))
      (1,(Some(劉備),Some(蜀國)))
      (7,(Some(孫權),None))
      (5,(Some(趙雲),Some(蜀國)))
      (2,(Some(張飛),Some(蜀國)))
    */
  }