1. 程式人生 > >Spark官方文件《Spark Programming Guide》解讀

Spark官方文件《Spark Programming Guide》解讀

RDD的操作型別: Transformation 和 Action

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program

transformation:把資料集從一種形式“轉置”為另一種形式。

action: 向driver返回一個值。

兩者的本質區別是:transformation發生在各個excutor上(類似於MR中的map階段),action需要從excutor上收集彙總結果(類似於MR中的reduce階段)

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

鑑於transformation和excutor的性質,我們不難設想:為了避免driver和executor之間不必要的通訊,特別是不要的“中間資料集”的傳遞,transformation操作應該總是lazy的,也就是說spark不需要也不應該在遇到一個transformation操作時就立刻推送到executor上執行,而是直到遇到一個action時再統一一併運算,因為action是需要從executor返回資料的。這個很好舉例,比如一段程式碼中第一個動作是一個map,執行結束之後每個executor上都是一個經過轉置的新的dataset,如果第二個動作還是一個map或其他的transformation,則可以在每個executor上基於當前的這個dataset繼續運算,沒有返回任何結果給driver的必要。

Passing Functions to Spark

推薦的方式有兩種:

  1. 匿名函式(lamdba表示式),即一段程式碼片段
  2. 全域性單一的函式體:全域性靜態方法 or 函式物件, 此類情況暗含的意思是:函式本身不從屬於一個特定的物件,無狀態不會導致函式依附的物件(的狀態)和物件的所有欄位(的值)被傳遞到executor.

但實際上你還可以用第三種方式傳遞函式,即把函式定義為一個類的方法,以引數的形式把rdd傳給這個方法,比如像這樣:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

但是上述做法已經開始暴露出一些“隱患”, 因為func1是MyClass的一個成員方法, 在執行時刻,rdd.map(func1)實際上是rdd.map(this.func1),也就是說在計算時,當時的MyClass的例項會傳遞到所有的executor上,如果MyClass沒有任何欄位和外部引用的變數,則尚無明顯問題,但當其包含欄位時(有狀態時),MyClass例項的所有欄位的值以及欄位依賴的值都需要傳遞到所有executor上,而其中絕大多數的值是不需要的,這會帶來不必要的的資料傳輸,比如下面這段程式碼:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

我們也可以認為這是不恰當使用“閉包”帶來的副作用,

Understanding Closures

對於這一部分,文件中給出的是一個絕佳的例子,透徹理解這個例子背後的原因就能明白Spark與閉包之間的微妙關係:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

如果這是在本地(單節點)執行的程式碼,那麼這就一段經典的演示“閉包”概念的程式碼。函式字面量counter += x是一個閉包,它有一個自由變數counter, 當這個閉包傳遞給rdd參與運算的過程中,counter變數一直存在於伴隨這個閉包的上下文中,其狀態(值)會持續的被改變並保持,因此,在運算結束後,counter的值就是rdd中資料條目的總數。

但如果是在叢集模式(多節點)下,情況就大不相同了,由於資料被分散到了多個節點上並行處理,計算邏輯,也就是我們的閉包也要分發到各個節點上去執行(Spark會完成閉包的序列化工作),OK,我們可以確定的是:當閉包分發到各個executor上時,閉包的中所有自由變數的值都來自於driver發分發時的那個上下文裡,現在閉包完全地獨立出來了,就是完全在一個全新的環境中運行了,executor本地的自用變數與driver上的那些對應變數已經沒有任何關係了,它們都將在各自的環境裡自由演變了,彼此之間沒有任何聯絡了。簡單地解釋就是:閉包和它的上下文只在一個程序(節點)中有效。因此:在本例中,你不能指望在driver端執行這段程式碼後得到counter的值,實際上它只是在初值是0的時候被複制了n份傳遞給executor之外,它在driver端什麼也沒做,而executor端的那個counter副本只在exexutor本地計算著“區域性”的counter, 它已和driver端的這個counter沒有絲毫的關係,所以最後的結果是:程式執行結束後,列印的這個counter是driver端的這個coutner,它沒有被計算,它的值一直是0.

