Chapter 5: Spark Core Programming - RDD Transformation Operators (Key-Value Type) - cogroup
阿新 • Published: 2022-03-27
1. Definition
/*
 * 1. Definition
 *    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
 *    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
 *        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
 *    def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
 *                            other2: RDD[(K, W2)],
 *                            other3: RDD[(K, W3)],
 *                            partitioner: Partitioner)
 *        : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
 *
 * 2. Purpose
 *    Performs a full outer join across two (or more) RDDs of types (K, V) and (K, W),
 *    returning an RDD of (K, (Iterable<V>, Iterable<W>)) in which all elements
 *    sharing the same key are grouped together.
 *
 * 3. How it works
 *    1. Group each RDD by key:
 *       rdd1: key, Iterable<V>
 *       rdd2: key, Iterable<W>
 *       rdd3: key, Iterable<Z>
 *    2. Full-outer-join the grouped RDDs by key:
 *       rdd1.cogroup(rdd2, rdd3)
 *       Result: key, (Iterable<V>, Iterable<W>, Iterable<Z>)
 *
 * 4. Note
 *    1. At most three other RDDs can be passed as arguments.
 */
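The group-then-full-outer-join flow described above can be sketched on plain Scala collections, with no Spark required. Note this is only an illustration of the semantics: `cogroupLocal` is a hypothetical helper written for this post, not part of the Spark API, and it returns `Seq` where Spark returns `Iterable`.

```scala
// Minimal sketch of cogroup semantics on local collections (hypothetical helper,
// not Spark API). Every key from either side appears exactly once, paired with
// ALL of its values from each input; a side with no match contributes an empty Seq.
def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  // Step 1: group each side by key -> key, Seq of values
  val l = left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  val r = right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  // Step 2: full outer join over the union of keys
  (l.keySet ++ r.keySet)
    .map(k => k -> (l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W])))
    .toMap
}

val result = cogroupLocal(
  Seq(1 -> "a", 1 -> "b", 2 -> "c"), // key 1 has two values on the left
  Seq(1 -> "x", 3 -> "y")            // key 3 exists only on the right
)
// result(1) keeps both "a" and "b"; result(3) has an empty left side
```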
2. Example
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object cogroupTest extends App {

  val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("cogroupTest")
  val sc: SparkContext = new SparkContext(sparkconf)

  val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "劉備"), (1, "劉備1"), (2, "張飛"),
    (3, "關羽"), (4, "曹操"), (5, "趙雲"), (7, "孫權")), 2)
  val rdd2: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國"), (2, "蜀國"), (2, "蜀國1"),
    (3, "蜀國"), (4, "魏國"), (5, "蜀國"), (6, "吳國")), 3)
  val rdd3: RDD[(Int, String)] = sc.makeRDD(List((1, "蜀國_"), (2, "蜀國_"), (2, "蜀國1_"),
    (3, "蜀國_"), (4, "魏國_"), (5, "蜀國_"), (16, "吳國_")), 3)

  // Full outer join by key: every key appearing in any of the three RDDs shows up
  // once, with a (possibly empty) Iterable of its values from each RDD.
  val rdd4: RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =
    rdd1.cogroup(rdd2, rdd3)

  rdd4.collect().foreach(println)

  sc.stop()
}