1. 程式人生 > 其它 >Spark排錯與優化

Spark排錯與優化

一. 運維

1. Master掛掉,standby重啟也失效

Master預設使用512M記憶體,當叢集中執行的任務特別多時,就會掛掉,原因是master會讀取每個task的event log日誌去生成Sparkui,記憶體不足自然會OOM,可以在master的執行日誌中看到,通過HA啟動的master自然也會因為這個原因失敗。

解決

  1. 增加Master的記憶體佔用,在Master節點spark-env.sh 中設定:

    export SPARK_DAEMON_MEMORY 10g # 根據你的實際情況
    
  2. 減少儲存在Master記憶體中的作業資訊

    spark.ui.retainedJobs 500   # 預設都是1000
    spark.ui.retainedStages 500
    

2. worker掛掉或假死

有時候我們還會在web ui中看到worker節點消失或處於dead狀態,在該節點執行的任務則會報各種 lost worker 的錯誤,引發原因和上述大體相同,worker記憶體中儲存了大量的ui資訊導致gc時失去和master之間的心跳。

解決

  1. 增加Master的記憶體佔用,在Worker節點spark-env.sh 中設定:

    export SPARK_DAEMON_MEMORY 2g # 根據你的實際情況
    
  2. 減少儲存在Worker記憶體中的Driver,Executor資訊

    spark.worker.ui.retainedExecutors 200   # 預設都是1000
    spark.worker.ui.retainedDrivers 200   
    

二. 執行錯誤

1.shuffle FetchFailedException

Spark Shuffle FetchFailedException解決方案

錯誤提示

  1. missing output location

    org.apache.spark.shuffle.MetadataFetchFailedException: 
    Missing an output location for shuffle 0
    
  2. shuffle fetch faild

    org.apache.spark.shuffle.FetchFailedException:
    Failed to connect to spark047215/192.168.47.215:50268
    

    當前的配置為每個executor使用1core,5GRAM,啟動了20個executor

解決

這種問題一般發生在有大量shuffle操作的時候,task不斷的failed,然後又重執行,一直迴圈下去,直到application失敗。

一般遇到這種問題提高executor記憶體即可,同時增加每個executor的cpu,這樣不會減少task並行度。

  • spark.executor.memory 15G
  • spark.executor.cores 3
  • spark.cores.max 21

啟動的execuote數量為:7個

execuoterNum = spark.cores.max/spark.executor.cores 

每個executor的配置:

3core,15G RAM

消耗的記憶體資源為:105G RAM

15G*7=105G

可以發現使用的資源並沒有提升,但是同樣的任務原來的配置跑幾個小時還在卡著,改了配置後幾分鐘就能完成。

2.Executor&Task Lost

錯誤提示

  1. executor lost

    WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
    ExecutorLostFailure (executor lost)
    
  2. task lost

    WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
    java.io.IOException: Connection from /192.168.47.217:55483 closed
    
  3. 各種timeout

    java.util.concurrent.TimeoutException: Futures timed out after [120 second]
    
    ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 
    has been quiet for 120000 ms while there are outstanding requests.
    Assuming connection is dead; please adjust spark.network.
    timeout if this is wrong
    

解決

由網路或者gc引起,worker或executor沒有接收到executor或task的心跳反饋。 
提高 spark.network.timeout 的值,根據情況改成300(5min)或更高。 
預設為 120(120s),配置所有網路傳輸的延時,如果沒有主動設定以下引數,預設覆蓋其屬性

  • spark.core.connection.ack.wait.timeout
  • spark.akka.timeout
  • spark.storage.blockManagerSlaveTimeoutMs
  • spark.shuffle.io.connectionTimeout
  • spark.rpc.askTimeout or spark.rpc.lookupTimeout

3.傾斜

錯誤提示

  1. 資料傾斜

  2. 任務傾斜 
    差距不大的幾個task,有的執行速度特別慢。

解決

大多數任務都完成了,還有那麼一兩個任務怎麼都跑不完或者跑的很慢,分為資料傾斜和task傾斜兩種。

  1. 資料傾斜 
    資料傾斜大多數情況是由於大量的無效資料引起,比如null或者”“,也有可能是一些異常資料,比如統計使用者登入情況時,出現某使用者登入過千萬次的情況,無效資料在計算前需要過濾掉。 
    資料處理有一個原則,多使用filter,這樣你真正需要分析的資料量就越少,處理速度就越快。

    sqlContext.sql("...where col is not null and col != ''")
    

    具體可參考: 
    解決spark中遇到的資料傾斜問題

  2. 任務傾斜 
    task傾斜原因比較多,網路io,cpu,mem都有可能造成這個節點上的任務執行緩慢,可以去看該節點的效能監控來分析原因。以前遇到過同事在spark的一臺worker上跑R的任務導致該節點spark task執行緩慢。 
    或者可以開啟spark的推測機制,開啟推測機制後如果某一臺機器的幾個task特別慢,推測機制會將任務分配到其他機器執行,最後Spark會選取最快的作為最終結果。

    • spark.speculation true
    • spark.speculation.interval 100 - 檢測週期,單位毫秒;
    • spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測
    • spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。

