1. 程式人生 > >Spark 應用程式調優

Spark 應用程式調優

對於很多剛接觸Spark的使用者來說,他們可能主要關心資料處理的邏輯,而對於如何高效執行Spark應用程式瞭解較少。由於Spark是一種分散式記憶體計算框架,其效能往往受限於CPU、記憶體、網路等多方面的因素,對於使用者來說,如何在有限的資源下高效地執行Spark應用程式顯得尤為重要。下面只針對Spark-On-Yarn的一些常用調優策略做詳細分析。

配置引數優化

資源申請引數

Spark-On-Yarn資源排程由Yarn來管理,使用者只需指定Spark應用程式要申請的資源即可。我們首先來理解幾個資源配置項,一旦資源配置確定,則只能在這些有限的資源下執行Spark應用程式。

  • num-executors:同時執行的executor數。
  • executor-cores:一個executor上的core數,表示一次能同時執行的task數。一個Spark應用最多可以同時執行的task數為num-executors*executor-cores,建議配置executor-cores為1就夠了,想要增加並行度,增加num-executors即可。
  • driver-memory:driver的記憶體大小,視driver收集結果大小而定。
  • executor-memory:executor記憶體大小,視任務處理的資料量大小而定。

一開始我們只能通過大致的估算來確定上述資源的配置,例如一個Spark應用程式處理的資料大小為1T,如果讀出來預設是500個partitions(可以通過測試執行,從web中檢視的到),那麼平均每個partition的大小為1T/500≈2G,預設情況下,考慮中間處理過程中的資料膨脹以及一些額外記憶體消耗,executor中可用於存放rdd的閾值設定為spar.storage.memoryFraction=0.6,所以儲存partition需要的記憶體為executor-memory*0.6,穩妥一點設定executor-memory大於2G/0.6,如果一個executor不止是處理一個partition,假如num-executors設定為100,那麼平均每個executor處理的partition為500/100=5,這時如果需要快取rdd,那麼executor-memory就要設定為大於5*2G/0.6;如果讀出來的分割槽數很少(如100),一個partition很大(1T/100≈10G),使得executor-memory有可能OOM,那麼就需要考慮加大分割槽數(呼叫repartition(numPartitions)等),增加task數量來減少一個task的資料量。一般來說一個executor處理的partition數最好不要超過5個,否則增加num-executors數,接上面的例子,500個分割槽,配置num-executors為100,每個executor需要處理5個partition。driver-memory的大小取決於最後的action操作,如果是呼叫collect,那麼driver-memory的大小就取決於結果集rdd的大小,如果是呼叫count,那麼driver-memory的大小隻需要滿足執行需求就夠了,對於需要長時間迭代的Spark應用,driver端需要維護rdd的依賴關係,所以需要設定較大的記憶體。

上述僅僅是大致估算的資源配置,實際還要根據執行情況不斷的調優,以達到資源最大化利用。例如,我們在執行日誌中找到如下資訊,它表明rdd_0的partition1記憶體大小為717.5KB,當我們得到這個資訊後,就可以再次調整上述引數。 
INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

執行時引數

(1)spark.serializer

序列化對於Spark應用的效能來說,影響是非常大的,它涉及到網路傳輸以及儲存,Spark預設是使用org.apache.spark.serializer.JavaSerializer,內部使用的是Java的ObjectOutputStream框架,這種序列化方式壓縮比小,而且速度慢,強烈建議採用kyro序列化方式,它速度快,而且壓縮比高,效能是Java序列化的10倍,只需在lz上配置擴充套件引數”spark.serializer=org.apache.spark.serializer.KryoSerializer”即可,一般來說使用kyro序列化方式,需要在程式裡面對使用者自定義的可序列化的類進行註冊,例如下面程式碼所示:

valconf =newSparkConf()
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
valsc =newSparkContext(conf)

但是如果你不註冊,kyro也是可以工作的,只是序列化效率差一點。

(2)spark.rdd.compress

這個引數決定了RDD Cache的過程中,RDD資料是否需要進一步壓縮再Cache到記憶體或磁碟中,從記憶體看來,當記憶體比較稀缺時,如果不做壓縮就Cache,就很可能會引發GC拖慢程式,從磁碟看來,壓縮後資料量變小以減少磁碟IO。所以如果出現記憶體吃緊或者磁碟IO問題,就需要考慮啟用RDD壓縮。預設是關閉的。