另外一方面,如果counter是一個全域性性(正對整個程式或著說叢集)的只讀的初值,那麼作為閉包的自由變數傳播到整個叢集這是有意義的,但如果executor企圖要改寫一個全域性性的初值,則這樣做是沒有意義的。對於Spark來說,合適作為閉包的自由變數應該是這樣一種值:在區域性資料集計算過程中需要持續追蹤狀態的變數!

To execute jobs, Spark breaks up the processing of RDD operations into tasks - each of which is operated on by an executor. Prior to execution, Spark computes the closure. The closure is those variables and methods which must be visible for the executor to perform its  computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor. 

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state.

Working With Key-Value Pairs

Spark大多數的操作是面向RDD的,而RDD可以包含任何型別的物件。也就是說,Spark大多數的操作是可以應用於任何型別的資料物件的。但是也存在一些特殊操作是面向特定的key-value pairs, 這些特殊操作絕大多數是和“shuffle”相關的,比如按一個key進行grouping和aggregation等。

比如如下的示意程式碼:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

第二行,經過lines.map操作之後得到的新RDD的元素是以文字每一行為key,以固定數字1為value的pair,而在這樣一個RDD上使用reduceByKey時就變得有趣了,讓我們先來看看對reduceByKey這個方法的解讀:

reduceByKey(func, [numTasks]) :
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. 

也就是說:如果資料集是(K,V)對,key的部分會由Spark來自動處理,Spark會將所有key相同的資料集進行分組聚合(也就是分在一起),然後呢,使用使用者提供的函式對它們的value進行處理,所以,reduceByKey接受的函式型別是(V,V) => V!所以上面例子中(a, b) => a + b,a和b指代的都是value,也就是數字1,而後續的動作就一個集合的reduce操作無異了。

Shuffle Operations

讓我們接著前面的reduceByKey講,這是一個很好是解釋Spark Shuffle操作的例子,對於reduceByKey,它有一個顯而易見的挑戰:對於同一個Key的所有的value會分佈於整個叢集的不同節點上,所以Spark在計算時是需要執行一個All-To-All的操作,也就是在所有分割槽上找到所有Key對應的所有Value,針對第一個key,把它對應的所有的value從所有的分割槽上帶回來進行計算, 這個過程就是shuffle.

shuffle是一個非常消耗資源的操作,為了實現shuffle,spark會進行非常類似於hadoop中的Map和Reduce操作,其中大量Map任務的結果會儲存在記憶體中,所以對記憶體資源的佔用是顯而易見的,當記憶體不足以cache所有的map結果時,這些資料會被持久化到磁碟上,因此這又會帶來本地磁碟的I/O操作,另外,shuffle會產生大量中間檔案,對於長期執行的spark job會佔用很大硬碟空間。

Shared Variables

通常來說, 跨任務(節點)的可讀寫的共享變數(或者說是scope是整個叢集的全域性變數)是很低效的,這很容易理解。儘管如此,Spark依然支援了兩種形式的共享變數:broadcast variables 和 accumulators

Broadcast Variables

首先,broadcast variables是隻讀的,更具官方文件的講述:

The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

顯式地建立broadcast variables只在這種情況下是有意義的:當任務跨域了多了stage,而在這些stage中需要使用同樣的資料(對於這種案例暫時還沒有找到清晰的例子)。

Accumulators

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value.Only the driver program can read the accumulator’s value, using its value method.

Task只可以向一個Accumulator追加值,但是不能讀它的值,只有driver程式才能讀取它的值。

本質上accumulator是trait: AccumulableParam的一個子類,它只有最主要的一個操作
addInPlace,但這並不意味著accumulator只可以做簡單的加法,這取決於你要怎樣書寫這個方法的邏輯。Spark內建支援Int型別的accumulator,使用者可以繼承AccumulableParam來實現自己的Accumulator.

最後,accumulator並不會影響Spark的Lazy機制,也 就是說,在一系列的transformation操作之後 ,accumulator的值未必會被更改,而是直到一個action執行之後,它的值在driver端才有可能更新,比如像下面這個例子:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.