4.OOM

錯誤提示

堆記憶體溢位

java.lang.OutOfMemoryError: Java heap space

解決

記憶體不夠,資料太多就會丟擲OOM的Exeception,主要有driver OOM和executor OOM兩種

  1. driver OOM 
    一般是使用了collect操作將所有executor的資料聚合到driver導致。儘量不要使用collect操作即可。

  2. executor OOM 
    可以按下面的記憶體優化的方法增加code使用記憶體空間

    • 增加executor記憶體總量,也就是說增加spark.executor.memory的值
    • 增加任務並行度(大任務就被分成小任務了),參考下面優化並行度的方法

5.task not serializable

錯誤提示

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task not serializable: java.io.NotSerializableException: ...

解決

如果你在worker中呼叫了driver中定義的一些變數,Spark就會將這些變數傳遞給Worker,這些變數並沒有被序列化,所以就會看到如上提示的錯誤了。

val x = new X()  //在driver中定義的變數
dd.map{r => x.doSomething(r) }.collect  //map中的程式碼在worker(executor)中執行

除了上文的map,還有filter,foreach,foreachPartition等操作,還有一個典型例子就是在foreachPartition中使用資料庫建立連線方法。這些變數沒有序列化導致的任務報錯。

下面提供三種解決方法:

  1. 將所有呼叫到的外部變數直接放入到以上所說的這些運算元中,這種情況最好使用foreachPartition減少建立變數的消耗。
  2. 將需要使用的外部變數包括sparkConf,SparkContext,都用 @transent進行註解,表示這些變數不需要被序列化
  3. 將外部變數放到某個class中對類進行序列化。

6.driver.maxResultSize太小

錯誤提示

Caused by: org.apache.spark.SparkException:
 Job aborted due to stage failure: Total size of serialized 
 results of 374 tasks (1026.0 MB) is bigger than
  spark.driver.maxResultSize (1024.0 MB)

解決

spark.driver.maxResultSize預設大小為1G 每個Spark action(如collect)所有分割槽的序列化結果的總大小限制,簡而言之就是executor給driver返回的結果過大,報這個錯說明需要提高這個值或者避免使用類似的方法,比如countByValue,countByKey等。

將值調大即可

spark.driver.maxResultSize 2g

7.taskSet too large

錯誤提示

WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.

這個WARN可能還會導致ERROR

Caused by: java.lang.RuntimeException: Failed to commit task

Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit

解決

如果你比較瞭解spark中的stage是如何劃分的,這個問題就比較簡單了。 
一個Stage中包含的task過大,一般由於你的transform過程太長,因此driver給executor分發的task就會變的很大。 
所以解決這個問題我們可以通過拆分stage解決。也就是在執行過程中呼叫cache.count快取一些中間資料從而切斷過長的stage。

8. driver did not authorize commit

driver did not authorize commit

9. 環境報錯

  1. driver節點記憶體不足 
    driver記憶體不足導致無法啟動application,將driver分配到記憶體足夠的機器上或減少driver-memory

    Java HotSpot(TM) 64-Bit Server VM warning: INFO:
    

    os::commit_memory(0x0000000680000000, 4294967296, 0) failed; 
    error=’Cannot allocate memory’ (errno=12)

  2. hdfs空間不夠 
    hdfs空間不足,event_log無法寫入,所以 ListenerBus會報錯 ,增加hdfs空間(刪除無用資料或增加節點)

    Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
     File /tmp/spark-history/app-20151228095652-0072.inprogress 
     could only be replicated to 0 nodes instead of minReplication (=1)
    
    ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
    java.lang.reflect.InvocationTargetException
    
  3. spark編譯包與Hadoop版本不一致 
    下載對應hadoop版本的spark包或自己編譯。

    java.io.InvalidClassException: org.apache.spark.rdd.RDD;
     local class incompatible: stream classdesc serialVersionUID
    
  4. driver機器埠使用過多 
    在一臺機器上沒有指定埠的情況下,提交了超過15個任務。

    16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI
    java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
    

    提交任務時指定app web ui埠號解決:

    --conf spark.ui.port=xxxx
    
  5. 中文亂碼

    使用write.csv等方法寫出到hdfs的檔案,中文亂碼。JVM使用的字符集如果沒有指定,預設會使用系統的字符集,因為各個節點系統字符集並不都是UTF8導致,所以會出現這個問題。直接給JVM指定字符集即可。

    spark-defaults.conf

    spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
    

三. 一些python錯誤

1.python版本過低

java.io.UIException: Cannot run program "python2.7": error=2,沒有那個檔案或目錄

spark使用的Python版本為2.7,centOS預設python版本為2.6,升級即可。

2.python許可權不夠

錯誤提示

部分節點上有錯誤提示

java.io.IOExeception: Cannot run program "python2.7": error=13, 許可權不夠

解決