(3)spark.storage.memoryFraction

前面提到的executor-memory決定了每個executor可用記憶體的大小,而spark.storage.memoryFraction則決定了在這部分記憶體中有多少可以用於管理RDD Cache資料,剩下的記憶體用來保證任務執行時各種其它記憶體空間的需要。spark.executor.memoryFraction預設值為0.6,官方文件建議這個比值不要超過JVM Old Gen區域的比值,因為RDD Cache資料通常都是長期駐留記憶體的,理論上也就是說最終會被轉移到Old Gen區域,如果這部分資料允許的尺寸太大,勢必把Old Gen區域佔滿,造成頻繁的FULL GC。如果發現Spark應用在執行過程中發生頻繁的FULL GC,就需要考慮減小該配置,所以建議這個配置不要加大,如果記憶體吃緊,可以考慮採用記憶體和磁碟的混合快取模式,進一步減少RDD Cache還可以考慮序列化以及壓縮等。

(4)spark.shuffle.memoryFraction

在啟用Spill的情況(spark.shuffle.spill預設開啟)下,spark.shuffle.memoryFraction表示Shuffle過程中使用的記憶體達到總記憶體多少比例的時候開始Spill。spark.shuffle.memoryFraction預設值為0.2,調整該值可以調整Shuffle過程中Spill的頻率。總的來說,如果Spill太過頻繁,可以適當增加spark.shuffle.memoryFraction的大小,增加用於Shuffle的記憶體,減少Spill的次數。然而這樣一來為了避免記憶體溢位,對應的可能需要減少RDD cache佔用的記憶體,即減小spark.storage.memoryFraction的值,這樣RDD cache的容量減少,有可能帶來效能影響,因此需要綜合考慮,如果在你的Spark應用程式中RDD Cache較少,Shuffle資料量較大,就需要把spark.shuffle.memoryFraction調大一些,把spark.storage.memoryFraction調小一些。

(5)spark.shuffle.file.buffer.kb

每次shuffle過程駐留在記憶體的buffer大小,在shuffle中間資料的產生過程中可減少硬碟的IO操作。spark.shuffle.file.buffer.kb預設為32,若Spark應用程式執行過程中Shuffle稱為瓶頸,根據需要適當的加大該配置。

介面使用優化

對於Spark使用者來說,他們可能不太瞭解RDD介面內部實現細節,主要關心業務資料處理,然而這往往導致編寫出來的Spark應用程式執行效率不高,資源利用浪費等。下面簡單介紹一些常見的Spark應用開發注意細節。

快取介面

Spark比MapReduce快的很大一部分原因是它可以把中間結果RDDCache起來,不用每次需要時重新計算。但是如果Cache使用不當,會造成記憶體吃緊,要麼帶來不必要的磁碟IO,要麼引起頻繁的FULL GC,拖慢程式執行。

對於一個需要多次使用的臨時RDD(類似於臨時變數),儘可能要把它Cache起來,這樣這個臨時RDD只會計算一次,以後每次都會從Cache裡直接取。如下面的例子,需要統計第一個欄位大於100的數目和第二個欄位大於100的數目,如果data不做Cache,因為只有遇到RDD的Action介面時才出發計算,所以在計算firstCnt時會讀一遍資料,計算secondCnt時還會再讀一遍資料,這樣就造成一些不必要的計算,對data做了Cache後,在計算firstCnt時讀一次,計算secondCnt就會直接從Cache中取而不用再次讀一次。

val data = val data = sc.textFile(path)
data.cache()
valfirstCnt = data.filter(x(0).toInt =>100).count()
valsecondCnt = data.filter(x(1).toInt =>100).count()

很多時候會看到這樣的程式碼,在對兩個RDD進行Join時,把兩個RDD都Cache起來再做Join,這裡一定要明白一點,沒有呼叫Action介面,計算是不會觸發的,下面的程式碼如果後續不再用到rdd1和rdd2,是沒有必要對rdd1和rdd2做Cache的,這裡要做Cache的是data。

