spark使用效能優化記錄
效能調優:
總則:加資源加並行度 簡單直接,調節最優的資源配置 RDD架構和持久化
當可分配的資源無法達到更多的時候在考慮效能調優
從 重劍無鋒 到 花拳繡腿
1.分配資源 並行度 RDD架構和快取
2.shuffle調優
3.spark運算元調優
4.JVM調優 、 廣播大變數
分配哪些資源:executor(task--worker任務數) cpu per executor
在spark submit shell腳本里面調整對應的引數
yarn資源佇列
standalone 熟悉硬體配置
原則:你能使用多大資源就調節多大
可有並行執行的task ,executor * cpu cor,減少磁碟IO 提高效能
如果有RDD.cache(),分配更多的記憶體,不用寫入磁碟 ,如果記憶體較小,jvm 垃圾回收也會成為較大的問題,shuffle階段reduce端拉取資料聚合,也是消耗記憶體,溢位會寫入磁碟。
Job: 多個
並行度: 各個stage階段的task數量,官方推薦 task數量設定成cpu核心數的 2-3倍 這樣CPU core 不至於那樣的空閒 提升效能conf.set("spark.default.parallelism","500")
RDD持久化後,記憶體、磁碟 各有一部分資料,當記憶體不夠支撐 可以將RDD序列化成位元組陣列,就一個物件大大減少記憶體空間佔用,在使用需要反序列化
純記憶體 persist(StorageLevel.MEMORY_ONLY()) 可以用.cache()代替
persist(StorageLevel.MEMORY_ONLY_SER())
persist(StorageLevel.MEMORY_AND_DISK())
persist(StorageLevel.MEMORY_AND_DISK_SER())
persist(StorageLevel.DISK_ONLY())
persist(StorageLevel.MEMORY_ONLY_2()) 雙副本機制
廣播變數: 不用的話map副本消耗網路資源,傳輸到各個task 每個都佔用記憶體,堆區消耗增大,大變數廣播出去,不是直接使用,開始driver上面一份 task去BlockManager負責管理的某個Executoor對應的記憶體磁碟上的資料上面拿 ,BLM沒有的話會去driver或者另一個BLM上面拿,不是每個task一個副本,而是每個executor上一個隨機抽取的時候可以用到廣播變數broadcast
Kryo序列化:
使用:運算元中用到的外部變數、持久化RDD進行序列化時、shuffle
Kryo序列化 比java預設的序列化速度快 記憶體佔用小 是java序列化機制的1/10
在sparkconf中設定屬性 .set(spark.serializer,org.apache.spark.serializer.KryoSreializer)
註冊你需要Kryo序列化的類 .registerKryoClass(new Class[]{xxx.class})
資料本地化等待時間
task沒有被分配到它資料所在的節點(節點計算資源滿了)
task等待預設3s 等不下去就會被分配到別的節點 或者task去用節點的另一個executor執行,task的executor去BlockManager拉取所需的資料,這需要網路傳輸,通過TransferService 去遠端其他節點找。最差可能需要跨機架去拉取資料
本地化級別 : 程序本地化 、節點本地化、機架本地化
可以觀察日誌 PROCESS_LOCAL ANY 都是什麼級別的 調節等待時長 檢視spark作業時長有沒有縮短
調節方法:conf .set("spark.locality","10")
JVM調優:
降低cache操作的記憶體佔比
讓task執行運算元函式時 有更多的記憶體可以使用 減少persist記憶體佔比
JVM 記憶體劃分:運算元計算用 、persist用於持久化
executor 堆外記憶體與連線等待時長
task stage0 階段,堆外記憶體不足,OOM,block manager 掛掉
task stage1 通過mapoutputtracker 得到需要的資料地址,去stage0 階段找的時候 找不到,此時任務會重新執行...反覆幾次task失敗,檢視log shuffle output file not found 就是這個問題 task lost 、oom 等
設定 :提交作業的時候引數設定裡面,預設是300M 通常大任務的時候是不夠的
-conf spark.yarn.executor.memoryOverhead=2048
task去別的BlockManager上拉取資料的那個節點,正在進行JVM GC,此時連線超過預設60s 超時失敗,反覆提交stage
報錯: error:file id uuid not found 、file lost 等
-conf spark.core.connection.ack.wait.time.out=300,這個可能解決偶爾的資料拉取失敗情況
shuffle調優
shuffle原理
在某個action觸發job的時候 DAGScheduler 會負責劃分job為多個stage 依據運算元 將操作前半部分以及之前所有的RDD transformation,操作劃分為一個stage shuffle的後半部分直到action為之的RDD和transformation操作劃分為另外一個stage,所以一個shuffle 會有兩個stage ,stage0 通過記憶體緩衝區,緩衝區滿溢後再spill溢寫將key相同的寫入磁碟的同一個檔案,driver總共有多少個task 他就生成多少個檔案,stage1 將屬於他的key的那個檔案拉取檔案K,V對 到記憶體緩衝區 用HashMap資料格式 進行(k,v)的匯聚。task用我們自定義的聚合函式進行累加得到的最終的值 就完成了shuffle
合併map端輸出檔案:
.set("spark.shuffle.consolidateFiles","true") 預設是不開啟的
並行執行的task建立輸出檔案 下一批並行執行的task 複用之前已有的輸出檔案 大大減少了map端的輸出檔案,也減少了stage1階段task任務的拉取檔案數,大大減少了磁碟的IO,實際生產環節 可能減少的時間是可以縮短一半的
調節map端記憶體緩衝與reduce端記憶體佔比:
map端記憶體緩衝預設32K ,如果task處理數量較大 需要頻繁的將記憶體溢寫到磁碟
reduce端記憶體使用自己executor的JVM 堆空間q 分配是預設 0.2 當拉取的資料過多
頻繁的spill操作 溢寫到磁碟 只有聚合的時候還的多次讀寫存在磁碟的資料進行聚合
引數檢視: standalone spark UI :4040 stage executor task shuffle read writer
yarn 介面 application spark UI ...
實現設定: .set("spark.shuffle.file.buffer"."32") ,調節 64 、128 成倍加
.set("spark.shuffle.memoryFraction","0.2"),調節0.1 的加
調節之後,減少了記憶體緩衝區溢寫的次數 ,也減少了聚合讀取磁碟檔案的數量
HashShuffleManager和SortShuffleManager
spark.shuffle.manager:hash、sort、tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash") 普通的規則
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")鎢絲,官方說是優化了記憶體機制