Spark調優那些事
Spark調優那些事
在當前的大資料時代,Spark作為一款快入閃電⚡️的計算引擎普遍為大家所熟知和使用,在使用Spark的時候難免會遇到各種各樣的問題,其中大部分原因是資料傾斜,但是我們在對整個Spark進行調優時,最開始可以調整Spark中各種各樣的引數設定和分配,如果大部分引數除錯好之後還不能滿足自己的需求,這時候主要考慮的一定是優化自己的程式碼,這是非常重要的!!!
常用的引數:
num-executors:
這個引數設定了Spark執行時需要多少個Executor程序來執行,當程式放在Yarn上執行時,會啟動相應數量的Executor,這個引數如果不設定,預設的情況下Executor的數量比較少,可以適當的進行調高,但不要過高。平常接觸的業務中最大設定過80,我那時候已經覺得自己很不要臉了~
executor-memory:
該引數設定了每個Executor的記憶體,這個引數的設定會直接影響到許多程式的執行,常見的OOM異常會跟該引數有直接的關係,這個的設定就要取決於自己的記憶體的大小,量力而行,可以用上面的引數*該引數,這個乘積是不能超過最大記憶體的,個人建議設定成最大記憶體的一半左右,如果還有其他人與你一起共用該記憶體,設定過大會影響他人的效能。
driver-memory:
這個引數表示dirver的記憶體大小,預設是1,可以根據情況進行適當的調高,個人建議設定到10-20左右,避免發生OOM。
executor-cores:
這個是Spark中核心配置之一,設定每個Executor的核心CPU的數量,一個Executor的CPU同時只能執行一個任務,這個平常設定都是1-4個,最大的時候是4。
driver-cores:
這個是driver程式的CPU核心數量,預設為1,本人一直都在使用預設,沒有調過,大家可以嘗試下適當調高,之前看別人的文章可以調到8-16。
default-paralleism:
這個引數用來設定每個stage的task數量,在官方中給出的建議是設定在num-executor和executor-cores的乘積的2-3倍左右,我一般設定的數量大概是200-500之間,可以根據num-executor和executor-cores進行適當的調整。
Tips:上面說了一些常用的引數,如果作出適當的調整後還沒有滿足需要,就應該考慮修改自己的業務邏輯了!!!
以上是我在平常使用到的一些引數,其實還有很多,本人也在不斷的學習!!
資料傾斜
大家都知道在Spark任務中,使用了某些運算元,可能會存在Shuffle過程,一但存在Shuffle過程,那就代表會出現數據傾斜的可能,所以資料傾斜指的一般都是在Shuffle過程中的資料傾斜。
產生的原因:
在執行到Shuffle過程中,由於我們的資料中Key的資料量相差較大,導致其中每個Task處理的資料量不同0哦那個,某些任務執行的很快,某些任務執行的很慢甚至是OOM,這就是資料傾斜。其中還有一個概念是資料過量,這兩個概念並不相同,傾斜只是某些key中的資料量很大,過量時全部的Key的資料量都很大。
程式碼部分優化處理
在讀資料的時候,應適當的給資料進行一些分類。
1.需要讀到很多不同位置的資料,保證每個資料的RDD只有一個,在第二次使用時,直接使用該RDD,不要重新讀。
2.對於一些大批量的資料,可以使用Spark中廣播變數的功能,將該資料通過廣播的形式通知到各個Executor中。
3.對於頻繁使用的資料,在讀取轉成RDD後應當選擇一種合適的持久化策略進行持久化,Spark它非常看不起磁碟,覺得它特別垃圾,在預設的持久化策略中使用的是MEMORY_ONLY。Spark中的持久化策略有12種,其中名稱帶2的方式都是將持久化的資料進行復制一份副本,把副本儲存在不同的節點上,以便容錯。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
4.使用Kryo序列化,之前並沒有接觸過Kryo。在網上學習了一下,大概就是Spark預設其實是Java的序列化,但是也支援Kryo的序列化,在官方說Kryo的速度要比Java的快10倍左右,缺點是,如果程式中涉及了自定義的物件,Kryo要求將自定義的物件進行註冊,這樣才能達到最優,過程比Java的繁瑣些,所以Spark就沒有設定其為預設。
SparkConf conf = new SparkConf().setAppName("test")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{ConnectionDataVO.class});
Tips:在官網解釋說:Java的序列化更加靈活,但是執行速度緩慢,Kryo的速度更快,但是支援的型別較少。
5.對於程式中所使用的資料結構進行優化,可以替換的關係
物件,字串 ----------> 字串
集合 ----------> 陣列
Tips:以上是通過修改一些程式中的設定,以及對資料來源進行優化
首先要定位程式的問題點在哪
在程式中新增可能會出現原因的log資訊並進行列印。
SparkUI介面上可以檢視DAG的執行邏輯和具體某個運算元的執行過程。
1.改變Key的粒度:
分析原有的Key的分佈情況,判斷Key的粒度是否過大,或者過小。比如,按天統計的可以擴大成按周統計的,按月統計的可以細分到按周統計的,這種方式大多針對含有時間區分的Key中,調整後的Key的數量和所對應的資料量肯定不相同,避免了資料傾斜。
2.多次聚合Key:
在業務中可能經過一次groupByKey或者reduceByKey等聚合操作時產生的Key的數量過多,可以多次進行聚合,比如,在第一次聚合時,先在Key的前面加上HashCode或者隨機數,使其本來聚合成一個Key的,在聚合前先小範圍的聚合一次,之後再聚合時,將前面的隨機數等去掉,再聚合。
3.某一個Key資料傾斜:
可以將該Key單獨提出來生成一個RDD,在Saprk中存在一種機制,當某個RDD中只存在一個Key時,在進行Shuffle操作時,會將該Key的所有value進行打散,分配到不同的task中進行處理,處理後,再將資料進行join,這樣可以解決資料傾斜的問題。
4.對於Join操作,替換成Map:
如果程式中存在join運算元,找出相對來說資料量較小的那個RDD,通過Spark中的廣播功能,呼叫broadcast的方法傳遞這個RDD得到一個Broadcast型別的變數,呼叫.value,將這個小份資料廣播到每個Executor中,之後不再呼叫join操作,使用Map,在改運算元中根據某些相關聯的條件將兩個資料進行連線,這個方式對於join操作產生的資料傾斜,效果比較好,因為不會產生Shuffle操作。
5.進行適當的擴容(謹慎使用!):
當程式遇到許多個Key都發生資料傾斜時,可以對剩下正常的Key進行適當的擴容,以分擔資料傾斜Key的資料,這樣的操作也只是緩解當前的資料傾斜,並沒有從根本上解決問題,使用該方法對記憶體的還會有一定的要求,如果使用不恰當可能會出現OOM。