1. 程式人生 > 其它 >Spark原理及原始碼解析【第六階段模組四】

Spark原理及原始碼解析【第六階段模組四】

簡答題:

以下程式碼:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel(
"WARN") val random = scala.util.Random val col1 = Range(1, 50).map(idx = (random.nextInt(10), s"user$idx")) val col2 = Array((0, "BJ"), (1, "SH"), (2, "GZ"), (3, "SZ"), (4, "TJ"), (5, "CQ"), (6, "HZ"), (7, "NJ"), (8, "WH"), (0, "CD")) val rdd1: RDD[(Int, String)] = sc.makeRDD(col1) val rdd2: RDD[(Int, String)]
= sc.makeRDD(col2) val rdd3: RDD[(Int, (String, String))] = rdd1.join(rdd2) println(rdd3.dependencies) val rdd4: RDD[(Int, (String, String))] = rdd1.partitionBy(new HashPartitioner(3)).join(rdd2.partitionBy(newHashPartitioner(3))) println(rdd4.dependencies) sc.stop() } }

問題:

兩個列印語句的結果是什麼,對應的依賴是寬依賴還是窄依賴,為什麼會是這個結果;

join 操作何時是寬依賴,何時是窄依賴;

藉助 join 相關原始碼,回答以上問題。

解答詳情

程式碼樣例

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    conf.set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val random = scala.util.Random
    val col1 = Range(1, 50).map(idx => (random.nextInt(10), s"user$idx"))
    val col2 = Array((0, "BJ"), (1, "SH"), (2, "GZ"), (3, "SZ"), (4, "TJ"), (5, "CQ"), (6, "HZ"), (7, "NJ"), (8, "WH"), (0, "CD"))
    val rdd1: RDD[(Int, String)] = sc.makeRDD(col1)
    val rdd2: RDD[(Int, String)] = sc.makeRDD(col2)
    val rdd3: RDD[(Int, (String, String))] = rdd1.join(rdd2)

    println("join-rdd3:====================")
    println("rdd1:====================")
    println(rdd1.partitioner)
    println(rdd1.getNumPartitions)
    rdd1.glom.collect.foreach(x=>println(x.toBuffer))
    println("rdd2:====================")
    println(rdd2.partitioner)
    println(rdd2.getNumPartitions)
    rdd2.glom.collect.foreach(x=>println(x.toBuffer))

    val rdd1p: RDD[(Int, String)] = rdd1.partitionBy(new HashPartitioner(3))
    val rdd2p: RDD[(Int, String)] = rdd2.partitionBy(new HashPartitioner(3))
    val rdd4: RDD[(Int, (String, String))] = rdd1p.join(rdd2p)

    println("join-rdd4:====================")
    println("rdd1p:====================")
    println(rdd1p.partitioner)
    println(rdd1p.getNumPartitions)
    rdd1p.glom.collect.foreach(x=>println(x.toBuffer))
    println("rdd2p:====================")
    println(rdd2p.partitioner)
    println(rdd2p.getNumPartitions)
    rdd2p.glom.collect.foreach(x=>println(x.toBuffer))


    sc.stop()
  }
}

列印結果

分析解釋:

從列印資料看到,rdd1和rdd2本身都是沒有分割槽器的,雖然預設都被分了16個分割槽,但從資料上看相同的key並沒有落到相同的分割槽裡,所有rdd1.join(rdd2)本身是需要對原始資料進行分割槽移動的,也就是rdd1,rdd2中本身分割槽中的資料可能去往rdd3的任何分割槽,這個操作是寬依賴。

但是,rdd4的產生,是rdd1和rdd2本身已經做了hash分割槽了,產生的rdd1p和rdd2p是有分割槽器,分割槽數相同,相同的key在相同分割槽。所以join的時候,rdd1p和rdd2p的資料並不會亂跑,會走向rdd4中的對應分割槽,這個操作是窄依賴。