spark調優(一)【spark引數介紹】
1 spark on yarn常用屬性介紹
屬性名 | 預設值 | 屬性說明 |
---|---|---|
spark.yarn.am.memory |
512m | 在客戶端模式(client mode )下,yarn 應用master 使用的記憶體數。在叢集模式(cluster mode )下,使用spark.driver.memory 代替。 |
spark.driver.cores |
1 | 在叢集模式(cluster mode )下,driver 程式使用的核數。在叢集模式(cluster mode )下,driver 程式和master 執行在同一個jvm 中,所以master 控制這個核數。在客戶端模式(client mode spark.yarn.am.cores 控制master 使用的核。 |
spark.yarn.am.cores |
1 | 在客戶端模式(client mode )下,yarn 應用的master 使用的核數。在叢集模式下,使用spark.driver.cores 代替。 |
spark.yarn.am.waitTime |
100ms | 在叢集模式(cluster mode)下,yarn 應用master 等待SparkContext 初始化的時間。在客戶端模式(client mode )下,master 等待driver 連線到它的時間。 |
spark.yarn.submit.file.replication |
3 | 檔案上傳到hdfs 上去的replication 次數 |
spark.yarn.preserve.staging.files |
false |
設定為true 時,在job 結束時,保留staged 檔案;否則刪掉這些檔案。 |
spark.yarn.scheduler.heartbeat.interval-ms |
3000 | Spark 應用master 與yarn resourcemanager 之間的心跳間隔 |
spark.yarn.scheduler.initial-allocation.interval |
200ms | 當存在掛起的容器分配請求時,spark 應用master resourcemanager 的間隔時間。它的大小不能大於spark.yarn.scheduler.heartbeat.interval-ms ,如果掛起的請求還存在,那麼這個時間加倍,直到到達spark.yarn.scheduler.heartbeat.interval-ms 大小。 |
spark.yarn.max.executor.failures |
numExecutors * 2 ,並且不小於3 |
在失敗應用程式之前,executor 失敗的最大次數。 |
spark.executor.instances |
2 | Executors 的個數。這個配置和spark.dynamicAllocation.enabled 不相容。當同時配置這兩個配置時,動態分配關閉,spark.executor.instances 被使用 |
spark.yarn.executor.memoryOverhead |
executorMemory * 0.10 ,並且不小於384m |
每個executor 分配的堆外記憶體。 |
spark.yarn.driver.memoryOverhead |
driverMemory * 0.10 ,並且不小於384m |
在叢集模式下,每個driver 分配的堆外記憶體。 |
spark.yarn.am.memoryOverhead |
AM memory * 0.10 ,並且不小於384m |
在客戶端模式下,每個driver 分配的堆外記憶體 |
spark.yarn.am.port |
隨機 | Yarn 應用master 監聽的埠。 |
spark.yarn.queue |
default |
應用提交的yarn 佇列的名稱 |
spark.yarn.jar |
none |
Jar 檔案存放的地方。預設情況下,spark jar 安裝在本地,但是jar 也可以放在hdfs 上,其他機器也可以共享。 |
2 客戶端模式和叢集模式的區別
這裡我們要區分一下什麼是客戶端模式(client mode
),什麼是叢集模式(cluster mode
)。
我們知道,當在YARN
上執行Spark
作業時,每個Spark executor
作為一個YARN
容器(container
)執行。Spark
可以使得多個Tasks
在同一個容器(container
)裡面執行。
yarn-cluster
和yarn-client
模式的區別其實就是Application Master
程序的區別,在yarn-cluster
模式下,driver
執行在AM
(Application Master
)中,它負責向YARN
申請資源,並監督作業的執行狀況。當用戶提交了作業之後,就可以關掉Client
,作業會繼續在YARN
上執行。然而yarn-cluster
模式不適合執行互動型別的作業。
在yarn-client
模式下,Application Master
僅僅向YARN
請求executor
,client
會和請求的container
通訊來排程他們工作,也就是說Client
不能離開。下面的圖形象表示了兩者的區別。
2.1 Spark on YARN叢集模式分析
2.1.1 客戶端操作
1、根據
yarnConf
來初始化yarnClient
,並啟動yarnClient
;2、建立客戶端
Application
,並獲取Application
的ID
,進一步判斷叢集中的資源是否滿足executor
和ApplicationMaster
申請的資源,如果不滿足則丟擲IllegalArgumentException
;3、設定資源、環境變數:其中包括了設定
Application
的Staging
目錄、準備本地資源(jar
檔案、log4j.properties
)、設定Application
其中的環境變數、建立Container
啟動的Context
等;4、設定
Application
提交的Context
,包括設定應用的名字、佇列、AM
的申請的Container
、標記該作業的型別為Spark
;5、申請
Memory
,並最終通過yarnClient.submitApplication
向ResourceManager
提交該Application
。
當作業提交到YARN
上之後,客戶端就沒事了,甚至在終端關掉那個程序也沒事,因為整個作業執行在YARN
叢集上進行,執行的結果將會儲存到HDFS
或者日誌中。
2.1.2 提交到YARN叢集,YARN操作
1、執行
ApplicationMaster
的run
方法;2、設定好相關的環境變數。
3、建立
amClient
,並啟動;4、在
Spark UI
啟動之前設定Spark UI
的AmIpFilter
;5、在
startUserClass
函式專門啟動了一個執行緒(名稱為Driver
的執行緒)來啟動使用者提交的Application
,也就是啟動了Driver
。在Driver
中將會初始化SparkContext
;6、等待
SparkContext
初始化完成,最多等待spark.yarn.applicationMaster.waitTries
次數(預設為10),如果等待了的次數超過了配置的,程式將會退出;否則用SparkContext
初始化yarnAllocator
;7、當
SparkContext、Driver
初始化完成的時候,通過amClient
向ResourceManager
註冊ApplicationMaster
;8、分配並啟動
Executeors
。在啟動Executeors
之前,先要通過yarnAllocator
獲取到numExecutors
個Container
,然後在Container
中啟動Executeors
。
如果在啟動Executeors
的過程中失敗的次數達到了maxNumExecutorFailures
的次數,maxNumExecutorFailures
的計算規則如下:
// 預設為numExecutors * 2,最少為3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
那麼這個Application
將失敗,將Application Status
標明為FAILED
,並將關閉SparkContext
。其實,啟動Executeors
是通過ExecutorRunnable
實現的,而ExecutorRunnable
內部是啟動CoarseGrainedExecutorBackend
的。
- 9、最後,
Task
將在CoarseGrainedExecutorBackend
裡面執行,然後執行狀況會通過Akka
通知CoarseGrainedScheduler
,直到作業執行完成。
2.2 Spark on YARN客戶端模式分析
和yarn-cluster
模式一樣,整個程式也是通過spark-submit
指令碼提交的。但是yarn-client
作業程式的執行不需要通過Client
類來封裝啟動,而是直接通過反射機制呼叫作業的main
函式。下面是流程。
1、通過
SparkSubmit
類的launch
的函式直接呼叫作業的main
函式(通過反射機制實現),如果是叢集模式就會呼叫Client
的main
函式。2、而應用程式的
main
函式一定都有個SparkContent
,並對其進行初始化;3、在
SparkContent
初始化中將會依次做如下的事情:設定相關的配置、註冊MapOutputTracker、BlockManagerMaster、BlockManager
,建立taskScheduler
和dagScheduler
;4、初始化完
taskScheduler
後,將建立dagScheduler
,然後通過taskScheduler.start()
啟動taskScheduler
,而在taskScheduler
啟動的過程中也會呼叫SchedulerBackend
的start
方法。
在SchedulerBackend
啟動的過程中將會初始化一些引數,封裝在ClientArguments
中,並將封裝好的ClientArguments
傳進Client
類中,並client.runApp()
方法獲取Application ID
。5、
client.runApp
裡面的做的和上章客戶端進行操作那節類似,不同的是在裡面啟動是ExecutorLauncher
(yarn-cluster
模式啟動的是ApplicationMaster
)。6、在
ExecutorLauncher
裡面會初始化並啟動amClient
,然後向ApplicationMaster
註冊該Application
。註冊完之後將會等待driver
的啟動,當driver
啟動完之後,會建立一個MonitorActor
物件用於和CoarseGrainedSchedulerBackend
進行通訊(只有事件AddWebUIFilter
他們之間才通訊,Task
的執行狀況不是通過它和CoarseGrainedSchedulerBackend
通訊的)。
然後就是設定addAmIpFilter
,當作業完成的時候,ExecutorLauncher
將通過amClient
設定Application
的狀態為FinalApplicationStatus.SUCCEEDED
。7、分配
Executors
,這裡面的分配邏輯和yarn-cluster
裡面類似。8、最後,
Task
將在CoarseGrainedExecutorBackend
裡面執行,然後執行狀況會通過Akka
通知CoarseGrainedScheduler
,直到作業執行完成。9、在作業執行的時候,
YarnClientSchedulerBackend
會每隔1秒通過client
獲取到作業的執行狀況,並打印出相應的執行資訊,當Application
的狀態是FINISHED、FAILED
和KILLED
中的一種,那麼程式將退出等待。10、最後有個執行緒會再次確認
Application
的狀態,當Application
的狀態是FINISHED、FAILED
和KILLED
中的一種,程式就執行完成,並停止SparkContext
。整個過程就結束了。
3 spark submit 和 spark shell引數介紹
引數名 | 格式 | 引數說明 |
---|---|---|
–master | MASTER_URL | 如spark://host:port |
–deploy-mode | DEPLOY_MODE | Client或者master,預設是client |
–class | CLASS_NAME | 應用程式的主類 |
–name | NAME | 應用程式的名稱 |
–jars | JARS | 逗號分隔的本地jar包,包含在driver和executor的classpath下 |
–packages | 包含在driver和executor的classpath下的jar包逗號分隔的”groupId:artifactId:version”列表 | |
–exclude-packages | 用逗號分隔的”groupId:artifactId”列表 | |
–repositories | 逗號分隔的遠端倉庫 | |
–py-files | PY_FILES | 逗號分隔的”.zip”,”.egg”或者“.py”檔案,這些檔案放在python app的PYTHONPATH下面 |
–files | FILES | 逗號分隔的檔案,這些檔案放在每個executor的工作目錄下面 |
–conf | PROP=VALUE | 固定的spark配置屬性 |
–properties-file | FILE | 載入額外屬性的檔案 |
–driver-memory | MEM | Driver記憶體,預設1G |
–driver-java-options | 傳給driver的額外的Java選項 | |
–driver-library-path | 傳給driver的額外的庫路徑 | |
–driver-class-path | 傳給driver的額外的類路徑 | |
–executor-memory | MEM | 每個executor的記憶體,預設是1G |
–proxy-user | NAME | 模擬提交應用程式的使用者 |
–driver-cores | NUM | Driver的核數,預設是1。這個引數僅僅在standalone叢集deploy模式下使用 |
–supervise | Driver失敗時,重啟driver。在mesos或者standalone下使用 | |
–verbose | 列印debug資訊 | |
–total-executor-cores | NUM | 所有executor總共的核數。僅僅在mesos或者standalone下使用 |
–executor-core | NUM | 每個executor的核數。在yarn或者standalone下使用 |
–driver-cores | NUM | Driver的核數,預設是1。在yarn叢集模式下使用 |
–queue | QUEUE_NAME | 佇列名稱。在yarn下使用 |
–num-executors | NUM | 啟動的executor數量。預設為2。在yarn下使用 |
你可以通過spark-submit --help
或者spark-shell --help
來檢視這些引數。