新加的節點運維裝2.7版本的python,python命令是正確的,python2.7卻無法呼叫,只要改改環境變數就好了。

3.pickle使用失敗

錯誤提示

TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)',
 <type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1,
  <sklearn.tree._tree.RegressionCriterion object at 0x100077480>,
   50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))

解決

該pickle檔案是在0.17版本的scikit-learn下訓練出來的,有些機器裝的是0.14版本,版本不一致導致,升級可解決,記得將老版本資料清理乾淨,否則會報各種Cannot import xxx的錯誤。

4.python編碼錯誤

錯誤提示

UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)

解決

方法1:

import sys  
reload(sys)  
sys.setdefaultencoding('utf-8')

方法2:

//報錯
str(u'中國')
//不報錯
str(u'中國'.encode('utf-8'))

四. 一些優化

1. 部分Executor不執行任務

有時候會發現部分executor並沒有在執行任務,為什麼呢?

(1) 任務partition數過少, 
要知道每個partition只會在一個task上執行任務。改變分割槽數,可以通過 repartition 方法,即使這樣,在 repartition 前還是要從資料來源讀取資料,此時(讀入資料時)的併發度根據不同的資料來源受到不同限制,常用的大概有以下幾種:

hdfs - block數就是partition數
mysql - 按讀入時的分割槽規則分partition
es - 分割槽數即為 es 的 分片數(shard)

(2) 資料本地性的副作用

taskSetManager在分發任務之前會先計算資料本地性,優先順序依次是:

process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何節點)

Spark會優先執行高優先順序的任務,任務完成的速度很快(小於設定的spark.locality.wait時間),則資料本地性下一級別的任務則一直不會啟動,這就是Spark的延時排程機制。

舉個極端例子:執行一個count任務,如果資料全都堆積在某一臺節點上,那將只會有這臺機器在長期計算任務,叢集中的其他機器則會處於等待狀態(等待本地性降級)而不執行任務,造成了大量的資源浪費。

判斷的公式為:

curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)

其中 curTime 為系統當前時間,lastLaunchTime 為在某優先順序下最後一次啟動task的時間

如果滿足這個條件則會進入下一個優先順序的時間判斷,直到 any,不滿足則分配當前優先順序的任務。

資料本地性任務分配的原始碼在 taskSetManager.Scala 。

如果存在大量executor處於等待狀態,可以降低以下引數的值(也可以設定為0),預設都是3s。

spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

當你資料本地性很差,可適當提高上述值,當然也可以直接在叢集中對資料進行balance。

2. spark task 連續重試失敗

有可能哪臺worker節點出現了故障,task執行失敗後會在該 executor 上不斷重試,達到最大重試次數後會導致整個 application執行失敗,我們可以設定失敗黑名單(task在該節點執行失敗後會換節點重試),可以看到在原始碼中預設設定的是 0,

private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
    conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)  
  • 1
  • 2
  • 1
  • 2

在 spark-default.sh 中設定

spark.scheduler.executorTaskBlacklistTime 30000

當 task 在該 executor 執行失敗後會在其它 executor 中啟動,同時此 executor 會進入黑名單30s(不會分發任務到該executor)。

3. 記憶體

如果你的任務shuffle量特別大,同時rdd快取比較少可以更改下面的引數進一步提高任務執行速度。

spark.storage.memoryFraction - 分配給rdd快取的比例,預設為0.6(60%),如果快取的資料較少可以降低該值。 
spark.shuffle.memoryFraction - 分配給shuffle資料的記憶體比例,預設為0.2(20%) 
剩下的20%記憶體空間則是分配給程式碼生成物件等。

如果任務執行緩慢,jvm進行頻繁gc或者記憶體空間不足,或者可以降低上述的兩個值。 
"spark.rdd.compress","true" - 預設為false,壓縮序列化的RDD分割槽,消耗一些cpu減少空間的使用

4. 併發

mysql讀取併發度優化

spark.default.parallelism 
發生shuffle時的並行度,在standalone模式下的數量預設為core的個數,也可手動調整,數量設定太大會造成很多小任務,增加啟動任務的開銷,太小,執行大資料量的任務時速度緩慢。

spark.sql.shuffle.partitions 
sql聚合操作(發生shuffle)時的並行度,預設為200,如果該值太小會導致OOM,executor丟失,任務執行時間過長的問題

相同的兩個任務: 
spark.sql.shuffle.partitions=300:

spark.sql.shuffle.partitions=500:

速度變快主要是大量的減少了gc的時間。

但是設定過大會造成效能惡化,過多的碎片task會造成大量無謂的啟動關閉task開銷,還有可能導致某些task hang住無法執行。

修改map階段並行度主要是在程式碼中使用rdd.repartition(partitionNum)來操作。

5. shuffle

spark-sql join優化 
map-side-join 關聯優化

6. 磁碟

磁碟IO優化

7.序列化

kryo Serialization

8.資料本地性

Spark不同Cluster Manager下的資料本地性表現 
spark讀取hdfs資料本地性異常

9.程式碼

編寫Spark程式的幾個優化點