spark-core_04: Source code analysis of org.apache.spark.deploy.SparkSubmit:
The parent class of SparkSubmitArgumentsParser is SparkSubmitOptionParser, and the OptionParser used when launcher.Main runs has the same parent. That parent class provides a parse method whose job is to assign each value passed in by spark-submit to the corresponding Spark variable, e.g. the value of --class is stored in the mainClass variable. The same parent parse method is also invoked when SparkSubmitArguments is initialized.
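To make the division of labor concrete, here is a minimal, hypothetical sketch of the same template-method pattern (the names MiniOptionParser and MiniSubmitArguments are invented, not Spark's): the parent owns the scanning loop and calls handle(), and the subclass stores each value.

abstract class MiniOptionParser {
  // Subclasses decide what to do with each (option, value) pair.
  protected def handle(opt: String, value: String): Boolean

  final def parse(args: Seq[String]): Unit = {
    var idx = 0
    while (idx < args.length) {
      val opt = args(idx)
      if (opt.startsWith("--")) {
        require(idx + 1 < args.length, s"Missing argument for option '$opt'.")
        idx += 1
        handle(opt, args(idx)) // e.g. ("--class", "dt.scala.App")
      }
      idx += 1
    }
  }
}

class MiniSubmitArguments extends MiniOptionParser {
  var mainClass: String = _
  override protected def handle(opt: String, value: String): Boolean = {
    if (opt == "--class") mainClass = value // the --class value lands in mainClass
    true
  }
}

val miniArgs = new MiniSubmitArguments
miniArgs.parse(Seq("--class", "dt.scala.App"))
println(miniArgs.mainClass) // prints: dt.scala.App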
object SparkSubmit {
...
private[spark] def printVersionAndExit(): Unit = {
    printStream.println("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
                        """.format(SPARK_VERSION))
    printStream.println("Type --help for more information.")
exitFn(0)
}
// scalastyle:on println
/** SparkSubmitArguments encapsulates the arguments passed in by the spark-submit script:
 * This is what spark-shell passes in: "org.apache.spark.deploy.SparkSubmit"
 * --class org.apache.spark.repl.Main --name "Spark shell" --master spark://luyl152:7077,luyl153:7077,luyl154:7077
 * And this is what a user application passes in through spark-submit:
 * --master spark://luyl152:7077,luyl153:7077,luyl154:7077 --class dt.scala.App /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
 */
val appArgs = new SparkSubmitArguments(args)
Part 1: A look at the parse(args) call made when new SparkSubmitArguments(args) initializes
/**
 * Parses and encapsulates arguments from the spark-submit script.
 * The env argument is used for testing.
 * sys.env: pulls in every environment variable set on the system.
 */
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
var propertiesFile: String = null
var driverMemory: String = null
var driverExtraClassPath: String = null
var driverExtraLibraryPath: String = null
var driverExtraJavaOptions: String = null
... // Set parameters from command line arguments
try {
parse(args.asJava) // parse() expects a java.util.List, so the Scala Seq must be converted
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
...
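As an aside, the asJava conversion used above comes from Scala's JavaConverters; a tiny self-contained illustration:

import scala.collection.JavaConverters._

val scalaArgs: Seq[String] = Seq("--class", "dt.scala.App", "--master", "spark://luyl152:7077")
val javaArgs: java.util.List[String] = scalaArgs.asJava // a cheap wrapper view, no copying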
===> A look at the parse code in SparkSubmitOptionParser:
/**
 * Parse a list of spark-submit command line options.
 * <p>
 * See SparkSubmitArguments.scala for a more formal description of available options.
 *
 * @throws IllegalArgumentException If an error is found during parsing
 * The arguments look like: --class org.apache.spark.repl.Main --name "Spark shell" --master spark://luyl152:7077.
 * The job of parse is to assign each spark-submit argument value to the corresponding
 * Spark variable, e.g. the value of --class goes into the mainClass variable.
 */
protected final void parse(List<String> args) {
  // spark-submit also accepts SparkConf settings as --conf PROP=VALUE; the full list of
  // options is at the end of org.apache.spark.deploy.SparkSubmitArguments, or run spark-submit -h
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
int idx = 0;
for (idx = 0; idx < args.size(); idx++) {
String arg = args.get(idx);
String value = null;
    // For an argument written as --option=value, arg and value are rewritten inside the if block
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
        arg = m.group(1);   // the option name before '=', e.g. --master
        value = m.group(2); // the value after '='
}
      // Look for options with a value.
      // findCliOption matches a "--" argument such as "--class" against the opts
      // two-dimensional array; it returns the canonical name (e.g. "--class") on a match,
      // or null otherwise.
      String name = findCliOption(arg, opts);
if (name != null) {
        if (value == null) {
          if (idx == args.size() - 1) { // matched but no value follows, e.g. a lone --class: size is 1, idx is 0, 1-1=0
            throw new IllegalArgumentException(
                String.format("Missing argument for option '%s'.", arg));
          }
          idx++;
          value = args.get(idx); // the value sits at the next index
}
        // name is the spark-submit option, e.g. --class, and value is its argument.
        // handle() is implemented in OptionParser itself; it assigns the value to the
        // corresponding Spark variable, e.g. the value of --class goes into mainClass
        // (the implementation is trivial, so it is not reproduced here).
if (!handle(name, value)) {
break;
}
      continue; // only a successful match advances idx past the value
}
      // Look for a switch. If nothing matched above, check for a flag such as --verbose.
name = findCliOption(arg, switches);
if (name != null) {
if (!handle(name, null)) {
break;
}
continue;
}
      if (!handleUnknown(arg)) {
break;
}
}
    if (idx < args.size()) {
idx++;
}
    // Hand any remaining arguments to handleExtraArgs; SparkSubmitCommandBuilder collects
    // them into this.sparkArgs = new ArrayList<String>()
handleExtraArgs(args.subList(idx, args.size()));
}
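===> To see what eqSeparatedOpt actually accepts, here is a quick illustrative check (not from the Spark source) of the "(--[^=]+)=(.+)" pattern, written in Scala:

val eqSeparatedOpt = "(--[^=]+)=(.+)".r

"--master=spark://luyl152:7077" match {
  case eqSeparatedOpt(arg, value) => println(s"arg=$arg value=$value")
  // prints: arg=--master value=spark://luyl152:7077
  case other => println(s"no '=' form: $other") // a lone "--class" falls through here
}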
===> The OptionParser implementation of handle(name, value) used above:
/**
 * Assigns each spark-submit argument value to the corresponding Spark variable.
 */
@Override
protected boolean handle(String opt, String value) {
  if (opt.equals(MASTER)) {
    master = value;
  } else if (opt.equals(DEPLOY_MODE)) {
    deployMode = value;
  } else if (opt.equals(PROPERTIES_FILE)) {
    propertiesFile = value;
  } else if (opt.equals(DRIVER_MEMORY)) {
    conf.put(SparkLauncher.DRIVER_MEMORY, value);
  } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
    conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
  } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
    conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); ...
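===> The MASTER, DEPLOY_MODE, etc. constants compared above are plain option strings defined on SparkSubmitOptionParser; rendered here in Scala for reference (abbreviated):

val MASTER = "--master"
val DEPLOY_MODE = "--deploy-mode"
val PROPERTIES_FILE = "--properties-file"
val DRIVER_MEMORY = "--driver-memory"
val DRIVER_JAVA_OPTIONS = "--driver-java-options"
val DRIVER_LIBRARY_PATH = "--driver-library-path"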
===> With new SparkSubmitArguments(args) initialized, back in the SparkSubmit$.main method:
    // verbose becomes true simply by adding --verbose at submission (the source is trivial);
    // it prints a lot of information, for example after:
    // ./spark-submit --class dt.spark.DriverInWinDebuggerCluster --master spark://luyl152:7077,luyl153:7077,luyl154:7077 --verbose /tool/maven_scala-1.0-SNAPSHOT.jar
    if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
      /** Main class:
       dt.spark.DriverInWinDebuggerCluster
       Arguments:
       System properties:
       spark.yarn.historyServer.address -> luyl152:18080
       spark.eventLog.enabled -> true
       SPARK_SUBMIT -> true
       spark.executor.extraJavaOptions -> -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
       spark.history.fs.logDirectory -> hdfs://ns1/historyserverforspark
       spark.app.name -> dt.spark.DriverInWinDebuggerCluster
       spark.jars -> file:/tool/maven_scala-1.0-SNAPSHOT.jar
       spark.submit.deployMode -> client   # defaults to client; search this class for --deploy-mode for details
       spark.eventLog.dir -> hdfs://ns1/historyserverforspark
       spark.master -> spark://luyl152:7077,luyl153:7077,luyl154:7077
       Classpath elements:
       file:/tool/maven_scala-1.0-SNAPSHOT.jar
       */
// scalastyle:on println
}
    /** The action is what spark-submit has been asked to do: SUBMIT, KILL, or REQUEST_STATUS
     * (wrapped in SparkSubmitAction). If none is specified, SparkSubmitArguments defaults it
     * to SparkSubmitAction.SUBMIT, so the match below ends up calling submit(appArgs).
     */
    appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
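===> For reference, SparkSubmitAction is just a Scala Enumeration (declared private[deploy] in the Spark source of this era):

object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}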
Part 2: Into submit(appArgs):
  /**
   * Submit the application using the provided parameters.
   *
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   *
   * In other words, submit's job is to submit the application with the arguments
   * passed in, in two steps:
   * 1. Prepare the launch environment: the classpath, the system properties, and the
   *    application's own arguments for running the child main class, chosen according
   *    to the cluster manager and the deploy mode.
   * 2. Invoke the main method of the child main class with that environment. We only
   *    consider client mode here; cluster mode will be analyzed separately later.
   * So for spark-shell the child main class is org.apache.spark.repl.Main, and for an
   * application submitted directly with spark-submit it is the user-written class that
   * contains the main method.
   */
  private def submit(args: SparkSubmitArguments): Unit = {
    // childArgs: the main-method arguments for the child class, an ArrayBuffer
    // childClasspath: the main class's jar: /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
    // sysProps: a Map of SparkConf-related properties
    // childMainClass: the fully qualified name of the main class: dt.scala.App
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
1. First, a look at the prepareSubmitEnvironment(args) method:
===> The method is very long; we only analyze the code relevant to the current scenario.
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""
    // Set the cluster manager; master here is spark://luyl152:7077,luyl153:7077,luyl154:7077
    val clusterManager: Int = args.master match {
      case m if m.startsWith("yarn") => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
}
    // Set the deploy mode; default is client mode. args.deployMode defaults to null,
    // in which case deployMode resolves to CLIENT.
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
    ... YARN, Mesos, R and related code, to be examined when we get to those topics
    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
    // Since deployMode was null and resolved to CLIENT, args.deployMode is set to "client".
    (args.deployMode, deployMode) match {
      case (null, CLIENT) => args.deployMode = "client"
      case (null, CLUSTER) => args.deployMode = "cluster"
      case _ =>
}
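===> The YARN/STANDALONE/MESOS/LOCAL and CLIENT/CLUSTER values matched above are bit flags defined at the top of SparkSubmit (values as in the Spark source of this era), which is why the OptionAssigner list below can combine them with "|":

// Cluster managers
val YARN = 1
val STANDALONE = 2
val MESOS = 4
val LOCAL = 8
val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

// Deploy modes
val CLIENT = 1
val CLUSTER = 2
val ALL_DEPLOY_MODES = CLIENT | CLUSTER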
...
    // Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
    // A list of rules to map each argument to system properties or command-line options in
    // each deploy mode; we iterate through these below
    val options = List[OptionAssigner](
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),
// Yarn client only
      ... // Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.cores"),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)
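===> The loop that consumes this list is elided from this walkthrough; based on the Spark source it works roughly as follows: every assigner whose cluster manager and deploy mode bit-match the current submission copies its value into sysProps (or into childArgs for command-line options):

options.foreach { opt =>
  if (opt.value != null &&
      (deployMode & opt.deployMode) != 0 &&
      (clusterManager & opt.clusterManager) != 0) {
    if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
    if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
  }
}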
    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass // the user's own main class
      // primaryResource is the jar the user passed in: /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      // append any third-party --jars as well
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
...
    // Add the application jar automatically so the user doesn't have to call sc.addJar
    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
    // For python and R files, the primary resource is already distributed as a regular file
    // Entered when the app is not YARN cluster, not Python, and not R
    if (!isYarnCluster && !args.isPython && !args.isR) {
      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
      // store the third-party --jars in spark.jars, comma-separated
sysProps.put("spark.jars", jars.mkString(","))
}
...
    // Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
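===> getOrElseUpdate is what gives explicit command-line options precedence: a key already present in sysProps keeps its value, and --conf / properties-file entries only fill the gaps. A small illustration:

import scala.collection.mutable

val props = mutable.HashMap("spark.master" -> "spark://luyl152:7077")
props.getOrElseUpdate("spark.master", "local[*]")       // keeps spark://luyl152:7077
props.getOrElseUpdate("spark.eventLog.enabled", "true") // inserts the missing key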
...
// Resolve paths in certain spark properties
    val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.jar",
"spark.yarn.dist.files",
"spark.yarn.dist.archives")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
sysProps(config) = Utils.resolveURIs(oldValue)
}
}
...
(childArgs, childClasspath, sysProps, childMainClass)
}
===> When it finishes, a tuple of (the arguments for the --class main method, the jar for --class, the sysProps map, the fully qualified name of the --class) is returned.
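===> For intuition, Utils.resolveURIs normalizes each comma-separated path into a well-formed URI. A rough sketch of the effect (not Spark's exact code; a bare local path gains an explicit file: scheme):

def resolveURIs(paths: String): String =
  paths.split(",").filter(_.nonEmpty).map { p =>
    if (p.contains(":")) p else new java.io.File(p).toURI.toString
  }.mkString(",")

resolveURIs("/tool/maven_scala-1.0-SNAPSHOT.jar,hdfs://ns1/a.jar")
// -> "file:/tool/maven_scala-1.0-SNAPSHOT.jar,hdfs://ns1/a.jar"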
Part 3: Back in the submit(appArgs) method
    // doRunMain() is invoked by the code further below
    def doRunMain(): Unit = {
if (args.proxyUser != null) {
        // spark-submit can impersonate a Linux user: pass --proxy-user at submission,
        // otherwise args.proxyUser is null
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit](){
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    /** http://spark.apache.org/docs/latest/submitting-applications.html
     * http://blog.csdn.net/Trigl/article/details/72732241
     * 1. In standalone mode, running spark-submit without --deploy-mode gives it the value
     *    client. This mode suits the case where the client sits on the same network segment
     *    as the master, because the driver must talk to the executors, e.g. ship the jar to
     *    them over Netty HTTP and assign them tasks.
     *    a. The node running the spark-submit script also runs the user's main method (the
     *       driver); in client mode the driver and the submitting client are on the same node.
     *    b. The driver process does not run on a Worker node, so it is independent and does
     *       not consume the Worker cluster's resources.
     *    c. Client mode has no supervised restart (setting --supervise has no effect); if the
     *       driver process dies, an external program has to restart it.
     *
     * 2. Submitting with --deploy-mode cluster suits the case where the workers and the
     *    submitting machine are not on the same network segment: the driver then runs on a
     *    worker node and consumes that worker's resources.
     *    a. In cluster mode, --supervise can be set to monitor the driver and restart it
     *       automatically if it dies.
     *    b. Since the workers and the submitting machine are usually on different segments,
     *       the jars should be placed on the worker nodes in advance.
     *    c. The driver class (the user's own main class) is assigned to a node by the master.
     */
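===> The condition on the next line uses args.isStandaloneCluster, which SparkSubmitArguments defines roughly as (per the Spark source of this era):

def isStandaloneCluster: Boolean = master.startsWith("spark://") && deployMode == "cluster"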
    // deployMode defaults to "client", and useRest defaults to true
    if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Sparkusing the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
    } else {
doRunMain()
}
}
Part 4: Into the implementation of runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose):
  /** childArgs: the main-method arguments for the child class, an ArrayBuffer
   * childClasspath: the main class's jar: /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
   * sysProps: a Map of SparkConf-related properties (the data produced by SparkSubmit.prepareSubmitEnvironment())
   * childMainClass: the fully qualified name of the main class: dt.scala.App
   */
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
    // scalastyle:off println
    // verbose becomes true simply by adding --verbose at submission; the source is trivial
    if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"Systemproperties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpathelements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
    // MutableURLClassLoader, a subclass of URLClassLoader, is used to add the jars to the JVM
val loader =