val data = val data = sc.textFile(path)
val rdd1 = data.map(…).cache()
val rdd2 = data.map(…).cache()
val rdd3 = rdd1.join(rdd2).count()

對於內部需要多次迭代的Spark應用來說,應該儘量將每次迭代用到的臨時RDD快取起來,在這個臨時RDD被更新時,需要將舊的快取手動清除掉。如下例子顯示,每次迭代都需要在curRDD基礎上進行更新得到updatedRDD,在一輪迭代結束後要更新curRDD為updatedRDD,在更新前需要手動將之前的curRDDCache清理掉,防止記憶體被耗光,引發頻繁FULL GC。

val data = sc.textFile(path)// some transformations in init(data)
varcurRDD = init(data).cache()
val result =newArrayBuffer[Double]()// some transformations and an action in getResult(curRDD)
result += getResult(curRDD)// Start Iterationvar changed =truewhile(changed){// some transformations in iteration(curRDD)
valupdatedRDD = iteration(curRDD).cache()// getResultand check if the value is changed
val x = getResult(updatedRDD)// convergenceif(x == result.last) changed =false// Unpersist old RDD and assign new RDD
curRDD.unpersist(false)
curRDD = updatedRDD
}

在對RDD做快取時,還應考慮記憶體大小情況選擇合適的快取方式,Spark提供以下幾種快取:

  • MEMORY_ONLY:直接將RDD物件儲存到記憶體中,Spark預設選項
  • MEMORY_AND_DISK:當記憶體不夠的時候,儲存到磁碟中(記憶體較為稀缺的時候用,比MEMORY_ONLY佔用更少的記憶體,但是會帶來磁碟IO)
  • MEMORY_ONLY_SER:將RDD序列化後儲存到記憶體中(記憶體較為稀缺的時候用,比MEMORY_ONLY佔用更少的記憶體)
  • MEMORY_AND_DISK_SER:將RDD序列化後儲存到記憶體中,記憶體不夠時儲存到磁碟中(記憶體較為稀缺的時候用,比MEMORY_ONLY_SER更安全)
  • DISK_ONLY:儲存到磁碟中(不建議用)
  • MEMORY_ONLY_2:與MEMORY_ONLY類似,只是儲存兩份
  • MEMORY_AND_DISK_2:與MEMORY_AND_DISK類似,只是儲存兩份
  • OFF_HEAP :將序列化後的RDD儲存到Tachyon(一種分散式記憶體檔案系統)中,相比於MEMORY_ONLY_SER可以避免GC的額外開銷。這種快取方式還在試驗階段

根據具體情況判斷使用何種快取方式,呼叫的時候直接通過如rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)方式實現,呼叫rdd.cache()預設是rdd.persist(StorageLevel.MEMORY_ONLY)。

引發Shuffle的相關介面

一個Spark應用程式執行快慢,往往受限於中間的Shuffle過程,Shuffle涉及到網路以及磁碟IO,是整個Spark應用程式執行過程中較為耗時的階段。在編寫Spark應用程式時,應當儘量減少Shuffle次數。下面列舉常見的可能引發Shuffle的介面。

  • distinct
  • Intersection/subtracted
  • reduceByKey/aggregateByKey
  • repartition
  • cogroup
  • join
  • sortBy/sortByKey
  • groupBy/groupByKey
  • partitionBy

如果executor記憶體不足以處理一個partition,那麼這時考慮呼叫repartition來加大分割槽數,使得每個partition的資料量減少以至於executor可以處理,一般來說上述介面也可以接受numPartitions引數來指定分割槽數。上述介面連續呼叫不一定會帶來多次Shuffle,只要partition型別和partition數不變,是不會增加Shuffle次數的,如下程式碼則只有一次Shuffle:

rdd.map(x =>(x, x+1)).repartition(1000).reduceByKey(_ + _).count()

然而如下程式碼卻會有兩次Shuffle:

rdd.map(x =>(x, x+1)).repartition(1000).reduceByKey(_ + _,3000).count()

很多使用者在一開始呼叫了觸發Shuffle的相關介面,後面可能資料膨脹了,發現需要更多的partition,所以在後面呼叫觸發Shuffle的相關介面時加大partition數,這樣就會導致多次Shuffle,所以一開始就確定好最後的partition數,以免做不必要的Shuffle。

