spark(二)優化思路
優化思路
內存優化
內存優化大概分為三個方向
1.所有對象的總內存(包括數據和java對象)
2.訪問這些對象的開銷
3.垃圾回收的開銷
其中Java的原生對象往往都能被很快的訪問,但是會多占據2-5倍或更多的內存,有下面4點原因
·每個單獨的java對象都有一個對象頭(16字節),其中包括指向對象的指針(棧->堆),如果該對象只有幾個屬性,那麽對象頭可能比實際數據占用的空間都大(嚴重浪費資源)
·java每個string都包含了40字節的額外開銷(因為底層其實是存儲在數組,需要記錄數組的指針,長度等信息),每個字符包含2字節(UTF-16編碼)。例如一個10字符的string,實際占用內存空間60字節
·常見的集合類,例如linkedlist,hashmap都有用到鏈表,其中的對象頭,元素指針都會占據額外的空間
·基礎類型的包裝,例如Integer
明確內存消耗
一般情況,可以把數據轉成rdd,然後通過spark自帶的UI的Storage頁面來觀察rdd占用內存大小。
其它特殊的對象,可以用spark自帶的工具類SizeEstimator來評估內存大小,包括對廣播數據的內存占用評估
優化數據結構
避免使用需要額外開銷的java原生的數據結構,比如鏈表,hashmap,包裝類。下面是常見的方法
·盡量使用數組結構和基礎類型,
·在嵌套的數據結構中,盡量避免小對象和指針
·考慮用數字或者枚舉來代替string作為key
·如果內存少於32GB,可以優化JVM -XX:+UseCompressedOops ,OOP = “ordinary object pointer” 普通對象指針。可以讓指針由8->4字節,壓縮的一般是對象的相關指針(不是用來壓縮數據的)。
一般建議場景是在分配給JVM的內存大小在[4G,32G], 如果小於4G,那麽JVM會使用低虛擬地址空間(low virutal address space,64位下模擬32位),這樣就不需要做壓解壓動作了。而對於大於32G,將采用默認的隨機地址分配特性,進行壓解壓。
數據序列化
選擇合適的序列化協議,一般而言用Kryo,比java原生序列化快很多
數據存儲
RDD persistence API
通過把數據序列化至內存,或者磁盤,或者其他策略
GC優化
所有給spark的內存資源,有一部分是用於cache RDD的,剩下的用於jvm的堆和棧等使用。
默認的比例是cache RDD占總內存的60%,可以通過spark.storage.memoryFraction來更改。
一般情況下,官方文檔建議這個比值不要超過JVM Old Gen區域的比值。這也很容易理解,因為RDD Cache數據通常都是長期駐留內存的,理論上也就是說最終會被轉移到Old Gen區域(如果該RDD還沒有被刪除的話),如果這部分數據允許的尺寸太大,勢必把Old Gen區域占滿,造成頻繁的FULL GC,這種情況就可以調小該值。
·確認資源是否給足driver的cpu和memory,executor的cpu和memory
·當出現過多的full GC時候,可以減小RDD cache的內存空間
·當出現過多的minor GC時候,可以增加JVM中Eden區的大小,通過4/3的比例增加
·其它常規JVM優化方法,線程棧的內存大小,永久代的堆內存大小等
分區數優化
分區數多就是task多,整個任務的並發度就高,但也不是越多越好,假設你有100條數據,有50個分區,平均一個分區就處理兩條數據,這樣就造成了嚴重的浪費,更多的時間浪費在分區間的shuffle,和driver的聚合。
下面是幾個優化建議
·每個cpu core上跑2~3個tasks
·當task上的數據大於20KB的時候,可以考慮
·在當前的分區數的1.5倍來進行調優
關於分區
除了顯示的聲明rdd或者dataframe的分區數外,還有兩種控制分區數的配置,
1.spark.sql.shuffle.partitions
針對dataframe和一些sql操作的分區數
默認的分區數為父RDD的最大分區數
2.spark.default.parallelism
針對rdd的默認分區數
一般分區數取決於executor的core數量,因為partition越多task越多,而task是spark的最小處理單元。executor的core數量不夠,task再多也只能排隊,反而慢了。
註:默認的shuffle後分區數為200
共享變量
廣播
當遇到全局性的數據需要使用時,可以采用廣播的方式
廣播變量的優勢:是因為不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其他節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,此後這個executor上的task,都會直接使用本地的BlockManager中的副本。
例如,50個executor,1000個task。一個map,10M。默認情況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。如果使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增加;500M,大大降低了內存消耗。
累加器
全局的累加器,可以用於統計全局性的數據
數據本地化
數據本地化是一個影響spark jobs性能的主要方面。其實運行分為兩塊,一塊數據,一塊代碼,最好的情況就是數據不動(數據量太大),代碼會部署在各個executor上。
可以通過調節spark.locality相關配置來決定任務的運行選擇。
接口優化
1.reduceBy和groupBy
同理,reduceByKey,aggregateByKey,groupByKey等
優先使用reduceBy
reduceBy會優先合並本地的rdd,這樣就大大的減少了shuffle的數據量
2.coalesce和repartition
看源碼可知repartition是采用shuffle的coalesce。從性能上來講,coalesce是本地合並,也就是同一個executor合並,這樣可以減少網絡傳輸帶來的性能損失,並且是窄依賴,數據恢復也方便。而reparation直接采用shuffle的方式合並。優先使用coalesce。但是在大量分區需要合並的時候,要考慮一下策略。比如,現在一共有1000個分區,需要合並成10個分區。
如果直接采用coalesce(10),可能導致合並的速度並不快(原因未知),而采用reparation(10)並發度會多很多。最終性能還是repartition好一點。
應用場景,設原rdd分區大小為M,現rdd分區大小為N
M>N,並且差10倍以內,考慮用coalesce
M>N,並且差10倍以上,考慮用repartition
M<N,考慮用repartition
註:也可以采用混合使用,先coalesce,把分區數降下來,然後采用repartition,當然這就需要實際測試,觀察哪種性能更加
動態分配資源
Spark on yarn支持一種特殊的資源分配機制
從spark1.2開始就提供該機制。你的application在運行過程中會返回給資源池你所擁有的資源(比如你問yarn要了2G,跑玩數據預處理後,接下來的計算只需要1G,那剩下的1G就先還給yarn)
可以通過下面配置開啟
spark.dynamicAllocation.enabled=true
參考資料
//官方配置文檔
http://spark.apache.org/docs/1.5.0/configuration.html
//spark官方提供的思路
https://spark.apache.org/docs/1.5.0/tuning.html
//cloudera提供的思路
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
//IBM提供的相關資料
https://www.ibm.com/support/knowledgecenter/en/SSZU2E_2.2.0/performance_tuning/application_overview.html
spark(二)優化思路