1. 程式人生 > 實用技巧 >spark原理解析和spark core

spark原理解析和spark core

spark原理解析

  • 解析一:resilient distributed dataset(RDD)

resilient distributed dataset(RDD):彈性分散式資料集,有容錯機制可並行執行。

分散式即體現在每個rdd分多個partition,每個partition在執行時為一個task,會被master分配到某一個worker執行器(Executor)的某一個core中。

彈性體現在流水線思想(pipe),即rdd方法分為transformations和actions方法,兩者的官方解釋為:RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset。transformations類方法在執行過程中,只會記錄每個rdd的依賴,不會立即執行,在這個過程中,可以彈性的處理partition。當action類方法執行時,會按照依賴,每個rdd去父rdd中要資料。

  • 解析二:窄依賴(完全依賴)和寬依賴(部分依賴)

transformations類方法的依賴分為窄依賴(完全依賴)和寬依賴(部分依賴),窄依賴可以理解為:每個父rdd的一個分割槽的資料只會給一個子rdd的一個分割槽(一個task的資料只會給流水線下游的一個task),子rdd的分割槽中資料來自一個多個父rdd的分割槽的資料;寬依賴肯定會有某些或全部父rdd的task資料給多個子rdd的task。

當寬依賴時,需要進行shuffle,此時,會按照shuffle切分成一個個stage。

整個job的過程是一個有向無環圖(DAG),如下圖,是rdd方法leftOuterJoin執行時的一個DAG,rdd leftOuterJoin是寬依賴,因此要劃分stage,並會發生shuffle;當觸發action類方法如collect時會按照依賴往dirver拉資料時,會從rdd leftOuterJoin的task中拿資料,從而自下而上,觸發整個流水線作業。

Dependency
	NarrowDependency(窄依賴)
		OneToOneDependency
		RangeDependency
	ShuffleDependency(寬依賴)

  • 解析三:shuffle
ShuffleManager
	HashShuffleManager
	SortShuffleManager(預設)

目前預設的shuffle方式為:SortShuffleManager

由於一個worker上執行多個task,每個worker上生成的所有臨時檔案數是reduce的數量

具體reduceByKey的shuffle過程如下,在map端會進行shuffle寫,會先寫到快取,然後寫到磁碟;在reduce端會進行shuffle讀,讀取時會判斷取遠端讀還是在本機讀,讀取時也會先寫到快取。

shuffle時,等map端的父stage寫完後,reduce端才會去進行fetch,fetch的時候是邊fetch邊處理,不會等全部fetch完再處理。

另外一種方式,hashShuffle,每個worker上會生成map*reduce個磁碟檔案,會增大磁碟io以及記憶體的壓力。

shuffle涉及的設定如下:

1、shuffle方式(sort、hash)
spark.shuffle.manager
2、spark.shuffle.file.buffer.kb
shuffle寫入快取的大小(預設32kb)
3、spark.reducer.maxMbInFlight
shuffle讀(reduce端)快取大小(預設48m)
4、spark.shuffle.compress
shuffle寫入磁碟是否壓縮,預設true
5、spark.shuffle.io.maxRetries
shuffle讀通過netty fetches讀時,失敗的最大嘗試次數,預設3
6、spark.shuffle.io.retryWait
5中每次等待幾秒(預設5s)
7、spark.shuffle.service.index.cache.size

  • 解析四:task數量

當transformation方法時,就確定了map和reduce的task數量。

一般一個worker啟動一個Executor,預設每個Executor使用的core數(同一時間一個core只能執行一個task)為機器的所有核心數(即每個CPU的核數相加)

使用rdd方法建立一個rdd時,如果執行在cluster模式下,partition預設的數量為所有Executor的總core數。

