spark standalone 讀取 HDFS 資料本地性異常
在分散式計算中,為了提高計算速度,資料本地性是其中重要的一環。
不過有時候它同樣也會帶來一些問題。
一.問題描述
在分散式計算中,大多數情況下要做到移動計算而非移動資料,所以資料本地性尤其重要,因此我們往往也是將hdfs和spark部署在相同的節點上,有些人可能會發現即使他已經這麼做了,在spark的任務中的locality還是ANY,這說明所有的資料都是走的網路IO。
在沒有沒有shuffle的情況下,僅在資料讀取階段網路IO佔用都很嚴重,可以看下ganglia的監控,最高峰出現在讀取資料階段
後來發現slave的標識都是ip
二.解決方案
而hdfs以hostname作為slave標示,所以改變啟動slave的方式
start-slave.sh -h <hostname> <master>
啟動後
再執行任務就變成了NODE_LOCAL,效率有了極大的提升
三.資料本地性的副作用
大多數情況下,資料本地性可以減少網路的IO,提高程式整體的執行效率。不過在一些比較特殊的情況下(Spark的延時排程),他反而會拖累整體執行速度。
taskSetManager在分發任務之前會先計算資料本地性,優先順序依次是:
process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何節點)
Spark會優先執行高優先順序的任務,如果一個task執行的時間很短(小於設定的spark.locality.wait時間),則資料本地性下一級別的任務則一直不會啟動,這就是Spark的延時排程機制。
舉個極端例子:執行一個count任務,如果資料全都堆積在某一臺節點上,那將只會有這臺機器在長期執行任務,叢集中的其他機器則會處於等待狀態(等待本地性降級)而不執行任務,造成了大量的資源浪費。
判斷的公式為:
curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)
其中 curTime
為系統當前時間,lastLaunchTime
為在某優先順序下最後一次啟動task的時間
如果滿足這個條件則會進入下一個優先順序的時間判斷,直到 any
,不滿足則分配當前優先順序的任務。
資料本地性任務分配的原始碼在 taskSetManager.scala
如果存在大量executor處於等待狀態,可以降低以下引數的值(也可以設定為0),預設都是3s。
spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
當你資料本地性很差,可適當提高上述值,當然也可以直接在叢集中對資料進行balance。