1. 程式人生 > >Spark調參優化

Spark調參優化

  前幾節介紹了下常用的函式和常踩的坑以及如何打包程式,現在來說下如何調參優化。當我們開發完一個專案,測試完成後,就要提交到伺服器上執行,但在執行中老是丟擲如下異常,這就很納悶了呀,明明測試上沒問題,咋一到線上就出bug了呢!別急,我們來看下這bug到底怎麼回事~

一、錯誤分析

  1、引數設定及異常資訊

18/10/08 16:23:51 WARN TransportChannelHandler: Exception in connection from /10.200.2.95:40888
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
 at java.lang.Thread.run(Thread.java:745)
18/10/08 16:23:51 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.200.2.95:40888 is closed

  2、分析原因

  執行的程式其實邏輯上比較簡單,只是從hive表裡讀取的資料量很大,差不多60+G,並且需要將某些hive表讀取到dirver節點上,用來獲取每個executor上某些資料的對映值,所以driver設定的資源較大。執行時丟擲的異常資訊,從網上查了下原因大致是伺服器的併發連線數超過了其承載量,伺服器會將其中一些連線Down掉,這也就是說在執行spark程式時,過多的申請資源併發執行。那應該怎樣去合理設定引數才能最大化提升併發的效能呢?這些引數又分別代表什麼?

二、常見引數及含義

常用引數

含義及建議

spark.executor.memory

(--executor-memory)

含義:每個執行器程序分配的記憶體

建議:該設定需要和下面的executor-num數一起考慮,一般設定為4G~8G如果和其他人共用佇列,為防止獨佔資源建議memory*num <= 資源佇列最大總記憶體*(1/2~1/3)

spark.executor.num

(--executor-num)

含義:設定用來執行spark作業的執行器程序的個數

建議:一般設定為50~100個,設定太少會導致無法充分利用資源,太多又導致大部分任務分配不到充足的資源

spark.executor.cores

(--executor-cores)

含義:每個執行器程序的cpu core數量,決定了執行器程序併發執行task執行緒的個數,一個core同一時間只能執行一個task執行緒。如果core數量越多,完成自己task越快。

建議:一般設定2~4個,需要和executor-num一起考慮,num*core<=佇列總core*(1/2~1/3)

spark.driver.memory

(--driver-memory)

含義:設定驅動程序的記憶體

建議:driver一般不設定,預設的就可以,不過如果你在程式中需要使用collect運算元拉取rdd到驅動節點上,那就得設定相應的記憶體大小(大於幾十k建議使用廣播變數)

spark.default.parallelism

含義:設定每個stage的預設任務數量

建議:官方建議設定為 executor-num*executor-cores的2~3倍

spark.storage.memoryFraction

含義:預設Executor 60%的記憶體,可以用來儲存持久化的RDD資料

建議:spark作業中,如果有較多rdd需要持久化,該引數可適當提高一些,保證持久化的資料儲存在記憶體中,避免被寫入磁碟影響執行速度。如果shuffle類操作較多,可調低該引數。並且如果由於頻繁的垃圾回收導致執行緩慢,證明執行task的記憶體不夠用,建議調低該引數。

spark.shuffle.memoryFraction

含義:設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例

建議:shuffle操作在進行聚合時,如果發現使用的記憶體超出了這個20%的限制,那麼多餘的資料就會溢寫到磁碟檔案中去,此時就會極大地降低效能。結合上一個引數調整。

spark.speculation

含義:設為true時開啟task預測執行機制。當出現較慢的任務時,這種機制會在另外的節點嘗試執行該任務的一個副本。

建議:true,開啟此選項會減少大規模叢集中個別較慢的任務帶來的影響。

spark.storage.blockManagerTimeoutIntervalMs

含義:內部通過超時機制追蹤執行器程序是否存活的閾值。

建議:對於會引發長時間垃圾回收(GC)暫停的作業,需要把這個值調到100秒(100000)以防止失敗。

spark.sql.shuffle.partitions

含義:配置join或者聚合操作shuffle資料時分割槽的數量,預設200

建議:同spark.default.parallelism

 

三、實踐

  通過適當調整以上講到的幾個引數,降低spark.default.parallelism的同時又設定了spark.sql.shuffle.partitions、spark.speculation、spark.storage.blockManagerTimeoutIntervalMs三個引數。由於專案中頻繁的讀取hive表資料,並進行連線操作,所以在shuffle階段增大了partitions。對於woker傾斜,設定spark.speculation=true,把預測不樂觀的節點去掉來保證程式可穩定執行,通過這幾個引數的調整這樣並大大減少了執行時間。