Spark(六)Spark程式設計進階
目錄:
6、Spark程式設計進階
6.1、累加器
通常在向Spark傳遞函式時,比如使用map()函式或者用filter()傳條件時,可以使用驅動器程式定義的變數,但是叢集中執行的每個任務都會得到這些變數的一位新的副本。更新這些副本的值不會影響驅動器中的對應變數。spark的兩個共享變數:累加器和廣播變數,分別為結果與廣播這兩種常見的通訊模式突破了這一限制。
第一種共享變數,即累加器,提供了將工作節點中的值聚合到驅動器程式中的簡單語法。
1、Spark內建的提供了Long和Double型別的累加器。
LongAccumulator longAccumulator = jsc.sc().longAccumulator();
DoubleAccumulator doubleAccumulator = jsc.sc().doubleAccumulator();
2、主要方法介紹
add方法:賦值操作
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進行合併的方法。
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
3、使用累加器需要注意的點
a、只有在行動操作中才會觸發累加器,也就是說如:flatMap()轉換操作因為Spark惰性特徵所以只用當執行行動操作(如:count等)時累加器才會被觸發;累加器只有在驅動程式中才可訪問,worker節點中的任務不可訪問累加器中的值.
b、使用Accumulator時,為了保證準確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴
簡單實用demo
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
accum.value();// returns 10
6.2、廣播變數
Spark提供的廣播變數可以解決閉包函式引用外部大變數引起的效能問題;廣播變數將只讀變數快取在每個worker節點中,Spark使用了高效廣播演算法分發變數從而提高通訊效能;如直接在閉包函式中使用外部 變數該變數會快取在每個任務(jobTask)中如果多個任務同時使用了一個大變數勢必會影響到程式效能;廣播變數:每個worker節點中快取一個副本,通過高效廣播演算法提高傳輸效率,廣播變數是隻讀的;Spark Scala Api與Java Api預設使用了Jdk自帶序列化庫,通過使用第三方或使用自定義的序列化庫還可以進一步提高廣播變數的效能。看個demo
廣播變數的優勢:是因為不是每個task一份變數副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變數產生的副本大大減少。
簡單解釋就是:上面demo定義了一個sexMapBC的廣播變數,這個變數每臺work上只存一份,然後該work上的所有task共享這個變數。
左邊沒有采用廣播變數,右邊採用了廣播變數。 左每個task都有一個副本,右邊只有worker上一個副本。
網上的一個例子:
50個Executor 1000個task。
預設情況下,1000個task 1000個副本
1000 * 10M = 10 000M = 10 G
10G的資料,網路傳輸,在叢集中,耗費10G的記憶體資源。
如果使用 廣播變數:
50個Executor ,50個副本,10M*50 = 500M的資料。
網路傳輸,而且不一定是從Dirver傳輸到各個節點,還可能是從就近的節點 的Executor的BlockManager上獲取變數副本,網路傳輸速度大大增加。之前 10000M 現在 500M。20倍網路傳輸效能的消耗。20倍記憶體消耗的減少。
雖然說,不一定會對效能產生決定向性的作用。比如執行30分鐘的spark作業,可能做了廣播變數以後,速度快了2分鐘。變成28分鐘。
注意一點:廣播變數建立後,它可以執行在叢集中的任何Executor上,而不需要多次傳遞給叢集節點。另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。
6.3、基於分割槽進行操作
後續。
6.4、與外部程式間的管道
後續。
6.5、數值RDD的操作
後續。