1. 程式人生 > >理解spark閉包

理解spark閉包

什麼叫閉包: 跨作用域訪問函式變數。又指的一個擁有許多變數和綁定了這些變數的環境的表示式(通常是一個函式),因而這些變數也是該表示式的一部分。

Spark閉包的問題引出:
在spark中實現統計List(1,2,3)的和。如果使用下面的程式碼,程式列印的結果不是6,而是0。這個和我們編寫單機程式的認識有很大不同。為什麼呢?

object Test {
  def main(args:Array[String]):Unit = {
      val conf = new SparkConf().setAppName("test");
      val sc = new SparkContext(conf)

      val
rdd = sc.parallelize(List(1,2,3)) var counter = 0 //warn: don't do this rdd.foreach(x => counter += x) println("Counter value: "+counter) sc.stop() } }

問題分析:
counter是在foreach函式外部定義的,也就是在driver程式中定義,而foreach函式是屬於rdd物件的,rdd函式的執行位置是各個worker節點(或者說worker程序),main函式是在driver節點上(或者說driver程序上)執行的,所以當counter變數在driver中定義,被在rdd中使用的時候,出現了變數的“跨域”問題,也就是閉包問題。

問題解釋:
對於上面程式中的counter變數,由於在main函式和在rdd物件的foreach函式是屬於不同“閉包”的,所以,傳進foreach中的counter是一個副本,初始值都為0。foreach中疊加的是counter的副本,不管副本如何變化,都不會影響到main函式中的counter,所以最終打印出來的counter為0.

當用戶提交了一個用scala語言寫的Spark程式,Spark框架會呼叫哪些元件呢?首先,這個Spark程式就是一個“Application”,程式裡面的mian函式就是“Driver Program”, 前面已經講到它的作用,只是,dirver程式的可能執行在客戶端,也有可有可能執行在spark叢集中,這取決於spark作業提交時引數的選定,比如,yarn-client和yarn-cluster就是分別執行在客戶端和spark叢集中。在driver程式中會有RDD物件的相關程式碼操作,比如下面程式碼的newRDD.map()

class Test{
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())
    val newRDD = sc.textFile("")

    newRDD.map(data => {
      //do something
      println(data.toString)
    })
  }
}

涉及到RDD的程式碼,比如上面RDD的map操作,它們是在Worker節點上面執行的,所以spark會透明地幫使用者把這些涉及到RDD操作的程式碼傳給相應的worker節點。如果在RDD map函式中呼叫了在函式外部定義的物件,因為這些物件需要通過網路從driver所在節點傳給其他的worker節點,所以要求這些類是可序列化的,比如在Java或者scala中實現Serializable類,除了java這種序列化機制,還可以選擇其他方式,使得序列化工作更加高效。worker節點接收到程式之後,在spark資源管理器的指揮下執行RDD程式。不同worker節點之間的執行操作是並行的。

​ 在worker節點上所執行的RDD中程式碼的變數是儲存在worker節點上面的,在spark程式設計中,很多時候使用者需要在driver程式中進行相關資料操作之後把該資料傳給RDD物件的方法以做進一步處理,這時候,spark框架會自動幫使用者把這些資料通過網路傳給相應的worker節點。除了這種以變數的形式定義傳輸資料到worker節點之外,spark還另外提供了兩種機制,分別是broadcast和accumulator。相比於變數的方式,在一定場景下使用broadcast比較有優勢,因為所廣播的資料在每一個worker節點上面只存一個副本,而在spark運算元中使用到的外部變數會在每一個用到它的task中儲存一個副本,即使這些task在同一個節點上面。所以當資料量比較大的時候,建議使用廣播而不是外部變數。