reduce的partition的數量。由於reduce可能來自多個rdd,如果沒有自己實現分割槽器(partition)時,使用的是預設的分割槽器,此時如果配置檔案沒有配置引數時,使用的是父rdd的最大分割槽數,原始碼如下:

     *reduce分割槽的數量
     * 先按rdd1、rdd2的分割槽數進行降序排列,此時按續遍歷,如果發現有rdd自己定時了partitioner,就返回自己定義的;如果沒有定義,去查詢spark.default.parallelism,如果沒有該配置,返回父rdd分割槽數最高的一個分割槽;上面rdd3的分割槽為3個,取最高的
     * 預設是HashPartitioner 還有一個RangePartitioner
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }

解析五:打包釋出,使用資源管理yarn

程式如下:

object TestSparkCore {
  def main(args: Array[String]): Unit = {
    // 1. 建立spark context
    val conf = new SparkConf()
    conf.setAppName("first") // 設定應用程式的名字 
//         conf.setMaster("spark://focuson1:7077")  // 等價於 --master 引數
    conf.set("spark.shuffle.manager", "hash") // 修改shuffle的實現類
    val sc = new SparkContext(conf)
    
    test_reduceByKey(sc)

    sc.stop()

  }

打成jar包,傳到focuson1上,執行下面語句。

spark-submit --master yarn-cluster --class com.bd.spark.core.TestSparkCore my_first_app.jar

--master  yarn-cluster 使用yarn或spark://ip:7077
--class 執行的類的,有包要寫上包
--conf 配置 如--conf spark.shuffle.manager=hash
--driver-class-path jar包路徑,不會發布到全部worker
--jars jar包路徑,會發布到全部worker
--application-arguments 傳遞給主方法的引數

*像conf、appname等程式的優先順序大於spark-submit

spark core

