1. 程式人生 > >spark使用效能優化記錄

spark使用效能優化記錄

效能調優:

總則:加資源加並行度 簡單直接,調節最優的資源配置 RDD架構和持久化

  當可分配的資源無法達到更多的時候在考慮效能調優

   重劍無鋒 花拳繡腿

  1.分配資源 並行度 RDD架構和快取

  2.shuffle調優

  3.spark運算元調優

  4.JVM調優 、 廣播大變數

    分配哪些資源:executor(task--worker任務數)  cpu per  executor

(每個作業的cpu核心數)、memory (可以使用的記憶體)、driver memory(影響不大)

spark submit shell腳本里面調整對應的引數

yarn資源佇列

standalone  熟悉硬體配置

 原則:你能使用多大資源就調節多大

可有並行執行的task ,executor * cpu cor,減少磁碟IO 提高效能

如果有RDD.cache(),分配更多的記憶體,不用寫入磁碟 ,如果記憶體較小,jvm 垃圾回收也會成為較大的問題,shuffle階段reduce端拉取資料聚合,也是消耗記憶體,溢位會寫入磁碟。

Job: 多個

stage ,stage0結束到reduceBykey的時候會在stage1階段的reducetask建立一份檔案(可以有多個key但是相同key的一定在同一個檔案裡) stage0 端拉取自己的資料

並行度:  各個stage階段的task數量官方推薦 task數量設定成cpu核心數的 2-3倍  這樣CPU core 不至於那樣的空閒 提升效能conf.set("spark.default.parallelism","500")

RDD持久化後,記憶體、磁碟 各有一部分資料當記憶體不夠支撐 可以將RDD序列化成位元組陣列,就一個物件大大減少記憶體空間佔用,在使用需要反序列化

還是OOM 存入磁碟,記憶體很大時,可以雙副本機制兩份持久化資料

純記憶體 persist(StorageLevel.MEMORY_ONLY())   可以用.cache()代替

   persist(StorageLevel.MEMORY_ONLY_SER())

   persist(StorageLevel.MEMORY_AND_DISK())

   persist(StorageLevel.MEMORY_AND_DISK_SER())

   persist(StorageLevel.DISK_ONLY())

   persist(StorageLevel.MEMORY_ONLY_2()) 雙副本機制

廣播變數:   不用的話map副本消耗網路資源傳輸到各個task 每個都佔用記憶體,堆區消耗增大,大變數廣播出去,不是直接使用,開始driver上面一份 taskBlockManager負責管理的某個Executoor對應的記憶體磁碟上的資料上面拿 ,BLM沒有的話會去driver或者另一個BLM上面拿不是每個task一個副本而是每個executor上一個隨機抽取的時候可以用到廣播變數broadcast

Kryo序列化

 使用:運算元中用到的外部變數、持久化RDD進行序列化時、shuffle

 Kryo序列化 比java預設的序列化速度快 記憶體佔用小 是java序列化機制的1/10

在sparkconf中設定屬性 .set(spark.serializer,org.apache.spark.serializer.KryoSreializer)

註冊你需要Kryo序列化的類 .registerKryoClass(new Class[]{xxx.class})

資料本地化等待時間

task沒有被分配到它資料所在的節點(節點計算資源滿了)

task等待預設3s 等不下去就會被分配到別的節點 或者task去用節點的另一個executor執行,taskexecutorBlockManager拉取所需的資料,這需要網路傳輸,通過TransferService 去遠端其他節點找。最差可能需要跨機架去拉取資料

本地化級別 程序本地化 、節點本地化、機架本地化

可以觀察日誌 PROCESS_LOCAL ANY 都是什麼級別的 調節等待時長 檢視spark作業時長有沒有縮短

調節方法:conf .set("spark.locality","10")

JVM調優

降低cache操作的記憶體佔比

task執行運算元函式時 有更多的記憶體可以使用 減少persist記憶體佔比

JVM 記憶體劃分:運算元計算用 、persist用於持久化

executor 堆外記憶體與連線等待時長

task stage0 階段,堆外記憶體不足,OOM,block manager 掛掉

task stage1 通過mapoutputtracker 得到需要的資料地址stage0 階段找的時候 找不到此時任務會重新執行...反覆幾次task失敗檢視log shuffle output file not found 就是這個問題 task lost oom

設定 :提交作業的時候引數設定裡面預設是300M 通常大任務的時候是不夠的

-conf spark.yarn.executor.memoryOverhead=2048

task去別的BlockManager上拉取資料的那個節點正在進行JVM GC,此時連線超過預設60s 超時失敗反覆提交stage

 報錯: errorfile id uuid not found file lost

 -conf spark.core.connection.ack.wait.time.out=300,這個可能解決偶爾的資料拉取失敗情況

shuffle調優

shuffle原理

在某個action觸發job的時候 DAGScheduler 會負責劃分job為多個stage 依據運算元 將操作前半部分以及之前所有的RDD transformation操作劃分為一個stage shuffle的後半部分直到action為之的RDDtransformation操作劃分為另外一個stage所以一個shuffle 會有兩個stage ,stage0 通過記憶體緩衝區,緩衝區滿溢後再spill溢寫將key相同的寫入磁碟的同一個檔案,driver總共有多少個task 他就生成多少個檔案,stage1 將屬於他的key的那個檔案拉取檔案K,V對 到記憶體緩衝區 用HashMap資料格式 進行(kv)的匯聚。task用我們自定義的聚合函式進行累加得到的最終的值 就完成了shuffle

合併map端輸出檔案

.set("spark.shuffle.consolidateFiles","true")   預設是不開啟的

並行執行的task建立輸出檔案 下一批並行執行的task 複用之前已有的輸出檔案  大大減少了map端的輸出檔案也減少了stage1階段task任務的拉取檔案數大大減少了磁碟的IO,實際生產環節 可能減少的時間是可以縮短一半的

調節map端記憶體緩衝與reduce端記憶體佔比

map端記憶體緩衝預設32K 如果task處理數量較大 需要頻繁的將記憶體溢寫到磁碟

reduce端記憶體使用自己executorJVM 堆空間q 分配是預設 0.2 當拉取的資料過多  

頻繁的spill操作 溢寫到磁碟 只有聚合的時候還的多次讀寫存在磁碟的資料進行聚合

引數檢視: standalone spark UI :4040 stage  executor task shuffle read  writer

yarn  介面 application  spark UI ...

實現設定: .set("spark.shuffle.file.buffer"."32") ,調節 64 、128 成倍加

.set("spark.shuffle.memoryFraction""0.2")調節0.1 的加

調節之後,減少了記憶體緩衝區溢寫的次數 ,也減少了聚合讀取磁碟檔案的數量 

HashShuffleManagerSortShuffleManager

spark.shuffle.managerhashsorttungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash") 普通的規則

new SparkConf().set("spark.shuffle.manager", "tungsten-sort")鎢絲,官方說是優化了記憶體機制