1. 程式人生 > 其它 >Spark權威指南(中文版)----第14章 分散式共享變數

Spark權威指南(中文版)----第14章 分散式共享變數

除了彈性分散式資料集(RDD)介面之外,Spark中的第二類底層API是兩種型別的“分散式共享變數”:廣播變數和累加器。這些變數可以在使用者定義的函式中使用(例如,在RDD或DataFrame上的map函式中),這些函式在叢集上執行時具有特殊屬性。具體來說,accumulators讓將所有task中的資料相加到一個共享結果中(例如,實現一個計數器,統計job中計算失敗的輸入資料),而廣播變數讓你儲存一個較大的值在所有worker節點,並在多個spark action之間重用,而不需要重新發送到叢集各個節點。本章討論了這些變數型別的一些動機以及如何使用它們。

14.1. 廣播變數

廣播變數是一種可以在叢集中有效地共享不可變值的方法,而無需將該變數封裝在函式閉包中。在任務內的驅動程式節點中使用變數的通常方法是在函式閉包中引用它(例如,在map操作中),但是這可能是低效的,特別是對於大型變數,例如查詢表或機器學習模型。這樣做的原因是,當您在閉包中使用一個變數時,它必須在工作節點上多次反序列化(每個任務一個)。此外,如果您在多個Spark操作和作業中使用相同的變數,它將與每個作業一起重新發送給worker,而不是隻傳送一次。

這就是廣播變數發揮作用的地方。廣播變數是共享的、不可變的變數,快取在叢集中的每臺機器上,而不是與每個任務一起序列化。規範用例是傳遞一個大的查詢表,這個查詢表適合Executor上的記憶體,並在函式中使用它,如圖14-1所示。

例如,假設您有一個單詞或值列表:

您希望用您所擁有的其他資訊來補充您的單詞列表,這些資訊的大小可能是許多kb、mb,甚至可能是gb。如果我們從SQL的角度來考慮,這在技術上是一個正確的連線:

我們可以廣播這個結構,並使用suppBroadcast引用它。這個值是不可變的,當我們觸發一個action時,它會被延遲地複製到叢集中的所有節點:

我們通過value方法引用這個變數,它返回我們之前得到的確切值。此方法可在序列化函式中訪問,而無需序列化資料。這可以為您節省大量的序列化和反序列化成本,因為Spark使用廣播在叢集中更有效地傳輸資料:

現在我們可以使用這個值轉換RDD。在本例中,我們將根據map中可能有的值建立一個鍵-值對。如果我們缺少這個值,我們將簡單地用0替換它:

這將在Python中返回以下值,在Scala中返回陣列型別中的相同值:

這與將其傳遞到閉包之間的惟一區別是,我們以一種更有效的方式完成了這一操作(當然,這取決於資料量和執行器的數量)。對於小叢集上的非常小的資料(低KBs),它可能不是)。雖然這個小字典的開銷可能不是太大,但是如果您有一個大得多的值,那麼為每個任務序列化資料的開銷可能相當大。

需要注意的一點是,我們在RDD上下文中使用了它;我們也可以在UDF或Dataset中使用它,並獲得相同的結果。

14.2. 累加器

Accumulators(圖14-2)是Spark的第二種型別的共享變數,它可以在各種轉換中更新一個值,並以高效和容錯的方式將該值傳播到驅動程式節點。

累加器提供了一個可變變數,Spark叢集可以安全地按行更新該變數。您可以將其用於除錯目的(例如,跟蹤每個分割槽的某個變數的值,以便隨著時間的推移使用它),或者建立低階聚合。累加器是僅通過交換律和結合律操作“add”到其中的變數,因此可以有效地並行支援。您可以使用它們來實現計數器(如在MapReduce中)或求和。Spark天生支援數值型別的累加器,程式設計師可以新增對新型別的累加器。

對於僅在action內部執行的累加器更新,Spark保證每個任務對累加器的更新只應用一次,這意味著重新啟動的任務不會更新值。在transformations中,您應該知道,如果重新執行任務或作業階段,每個任務的更新可以應用不止一次。

Accumulators不會改變Spark的惰性評估模型。如果累加器在RDD上的操作中被更新,那麼它的值只在實際計算該RDD時更新一次(例如,當您對該RDD或依賴於它的RDD呼叫操作時)。因此,不能保證在map()這樣的延遲轉換中執行累加器更新。

累加器可以命名也可以不命名。命名的累加器將在Spark UI中顯示它們的執行結果,而未命名的則不會。

14.2.1.基本例子

讓我們通過對我們在本書前面建立的Flight資料集執行自定義聚合來進行實驗。在這個例子中,我們將使用Dataset API而不是RDD API,但是擴充套件非常類似:

現在讓我們建立一個累加器來計算往返中國的航班數量。儘管我們可以在SQL中以一種相當直接的方式來實現這一點,但是很多事情可能沒有這麼簡單。累加器提供了一種程式設計方式,允許我們執行這些計數。下面演示如何建立一個未命名的累加器:

我們的用例更適合命名累加器。有兩種方法可以做到這一點:一種是簡寫方法,另一種是普通方法。最簡單的方法是使用SparkContext。或者,我們可以例項化累加器,並註冊它的名稱:

我們在傳遞給函式的字串值中指定累加器的名稱,或者作為傳遞給register函式的第二個引數。命名的累加器將顯示在Spark UI中,而未命名的則不會。

下一步是定義新增到累加器的方式。這是一個相當簡單的函式:

現在,讓我們通過foreach方法遍歷flights資料集中的每一行。原因是foreach是一個Action,Spark可以提供僅在動作內部執行的保證。

foreach方法將對輸入DataFrame中的每一行執行一次(假設我們沒有過濾它),並對每一行執行函式,相應地增加累加器:

這將很快完成,但是如果導航到Spark UI,甚至在以程式設計方式查詢它之前,您就可以在每個執行器級別上看到相關的值,如圖14-3所示

當然,我們也可以通過程式設計來查詢它。為此,我們使用value屬性:

14.2.2. 自定義累加器

雖然Spark提供了一些預設的累加器型別,但有時您可能希望構建自己的自定義累加器。為了做到這一點,您需要繼承AccumulatorV2類。您需要實現幾個抽象方法,如下面的示例所示。在本例中,我們將只向累加器新增值。雖然這又是一個簡單的例子,但它應該向您展示構建自己的累加器是多麼容易:

// in Scalaimport scala.collection.mutable.ArrayBufferimport org.apache.spark.util.AccumulatorV2
val arr = ArrayBuffer[BigInt]()
class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] { private var num:BigInt = 0 def reset(): Unit = { this.num = 0 } def add(intValue: BigInt): Unit = { if (intValue % 2 == 0) { this.num += intValue } } def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = { this.num += other.value } def value():BigInt = { this.num } def copy(): AccumulatorV2[BigInt,BigInt] = { new EvenAccumulator } def isZero():Boolean = { this.num == 0 }}val acc = new EvenAccumulatorval newAcc = sc.register(acc, "evenAcc")// in Scalaacc.value // 0flights.foreach(flight_row => acc.add(flight_row.count))acc.value//31390

如果您主要是Python使用者,還可以建立自己的自定義累加器,方法是子類化AccumulatorParam並使用它,就像我們在前面的示例中看到的那樣。

14.3. 結束語

在本章中,我們討論了分散式變數。這些工具對於優化或除錯非常有用。在第15章中,我們將定義Spark如何在叢集上執行,以便更好地理解這些操作何時有用。