  1. core之rdd方法
  • rdd action
  1. /*
  2. * actions方法
  3. //collect,從每個worker中拿資料,在driver中顯示。driver可能會oom
  4. //takeordered是升序從每個分割槽中(一個rdd有多個分割槽,每個分割槽是一個task)拿出i個數據,拿到driver進行比較,拿出i個數據
  5. //top是降序,類似takeordered
  6. */
  7. def test_reduce(sc: SparkContext) = {
  8. val rdd = sc.makeRDD(List("hello world", "hello count", "world spark"))
  9. rdd.reduce((x, y) => (x + y))
  10. }
  11. def test_countApprox(sc: SparkContext) = {
  12. //在資料量特別大,不需要精確結果時,求一個近似值
  13. val rdd3 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
  14. rdd3.countApprox(1000l)
  15. }
  16. def test_saveAsTextFile(sc: SparkContext) = {
  17. val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  18. rdd.saveAsTextFile("hdfs://focuson1:9000/spark")
  19. }
  • rdd transformation
  1. def test_flatMap(sc: SparkContext) = {
  2. val rdd = sc.makeRDD(List("hello world", "hello count", "world spark"), 2)
  3. val rdd2 = rdd.flatMap { x => x.split(" ") }
  4. println(rdd2.collect())
  5. //res1: Array[String] = Array(hello, world, hello, count, world, spark)
  6. }
  7. def test_union(sc: SparkContext) = {
  8. //執行時,在一個stage內,分為三個rdd 求並積 分割槽是rdd1+rdd2
  9. val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
  10. val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9, 10))
  11. val rdd3 = rdd1 ++ rdd2
  12. //res54: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
  13. }
  14. def test_cartesian(sc: SparkContext) {
  15. //分割槽數為rdd1*rdd2 笛卡爾積
  16. val rdd1 = sc.parallelize(List("dog", "cat", "tiger"))
  17. val rdd2 = sc.parallelize(List(1, 2))
  18. rdd1.cartesian(rdd2)
  19. //res5: Array[(String, Int)] = Array((dog,1), (dog,2), (cat,1), (cat,2), (tiger,1), (tiger,2))
  20. }
  21. //mapPartitionsWithIndex
  22. def test_mapPartitionsWithIndex(sc: SparkContext) = {
  23. val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  24. var list = List[String]()
  25. // 分割槽0的所有資料+a , 分割槽1的資料 +b
  26. rdd.mapPartitionsWithIndex((i, iter) => {
  27. while (iter.hasNext) {
  28. if (i == 0)
  29. list = list :+ (iter.next + "a")
  30. else {
  31. list = list :+ (iter.next + "b")
  32. }
  33. }
  34. list.iterator
  35. })
  36. }
  37. def test_zip(sc: SparkContext) = {
  38. //兩個rdd元素必須相等
  39. val list = sc.makeRDD(List(43, 5, 2, 5, 6, 33))
  40. val list2 = sc.makeRDD(List("a", "b", "c", "d", "e", "f"))
  41. list.zip(list2).collect
  42. //res29: Array[(Int, String)] = Array((43,a), (5,b), (2,c), (5,d), (6,e), (33,f))
  43. }
  44. def test_reparition(sc: SparkContext) = {
  45. val rdd3 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  46. rdd3.coalesce(4) //預設是false,即分割槽由多變少,此時由2變為4不能成功,還是兩個分割槽
  47. rdd3.coalesce(4, true) //此時會成功
  48. /**
  49. * def coalesce(numPartitions: Int, shuffle: Boolean = false)
  50. * 預設是false,即分割槽由多變少,有多變少不會進行shuffle;true時會進行分割槽,此時會進行shuffle
  51. */
  52. /*
  53. def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  54. coalesce(numPartitions, shuffle = true)
  55. }
  56. */
  57. rdd3.repartition(4) //相當於 rdd3.coalesce(4, true)
  58. }
  59. def test_reduceByKey(sc: SparkContext) = {
  60. //reduceByKey就是把key值進行分組,然後每組內進行reduce
  61. val rdd = sc.makeRDD(List(("hello", 1), ("hello", 1), ("hello", 1), ("world", 1), ("world", 1)))
  62. val rdd2 = rdd.reduceByKey { (x, y) => x + y }
  63. //res2: Array[(String, Int)] = Array((hello,3), (world,2))
  64. }
  65. def test_intersection(sc: SparkContext) = {
  66. //取兩個rdd的交集
  67. val rdd1 = sc.parallelize(List("dog", "cat", "tiger"), 2)
  68. val rdd2 = sc.parallelize(List("dog", "wolf", "pig"), 3)
  69. val rdd3 = rdd1.intersection(rdd2)
  70. //res23: Array[String] = Array(dog)
  71. }
  72. def test_sortBy(sc: SparkContext) = {
  73. val list = sc.makeRDD(List(43, 5, 2, 5, 6, 33))
  74. list.sortBy(x => x) //升序
  75. list.sortBy(x => x, false) //降序
  76. }
  77. def test_aggregateByKey(sc: SparkContext) = {
  78. import scala.math._
  79. val rdd = sc.parallelize(List(("pig", 3), ("cat", 2), ("dog", 5), ("cat", 4), ("dog", 3), ("cat", 3), ("cat", 7), ("cat", 4)), 2)
  80. rdd.aggregateByKey(0)((x, y) => x + y, (x, y) => x * y)
  81. /* partition:[0]
  82. (pig,3)
  83. (cat,2)
  84. (dog,5)
  85. (cat,4)
  86. partition:[1]
  87. (dog,3)
  88. (cat,3)
  89. (cat,7)
  90. (cat,4)*/
  91. //同一個分割槽內根據key進行分組,然後每組的value值進行第一個表示式的reduce操作
  92. /* partition:[0]
  93. (pig,3)
  94. (cat,6)
  95. (dog,5)
  96. partition:[1]
  97. (dog,3)
  98. (cat,14)
  99. (cat,7)
  100. */
  101. //然後對各個分割槽的所有資料按key進行分割槽,然後按對value值進行reduce
  102. //res38: Array[(String, Int)] = Array((dog,15), (pig,3), (cat,84))
  103. //引數0(zeroValue)是指參與第一個表示式的運算,即每個分割槽內按分割槽之後每個組都有一個zeroValue值。如果rdd.aggregateByKey(100)((x,y)=>x+y, (x,y)=>x*y)
  104. rdd.aggregateByKey(100)((x, y) => x + y, (x, y) => x * y)
  105. /* partition:[0]
  106. (pig,3)
  107. (cat,2)
  108. (dog,5)
  109. (cat,4)
  110. partition:[1]
  111. (dog,3)
  112. (cat,3)
  113. (cat,7)
  114. (cat,4)*/
  115. //同一個分割槽內根據key進行分組,然後每組的value值進行第一個表示式的reduce操作
  116. /* partition:[0]
  117. (pig,103)
  118. (cat,106)
  119. (dog,105)
  120. partition:[1]
  121. (dog,103)
  122. (cat,114)
  123. (cat,107)
  124. */
  125. //然後對各個分割槽的所有資料按key進行分割槽,然後按對value值進行reduce
  126. //res40: Array[(String, Int)] = Array((dog,10815), (pig,103), (cat,12084))
  127. }
  128. def test_cogroup(sc: SparkContext) = {
  129. val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3)))
  130. val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2)))
  131. rdd1.cogroup(rdd2)
  132. //res49: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((dog,(CompactBuffer(1),CompactBuffer(2))), (cat,(CompactBuffer(1, 3),CompactBuffer(2))))
  133. }
  134. def test_combineByKey(sc: SparkContext) = {
  135. }
  136. def test_groupBykey(sc: SparkContext) = {
  137. val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3), ("dog", 2)))
  138. rdd1.groupByKey()
  139. //res46: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(1, 2)), (cat,CompactBuffer(1, 3)))
  140. //Groupbykey會將所有的資料發給reducer,reducer壓力會比較大,另外會比較佔用網路頻寬, 相比之下,reduceByKey, 會在mapper端首先進行運算,reducer的壓力小,另外也可以節省網路頻寬
  141. }
  142. def test_join(sc: SparkContext) = {
  143. val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3)))
  144. val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2)))
  145. rdd1.join(rdd2)
  146. //Array((dog,(1,2)), (cat,(1,2)), (cat,(3,2))) 將兩個rdd集合中key相同的元素連線在一起 沒有tiger
  147. }
  148. def test_leftOuterJoin(sc: SparkContext) = {
  149. val rdd1 = sc.parallelize(List(("cat", 1), ("dog", 1), ("cat", 3), ("wolf", 1)))
  150. val rdd2 = sc.parallelize(List(("cat", 2), ("dog", 2), ("tiger", 2)))
  151. val array = rdd1.leftOuterJoin(rdd2) // 左外連線
  152. //Array((wolf,(1,None)), (dog,(1,Some(2))), (cat,(1,Some(2))), (cat,(3,Some(2))))
  153. for ((k, v) <- array) {
  154. println("key:" + k + " value:" + v._2.getOrElse(0))
  155. }
  156. }
  • cache
  def test_cache(sc: SparkContext) = {
    val rdd = sc.parallelize(List()).cache() //快取 (不會壓縮)
    /*
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()
     */
    val rdd2 = sc.parallelize(List()).persist() //該方法等於cache,預設是MEMORY_ONLY
    /*
     spark.storage.memoryFraction = 0.6//這個意思表示0.6的記憶體作為快取,其餘的作為計算記憶體
     
     StorageLevel.DISK_ONLY 只存到磁碟
     StorageLevel.DISK_ONLY_2 在其他worker也快取一份
     StorageLevel.MEMORY_AND_DISK
     StorageLevel.MEMORY_AND_DISK2
     StorageLevel.MEMORY_AND_DISK_SER//SER表是序列化壓縮
     StorageLevel.MEMORY_AND_DISK_SER2
     StorageLevel.MEMORY_ONLY_SER
     StorageLevel.MEMORY_ONLY_SER2
     StorageLevel.NONE
     StorageLevel.OFF_HEAP//Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled. 
     */
    val rdd3 = sc.parallelize(List()).persist(StorageLevel.OFF_HEAP)
  }