第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_join&leftOuterJoin&rightOuterJoin&fullOuterJoin
阿新 • 發佈:2022-03-27
1. join
/**
 * 1. join
 *
 * Definitions:
 *   def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
 *   def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
 *
 * Behavior: inner-joins an RDD[(K, V)] with an RDD[(K, W)], returning an
 * RDD[(K, (V, W))] with one element per matching key pair. Keys present in
 * only one of the two RDDs are dropped from the result.
 */
object joinTest {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    // Run locally on a single thread; app name now matches this example
    // (the original said "distinctTest", copied from another demo).
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("joinTest")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // Left side: keys 1-5 and 7. Right side: keys 1-6.
      val rdd1: RDD[(Int, String)] = sc.makeRDD(
        List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
      val rdd2: RDD[(Int, String)] = sc.makeRDD(
        List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

      // Inner join: only keys 1-5 match; 6 and 7 do not appear in the output.
      val joinRdd: RDD[(Int, (String, String))] = rdd1.join(rdd2)
      joinRdd.collect().foreach(println)
      /* Sample output (order not guaranteed):
         (3,(關羽,蜀國))
         (4,(曹操,魏國))
         (1,(劉備,蜀國))
         (5,(趙雲,蜀國))
         (2,(張飛,蜀國))
       */
    } finally {
      sc.stop() // always release the SparkContext, even if the job fails
    }
  }
}
2.leftOuterJoin
/**
 * 2. leftOuterJoin
 *
 * Definitions:
 *   def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
 *   def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
 *
 * Behavior: left-outer-joins an RDD[(K, V)] with an RDD[(K, W)]. Every key of
 * the left RDD is kept; the right value is wrapped in Some when the key
 * matches and is None otherwise, yielding RDD[(K, (V, Option[W]))].
 * (The original comment wrongly described the result as (K,(V,W)).)
 */
object leftOuterJoinTest {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    // App name corrected from the copy-pasted "distinctTest".
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("leftOuterJoinTest")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // Left side: keys 1-5 and 7. Right side: keys 1-6.
      val rdd1: RDD[(Int, String)] = sc.makeRDD(
        List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
      val rdd2: RDD[(Int, String)] = sc.makeRDD(
        List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

      // Key 7 exists only on the left, so it pairs with None;
      // key 6 exists only on the right and is dropped.
      val joinRdd: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
      joinRdd.collect().foreach(println)
      /* Sample output (order not guaranteed):
         (3,(關羽,Some(蜀國)))
         (4,(曹操,Some(魏國)))
         (1,(劉備,Some(蜀國)))
         (7,(孫權,None))
         (5,(趙雲,Some(蜀國)))
         (2,(張飛,Some(蜀國)))
       */
    } finally {
      sc.stop() // always release the SparkContext, even if the job fails
    }
  }
}
3.rightOuterJoin
/**
 * 3. rightOuterJoin
 *
 * Definitions:
 *   def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
 *   def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
 *
 * Behavior: right-outer-joins an RDD[(K, V)] with an RDD[(K, W)]. Every key of
 * the right RDD is kept; the left value is Some when the key matches and None
 * otherwise, yielding RDD[(K, (Option[V], W))].
 * (The original comment said "leftouterjoin" and (K,(V,W)) — both copy-paste errors.)
 */
object rightOuterJoinTest {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    // App name corrected from the copy-pasted "distinctTest".
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("rightOuterJoinTest")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // Left side: keys 1-5 and 7. Right side: keys 1-6.
      val rdd1: RDD[(Int, String)] = sc.makeRDD(
        List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
      val rdd2: RDD[(Int, String)] = sc.makeRDD(
        List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

      // Key 6 exists only on the right, so it pairs with None on the left;
      // key 7 exists only on the left and is dropped.
      val joinRdd: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
      joinRdd.collect().foreach(println)
      /* Sample output (order not guaranteed):
         (6,(None,吳國))
         (3,(Some(關羽),蜀國))
         (4,(Some(曹操),魏國))
         (1,(Some(劉備),蜀國))
         (5,(Some(趙雲),蜀國))
         (2,(Some(張飛),蜀國))
       */
    } finally {
      sc.stop() // always release the SparkContext, even if the job fails
    }
  }
}
4.fullOuterJoin
/**
 * 4. fullOuterJoin
 *
 * Definitions:
 *   def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
 *   def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
 *
 * Behavior: full-outer-joins an RDD[(K, V)] with an RDD[(K, W)]. Every key
 * from either side is kept; each side's value is Some when present and None
 * when absent, yielding RDD[(K, (Option[V], Option[W]))].
 * (The original comment said "leftouterjoin" and (K,(V,W)) — both copy-paste errors.)
 */
object fullOuterJoinTest {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    // App name corrected from the copy-pasted "distinctTest".
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("fullOuterJoinTest")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // Left side: keys 1-5 and 7. Right side: keys 1-6.
      val rdd1: RDD[(Int, String)] = sc.makeRDD(
        List((1, "劉備"), (2, "張飛"), (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
      val rdd2: RDD[(Int, String)] = sc.makeRDD(
        List((1, "蜀國"), (2, "蜀國"), (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)

      // Keys 6 (right-only) and 7 (left-only) both survive, each with a None
      // on the missing side. Explicit type added for consistency with the
      // sibling examples.
      val joinRdd: RDD[(Int, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
      joinRdd.collect().foreach(println)
      /* Sample output (order not guaranteed):
         (6,(None,Some(吳國)))
         (3,(Some(關羽),Some(蜀國)))
         (4,(Some(曹操),Some(魏國)))
         (1,(Some(劉備),Some(蜀國)))
         (7,(Some(孫權),None))
         (5,(Some(趙雲),Some(蜀國)))
         (2,(Some(張飛),Some(蜀國)))
       */
    } finally {
      sc.stop() // always release the SparkContext, even if the job fails
    }
  }
}