介面對比

(1)sortBy/sortByKey與takeOrdered

有時候使用者可能希望對資料集排序取前n條記錄,很多使用者會像如下程式碼一樣實現:

rdd.sortBy(x => x.key).take(n)//or rdd.sortByKey().take(n)

然而,有一個更有效的辦法,就是按照以下方式實現:

rdd.takeOrdered(n)

以上兩者的區別在於,第一種方式需要把所有partition的排序結果進行歸併再取前n條記錄,第二種方式是從每個排好序的partition中取出前n條記錄最後再歸併為n條記錄,大大降低了網路IO,提升整體效能。

(2)groupBy/groupByKey與aggregateByKey

在做分組計算時,首先會想到使用groupBy/groupByKey介面,值得一提的是,groupBy/groupByKey介面特別佔用記憶體,它是把具有相同key值的所有value放到一個buffer數組裡,如果某個key對應的value非常多,極其容易引發OutOfMemoryError,通過groupBy/groupByKey實現的分組計算功能是可以通過aggregateByKey或者reduceByKey來實現的,aggregateByKey/reduceByKey內部是通過combineByKey實現的,當記憶體超過一定閾值會spill到磁碟,相對來說較為安全。當通過groupBy/groupByKey介面最後返回的RDD[(K, V)]中V不是序列時,可以用reduceByKey實現,當V是序列時可以用aggregateByKey實現,例如需要統計key對應的value最大值:

//rdd: RDD[(int, int)]
rdd.groupByKey().map((k, vb)=>(k, vb.max))

我們完全可以用reduceByKey來實現上述功能:

rdd.reduceByKey ((v1, v2)=>Math.max(v1, v2))

再比如,就想返回key對應的所有value:

//rdd: RDD[(int, int)]
rdd.groupByKey()

我們完全可以用aggregateByKey來實現上述功能:

rdd. aggregateByKey(Seq())((u, v)=> v::u,(u1, u2)=> u1 ++ u2
)

以上是簡單提出幾個需要注意的介面呼叫,如果不瞭解RDD介面的使用,可以參見社群文件。

轉載地址  http://ju.outofmemory.cn/entry/185740

相關推薦

Spark 應用式調

對於很多剛接觸Spark的使用者來說,他們可能主要關心資料處理的邏輯,而對於如何高效執行Spark應用程式瞭解較少。由於Spark是一種分散式記憶體計算框架,其效能往往受限於CPU、記憶體、網路等多方面的因素,對於使用者來說,如何在有限的資源下高效地執行Spark應用程式

一道阿里筆試題解析--式調

下列方法中,____不可以用來程式調優?         A.改善資料訪問方式以提升快取命中率         B.使用多執行緒的方式提高 I/O 密集型操作的效率         C.利用資料庫連

Spark深入學習 -14】Spark應用經驗與程序調

aps 它的 stack 申請 vco 用戶 統一 persist 資料 ----本節內容------- 1.遺留問題解答 2.Spark調優初體驗 2.1 利用WebUI分析程序瓶頸 2.2 設置合適的資源 2.3 調整任務的並發度

GC調Spark應用中的實踐(轉載)

avg fix 時也 net aso 會有 介紹 完整 頻繁 Spark是時下非常熱門的大數據計算框架,以其卓越的性能優勢、獨特的架構、易用的用戶接口和豐富的分析計算庫,正在工業界獲得越來越廣泛的應用。與Hadoop、HBase生態圈的眾多項目一樣,Spark的運行離不開J

Spark應用程式開發引數調深入剖析-Spark商業調實戰

本套系列部落格從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套部落格。版權宣告:本套Spark商業應用實戰歸作者(秦凱新)所有,禁止轉載,歡迎學習。 Spark商業應用實戰-Spark資料傾斜案例測試及調優準則深入剖析 Spark商業應用實戰-Spark資源

delphi項目程序輸出編譯成應用序文件

相對路徑 打包壓縮 存儲 jpg 右擊 相對 alt 應用 http 1.先設置編譯後的路徑保存目錄: 2.右擊項目名,點compile 開始編譯,編譯成功後點OK 3.到輸出文件目錄裏找到輸出的文件夾,這個文件夾就是整個項目導出的存儲目錄。如果需要發送給他人

