1. 程式人生 > >Spark 本地化計算引數調整分析

Spark 本地化計算引數調整分析

 

Spark在Driver上,對Application的每一個stage的task,進行分配之前,都會計算出每個task要計算的是哪個分片資料,RDD的某個partition;Spark的task分配演算法,優先,會希望每個task正好分配到它要計算的資料所在的節點,這樣的話,就不用在網路間傳輸資料;

     但是呢,通常來說,有時,事與願違,可能task沒有機會分配到它的資料所在的節點,為什麼呢,可能那個節點的計算資源和計算能力都滿了;所以呢,這種時候,通常來說,Spark會等待一段時間,預設情況下是3s鍾(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的資料所在節點,比較近的一個節點,然後進行計算。

    但是對於第二種情況,通常來說,肯定是要發生資料傳輸,task會通過其所在節點的BlockManager來獲取資料,BlockManager發現自己本地沒有資料,會通過一個getRemote()方法,通過TransferService(網路資料傳輸元件)從資料所在節點的BlockManager中,獲取資料,通過網路傳輸回task所在節點。
 

 

 

在Spark中資料的本地化分為5種

 

本地化級別

1、PROCESS_LOCAL : 程序本地化,指task計算的資料在本程序(Executor)中

2、NODE_LOCAL:節點本地化,指task計算的資料在本節點(node)的磁碟上,當task在本程序中一直沒有執行(如果Driver分發task 3s後沒有執行,且重複5次後),此時Driver就把這個沒有執行的task傳送到本節點的其他executor中執行

3、NO_PREF:沒有本地化這一說,無需本地化,如計算所需的資料在關係型資料中(MySQL或Oracle),node1節點中的MySQL,可以被node2或node3節點連線使用。

4、RACK_LOCAL:task計算的資料是在本機架的其他節點上

5、ANY:隨機,任何地方都可以

優先順序依次是1到5,逐漸降低
 

引數調整

通過 SparkConf 進行調整:

 

new SparkConf() 
.set(“spark.locality.wait”, “10”)

 

 

預設值: spark.locality.wait,預設為3s

 

PROCESS_LOCAL : spark.locality.wait.process   預設為3s

NODE_LOCAL:spark.locality.wait.node

RACK_LOCAL: spark.locality.wait.rack

 

 

觀察大部分task的資料本地化級別 
如果大多都是PROCESS_LOCAL,那就不用調節了

如果是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下資料本地化的等待時長。

調節完,應該是要反覆調節,每次調節完以後,再來執行,觀察日誌 
看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的執行時間有沒有縮短。
實踐出真理!!!

圖例:

 

 

獲取本地化級別時間 對應程式碼:
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {

  // 預設等待時間,取自引數spark.locality.wait,預設為3s
  val defaultWait = conf.get(config.LOCALITY_WAIT)

  // 根據不同的TaskLocality,取不同的引數,設定TaskLocality等待時間
  // PROCESS_LOCAL取引數spark.locality.wait.process
  // NODE_LOCAL取引數spark.locality.wait.node
  // RACK_LOCAL取引數spark.locality.wait.rack

  val localityWaitKey = level match {
    case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
    case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
    case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
    case _ => null
  }

  if (localityWaitKey != null) {
    conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
  } else {
    0L
  }
}

 

參考連結:

https://blog.csdn.net/JasonZhangOO/article/details/79350149

https://blog.csdn.net/lxhandlbb/article/details/52987909