Spark Core operators (with examples): intersection, difference, and union (intersection, subtract, union, distinct, subtractByKey)
阿新 • Published: 2019-02-17
1. Intersection: intersection
1.1 Source code
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did. (The intersection result is deduplicated.)
 *
 * @note This method performs a shuffle internally. (It is a shuffle operator.)
 */
// Both RDDs must have the same element type T, which is also the element type of the returned RDD.
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}
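Source code analysis:
- thisRDD.intersection(otherRDD): computes the intersection of thisRDD and otherRDD. The result contains no duplicate elements, even if an element appears multiple times in both RDDs;
- intersection is a shuffle operator (it introduces a ShuffleDependency);
- internally it calls the cogroup operator;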
- Note: operators that combine two RDDs by grouping both sides on the same key are typically built on cogroup(otherDataset, [numTasks]); intersection is one example.
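The map, cogroup, filter, keys pipeline described above can be traced on plain Scala collections. This is only a local model, not Spark's implementation: `IntersectionSketch` and its `cogroup` helper are hypothetical names, and nothing here is distributed or shuffled.

```scala
object IntersectionSketch {
  // Local model of cogroup: for every key seen on either side,
  // collect the values from the left and the values from the right.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val leftGroup = left.filter(_._1 == k).map(_._2)
      val rightGroup = right.filter(_._1 == k).map(_._2)
      (k, (leftGroup, rightGroup))
    }.toMap
  }

  // Same steps as the source: tag each element with null, cogroup,
  // keep only keys with a non-empty group on both sides, then take the keys.
  def intersection[T](left: Seq[T], right: Seq[T]): Set[T] =
    cogroup(left.map(v => (v, null)), right.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keySet

  def main(args: Array[String]): Unit = {
    val result = intersection(List(1, 2, 3, 4, 5, 6, 7, 7, 20), List(4, 5, 6, 7, 8, 9, 10))
    println(result.toList.sorted.mkString(", ")) // prints 4, 5, 6, 7
  }
}
```

Because each element becomes a key, the duplicate 7 on the left collapses into a single group, which is why the result is deduplicated.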
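1.2 Code example:
val list1 = List(1,2,3,4,5,6,7,7,20)
val list2 = List(4,5,6,7,8,9,10)
// 3 is the number of partitions; when omitted, sc.defaultParallelism is used
val rdd1: RDD[Int] = sc.parallelize(list1, 3)
val rdd2: RDD[Int] = sc.parallelize(list2)
// Intersection: rdd1 ∩ rdd2
rdd1.intersection(rdd2).foreach(println)
The output is as follows (the order depends on partitioning and may differ between runs):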
6
4
7
5
2. Difference: subtract
2.1 Source code
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
// By default, keeps this RDD's partitioner and number of partitions.
def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
// Takes numPartitions and uses a HashPartitioner with that many partitions for the result.
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
  subtract(other, new HashPartitioner(numPartitions))
}

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
// Takes a custom partitioner.
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
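The de-tupling partitioner p2 in the source can be hard to picture, so here is a small local model of it. `SimplePartitioner`, `SimpleHashPartitioner`, and `deTupled` are hypothetical stand-ins written for this sketch, not Spark classes; the hash rule used (hashCode modulo numPartitions, kept non-negative, null mapped to 0) is an assumption about how a hash partitioner behaves.

```scala
object PartitionerSketch {
  // Local stand-in for Spark's Partitioner (hypothetical, not the real class).
  trait SimplePartitioner {
    def numPartitions: Int
    def getPartition(key: Any): Int
  }

  // Places a key by its hashCode modulo numPartitions, kept non-negative.
  final class SimpleHashPartitioner(val numPartitions: Int) extends SimplePartitioner {
    def getPartition(key: Any): Int =
      if (key == null) 0
      else {
        val raw = key.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw
      }
  }

  // Wraps p so that a (k, v) element is placed by its k alone, like p2 in the source.
  def deTupled(p: SimplePartitioner): SimplePartitioner = new SimplePartitioner {
    def numPartitions: Int = p.numPartitions
    def getPartition(key: Any): Int = p.getPartition(key.asInstanceOf[(Any, Any)]._1)
  }

  def main(args: Array[String]): Unit = {
    val p = new SimpleHashPartitioner(4)
    val p2 = deTupled(p)
    // The whole element ("a", 1) lands in the partition p assigns to its key "a".
    println(p2.getPartition(("a", 1)) == p.getPartition("a")) // prints true
  }
}
```

In the source, subtract turns each element x into the pair (x, null), so the key handed to the partitioner is the whole (K, V) element; de-tupling lets the existing key-based partitioner p keep deciding the placement.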
2.2 Code examples
2.2.1 The RDDs being combined must have exactly the same element type
// Prepare the data sets
val list1 = List(1,2,3,4,5,6,7,7,20)
val list2 = List(4,5,6,7,8,9,10)
val array = Array("hello huangbo","hello xuzheng","hello huangxiaoming")
val kv = Array(("a",1), ("b",2),("c",3),("a",1),("b",1),("c",1))
val rdd1: RDD[Int] = sc.parallelize(list1 , 3)
val rdd2: RDD[Int] = sc.parallelize(list2)
val rdd3: RDD[String] = sc.makeRDD(array)
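// Key-value pair RDD
val rdd4:RDD[(String,Int)] = sc.makeRDD(kv) // the first element of each tuple is treated as the key
/** Compute the difference
 * subtract(): difference; both RDDs must have the same element type.
 * 1. For single-value elements, the difference is computed directly.
 * 2. For (K, V) elements, the difference is still computed on whole elements (not by key).
 */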
val subtractRes: RDD[Int] = rdd1.subtract(rdd2)
subtractRes.foreach(x => print(x + "\t")); println() // difference: 3 1 2 20 (order may vary)
//rdd3.subtract(rdd4) // compile error: both RDDs must have the same element type
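The two rules in the comment above (single values are compared directly, tuples are compared as whole elements) can be checked on plain Scala collections. This is a sketch of the semantics only; `SubtractSketch` is a hypothetical name, and surviving duplicates on the left are simply kept, with no deduplication step assumed.

```scala
object SubtractSketch {
  // Keep every element of `left` that does not occur in `right`.
  // Elements are compared as whole values, so a tuple matches only if all its fields are equal.
  def subtract[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val exclude = right.toSet
    left.filterNot(exclude)
  }

  def main(args: Array[String]): Unit = {
    // Single-value elements
    println(subtract(List(1, 2, 3, 4, 5, 6, 7, 7, 20), List(4, 5, 6, 7, 8, 9, 10))) // List(1, 2, 3, 20)

    // Tuple elements: ("c", 9) does not remove ("c", 3) or ("c", 1), because the values differ
    val kv = List(("a", 1), ("b", 2), ("c", 3), ("a", 1), ("b", 1), ("c", 1))
    println(subtract(kv, List(("a", 1), ("c", 9)))) // List((b,2), (c,3), (b,1), (c,1))
  }
}
```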
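2.2.2 When the RDD elements are tuples, the tuples' component types must also match:
// Error: the element types differ, so the difference cannot be computed (both are tuples, but their type parameters do not match)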
val list01 = Array(("a",1), ("b",2), ("c",3))
val rdd01: RDD[(String, Int)] = sc.parallelize(list01)
val list02 = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
val rdd02: RDD[(String, Any)] = sc.makeRDD(list02)
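//rdd01.subtract(rdd02).foreach(print) // compile error: the tuple element types differ
However, you can widen both sides to a common supertype so that the element types match:
// Correct: the types now match; the result is (a,1)(b,2) (order may vary)
// Manually annotate the value type as Any to unify the types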
val list03: Array[(String, Any)] = Array(("a",1), ("b",2), ("c",3))
val rdd03 = sc.parallelize(list03)
val list04: Array[(String, Any)] = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
val rdd04 = sc.makeRDD(list04)
rdd03.subtract(rdd04).foreach(print)
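The type mismatch arises because the container is invariant in its element type, so a collection of (String, Int) is not accepted where a collection of (String, Any) is required. The sketch below reproduces this with a hypothetical invariant class `Bag[T]` standing in for RDD[T], and shows an alternative to annotating the arrays: widening an existing collection with map.

```scala
object WideningSketch {
  // Hypothetical invariant container standing in for RDD[T]; T has no variance annotation.
  final class Bag[T](val items: Seq[T]) {
    def map[U](f: T => U): Bag[U] = new Bag[U](items.map(f))
    def subtract(other: Bag[T]): Bag[T] = {
      val exclude = other.items.toSet
      new Bag[T](items.filterNot(exclude))
    }
  }

  val ints: Bag[(String, Int)] =
    new Bag[(String, Int)](Seq(("a", 1), ("b", 2), ("c", 3)))
  val mixed: Bag[(String, Any)] =
    new Bag[(String, Any)](Seq(("a", "lily"), ("b", "lucy"), ("c", "rose"), ("c", 3)))

  // ints.subtract(mixed) would not compile: Bag[(String, Int)] is not a Bag[(String, Any)].
  // Widening the value type first makes both sides Bag[(String, Any)].
  val widened: Bag[(String, Any)] = ints.map { case (k, v) => (k, v: Any) }

  def main(args: Array[String]): Unit = {
    println(widened.subtract(mixed).items) // List((a,1), (b,2))
  }
}
```

The same map-based widening should carry over to real RDDs, but that is an assumption of this sketch rather than something the original example demonstrates.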
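3. Difference by key: subtractByKey
thisPairRDD.subtractByKey(otherPairRDD): computes the difference using only the key to identify an element; the type and value of the value part are irrelevant.
Note: both operands must be pair RDDs.
Code example
/**
 * subtractByKey(otherRDD): computes the difference on keys only; returns the elements whose key exists in the main RDD but not in otherRDD.
 * ---- applies to pair RDDs
 */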
val rdd10 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
val rdd11 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3)))
// The result is (d,5), because only key "d" does not appear in rdd11
rdd10.subtractByKey(rdd11).foreach(print)
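To make the "keys only" rule concrete, here is a plain-collections model in which the right-hand side carries values of a different type. `SubtractByKeySketch` is a hypothetical name and the code is a sketch of the semantics, not Spark's implementation.

```scala
object SubtractByKeySketch {
  // Keep the pairs of `left` whose key never appears as a key in `right`.
  // Values on the right side are ignored, so their type W may differ from V.
  def subtractByKey[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val rightKeys = right.map(_._1).toSet
    left.filterNot { case (k, _) => rightKeys(k) }
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(("a", 1), ("b", 2), ("c", 3), ("a", 5), ("d", 5))
    val right = Seq(("a", "x"), ("b", "y"), ("c", "z")) // String values, Int values on the left
    println(subtractByKey(left, right)) // List((d,5))
  }
}
```

Note that ("a", 5) is removed even though no pair on the right has the value 5: only the key "a" matters.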
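4. Union
4.1 Concatenation operator: union
/** Intersection, union, difference
 * union(): plain concatenation without deduplication (not a union in the mathematical sense)
 * count(): counts the number of elements in the RDD
 */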
/*
rdd1 = {1,2,3,4,5,6,7,7,20}
rdd2 = {4,5,6,7,8,9,10}
*/
println(rdd1.union(rdd2).count()) // 16 elements
4.2 Computing the union (union first, then distinct)
// Union: concatenate with union, then deduplicate with distinct
rdd1.union(rdd2).distinct().foreach(println)
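The difference between plain concatenation and a set-style union can be checked with the same data on local Scala lists. This is a sketch of the semantics only; `UnionSketch` is a hypothetical name, and `++` and `distinct` on a List stand in for the RDD operators.

```scala
object UnionSketch {
  val a = List(1, 2, 3, 4, 5, 6, 7, 7, 20)
  val b = List(4, 5, 6, 7, 8, 9, 10)

  // Like union: plain concatenation, duplicates are kept.
  val concatenated: List[Int] = a ++ b

  // Like union followed by distinct: each value appears once.
  val deduplicated: List[Int] = concatenated.distinct

  def main(args: Array[String]): Unit = {
    println(concatenated.size)   // 16
    println(deduplicated.sorted) // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20)
  }
}
```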