Spark性能調之道——解決Spark數據傾斜(Data Skew)的N種姿勢

sca ace 便是 triplet 大小 spark 構建 由於 itl 原文:http://blog.csdn.net/tanglizhe1105/article/details/51050974 背景 很多使用Spark的朋友很想知道rdd

spark性能調(二) 徹底解密spark的Hash Shuffle

弱點 sta 出了 寫到 三方 很大的 完成 map 重新 裝載:http://www.cnblogs.com/jcchoiling/p/6431969.html 引言 Spark HashShuffle 是它以前的版本,現在1.6x 版本默應是 Sort-Based Sh

Spark模型(中)

tool irf split exe too rdd count pil 取數 先在IDEA新建一個maven項目 我這裏用的是jdk1.8,選擇相應的骨架 這裏選擇本地在window下安裝的maven 新的項目創建成功 我的開始pom.xml

Spark模型(下)

spa pan -s mage 編程 編程模型 rdd alt img

spark性能調之資源調

重要 cnblogs logs 做的 參數說明 span 分配 比例 drive 轉https://tech.meituan.com/spark-tuning-basic.html spark作業原理 使用spark-submit提交一個Spark作業之後,這個作

Yarn上常駐Spark-Streaming程序調

disable principal row use 傳輸 設置 較高的 提高 此外 對於長時間運行的Spark Streaming作業,一旦提交到YARN群集便需要永久運行,直到有意停止。任何中斷都會引起嚴重的處理延遲,並可能導致數據丟失或重復。YARN和Apache Sp

Spark調試參數

-o out executor ket ons app words ans class Spark遠程調試腳本: #調試Master,在master節點的spark-env.sh中添加SPARK_MASTER_OPTS變量 export SPARK_MASTER_OPTS

Spark 性能調零散知識

ges ermaster 傾斜 entry 鏈接方式 nbsp spec manage 基礎 1. 如果 Spark 中 CPU 的使用率不夠高,可以考慮為當前的程序分配更多的 Executor, 或者增加更多的 Worker 實例來充分的使用多核的潛能 2. 適當設置 P

Spark應用領域廣泛,能做什麽呢?

Spark Hadoop 大數據開發 Spark能做什麽?Spark應用領域Spark是大數據技術中數據計算處理的王者,能夠一次處理PB級的數據,分布在數千個協作的物理或虛擬服務器集群中,它有一套廣泛的開發者庫和API,並且支持Java,Python,R和Scala等語言,其靈活的特性,適合各種環

Spark性能調-基礎篇

以及 sce 集合 table 團隊 加載 分析 功能 serializa 前言 在大數據計算領域,Spark已經成為了越來越流行、越來越受歡迎的計算平臺之一。Spark的功能涵蓋了大數據領域的離線批處理、SQL類處理、流式/實時計算、機器學習、圖計算等各種不同類型的計算操

IDEA搭建scala開發環境開發spark應用程序

編寫 運行程序 通過 https apach import input inf 搭建 一、idea社區版安裝scala插件 因為idea默認不支持scala開發環境,所以當需要使用idea搭建scala開發環境時,首先需要安裝scala插件,具體安裝辦法如下。 1、

Spark環境搭建及WordCount實例

enter 默認 自己 apache block 編程 mar compile edi 基於Intellij IDEA搭建Spark開發環境搭建  基於Intellij IDEA搭建Spark開發環境搭——參考文檔   ● 參考文檔h

Spark模型(上)

導致 內容 val net 什麽是 元素 如果 ont cat 初識RDD 什麽是RDD? 定義:Resilient distributed datasets (RDD), an efficient, general-purpose and fault-tolerant

【Big Data 每日一題】Spark開發效能調總結

1. 分配資源調優 Spark效能調優的王道就是分配資源,即增加和分配更多的資源對效能速度的提升是顯而易見的,基本上,在一定範圍之內,增加資源與效能的提升是成正比的,當公司資源有限,能分配的資源達到頂峰之後,那麼才去考慮做其他的調優 如何分配及分配哪些資源 在生產環境中,提交spark作