1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd運算元_2value_求交集&並集&差集&拉鍊_intersection&union&subtract&zip

第五章_Spark核心程式設計_Rdd運算元_2value_求交集&並集&差集&拉鍊_intersection&union&subtract&zip

1. 求交集-intersection

  object intersectionTest extends App {
    /*
    * 1. 定義
    *    def intersection(other: RDD[T]): RDD[T]
    *
    * 2. 功能
    *    對源 RDD 和引數 RDD 求交集後返回一個新的 RDD
    *    參與運算的兩個Rdd 型別必須一致
    *
    * 3. note
    *    1. 內部存在 shuffle 過程
    * */

    val sparkconf: SparkConf = new
SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.makeRDD(List(1, 2, 3, 8), 2) val rdd1 = sc.makeRDD(List(5, 6, 7, 8), 2) val rdd2 = sc.makeRDD(List("x", "y", "z", "m"), 2) private val rdd3: RDD[Int] = rdd.intersection(rdd1)
//rdd.intersection(rdd2) println(rdd3.collect().mkString(",")) sc.stop() }

2. 求並集-union

  object unionTest extends App {
    /*
    * 1. 定義
    *    def union(other: RDD[T]): RDD[T]
    *
    * 2. 功能
    *    對源 RDD 和 引數RDD 求並集後 返回一個新的RDD
    *    參與運算的兩個Rdd 型別必須一致
    *
    * 3. note
    *    1. 兩個Rdd 相同的元素,不會去重
    * 
*/ val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.makeRDD(List(1, 2, 3, 8), 2) val rdd1 = sc.makeRDD(List(5, 6, 7, 8), 2) val rdd2 = sc.makeRDD(List("x", "y", "z", "m"), 2) private val rdd3: RDD[Int] = rdd.union(rdd1) //rdd.intersection(rdd2) println(rdd3.collect().mkString(",")) sc.stop() }

3. 求差集-subtract

  object subtractTest extends App {
    /*
    * 1. 定義
    *    def subtract(other: RDD[T]): RDD[T]
    *
    * 2. 功能
    *    以一個 RDD 元素為主,去除兩個 RDD 中重複元素,將其他元素保留下來
    *    參與運算的兩個Rdd 型別必須一致
    *
    * */

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd = sc.makeRDD(List(1, 2, 3, 8), 2)
    val rdd1 = sc.makeRDD(List(5, 6, 7, 8), 2)
    val rdd2 = sc.makeRDD(List("x", "y", "z", "m"), 2)

    private val rdd3: RDD[Int] = rdd.subtract(rdd1)
    //rdd.subtract(rdd2)
    println(rdd3.collect().mkString(","))

    sc.stop()
  }

4. 求拉鍊-zip

  object zipTest extends App {
    /*
    * 1. 定義
    *    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    *
    * 2. 功能
    *    將兩個 RDD 中的元素,以鍵值對的形式進行合併
    *    其中,鍵值對中的 Key 為第1個RDD中的元素
    *               Value 為第2個RDD中的相同位置的元素
    *
    * 3. 思考
    *    1. 如果兩個 RDD 資料型別不一致怎麼辦?
    *           拉鍊操作兩個資料來源的型別可以不一致
    *
    *    2. 如果兩個 RDD 資料分割槽不一致怎麼辦?
    *           Can't zip RDDs with unequal numbers of partitions: List(2, 3)
    *           分割槽個數必須一致
    *
    *    3. 如果兩個 RDD 分割槽資料數量不一致怎麼辦?
    *           Can only zip RDDs with same number of elements in each partition
    *           分數個數相同時,分割槽內資料量也必須相同
    *
    * */

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd = sc.makeRDD(List(1, 2, 3, 8), 2)
    val rdd1 = sc.makeRDD(List(5, 6, 7, 8), 2)
    val rdd11 = sc.makeRDD(List(5, 6, 7, 8, 9, 10), 2)
    val rdd2 = sc.makeRDD(List("x", "y", "z", "m"), 2)
    val rdd22 = sc.makeRDD(List("x", "y", "z", "m"), 3)

    private val rdd3: RDD[(Int, Int)] = rdd.zip(rdd1)
    private val rdd4: RDD[(Int, String)] = rdd.zip(rdd2) //資料型別不一致
    //private val rdd5: RDD[(Int, String)] = rdd.zip(rdd22) //分割槽數量不一致
    private val rdd6 = rdd.zip(rdd11) //分割槽數量一致,分割槽內資料量不一致

    println(rdd3.collect().mkString(","))
    println(rdd4.collect().mkString(","))
    //println(rdd5.collect().mkString(","))
    println(rdd6.collect().mkString(","))

    sc.stop()
  }