PySpark Source Study: SparkSubmit (SparkSubmit.scala)
This series analyzes and studies the source code of Spark 2.2.1.
SparkSubmit.scala contains three objects and one class: SparkSubmitAction, SparkSubmit, SparkSubmitUtils, and OptionAssigner.
(1) First, let's look at SparkSubmitAction.
SparkSubmitAction is an enumeration accessible only within the deploy package, used to identify the type of request made by the spark-submit command.
There are three action types: submit, kill, or request the status of an application; the latter two are currently supported only in standalone and Mesos cluster modes.
The source is as follows:
private[deploy] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
}
(2) Next, SparkSubmitUtils is also an object; it is a helper for spark-submit, providing utility methods used inside SparkSubmit.
(3) Finally, let's look at SparkSubmit, a very important object.
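For instance, one of these helpers parses the Maven coordinates passed to spark-submit via --packages. A minimal, hedged sketch of how it behaves (both SparkSubmitUtils and the returned MavenCoordinate type are package-private, so something like this would only compile inside Spark's own org.apache.spark.deploy package, e.g. in a test):
import org.apache.spark.deploy.SparkSubmitUtils
// --packages takes comma-separated groupId:artifactId:version coordinates
val coords = SparkSubmitUtils.extractMavenCoordinates(
  "org.apache.kafka:kafka-clients:0.10.2.1,com.example:demo:1.0")
// each coordinate prints as groupId:artifactId:version via its toString
coords.foreach(println)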
// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"
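These constants are bit flags: each cluster manager and each deploy mode is a distinct power of two, so a set of allowed combinations can be expressed with bitwise OR and membership tested with bitwise AND. This is how prepareSubmitEnvironment later decides which OptionAssigner rules apply. A minimal standalone sketch of the idea, mirroring the constants above:
// same flag values as in SparkSubmit
val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
val CLIENT = 1; val CLUSTER = 2
// a hypothetical rule that applies to standalone or YARN, in either deploy mode
val ruleClusterManagers = STANDALONE | YARN   // 3
val ruleDeployModes = CLIENT | CLUSTER        // 3
// the combination chosen for this submission
val clusterManager = YARN                     // 1
val deployMode = CLUSTER                      // 2
// the rule applies only if both masks overlap the current selection
val applies = (clusterManager & ruleClusterManagers) != 0 &&
  (deployMode & ruleDeployModes) != 0
println(applies)   // true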
Let's take a look at the main() method:
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args) // build a SparkSubmitArguments object from the CLI arguments
if (appArgs.verbose) { // if verbose, print the parsed arguments (sensitive values are redacted when printed)
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs) // submit the application
case SparkSubmitAction.KILL => kill(appArgs) // kill the application; currently standalone and Mesos cluster modes only
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) // request the application's status; currently standalone and Mesos cluster modes only
}
}
As the main method shows, it pattern-matches on the action to perform one of three operations: submit an application, kill an application, or request an application's status; the latter two are currently supported only in standalone and Mesos cluster modes.
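For reference, SUBMIT is the default behavior of the spark-submit script, while KILL and REQUEST_STATUS are triggered by the --kill and --status flags against a standalone or Mesos master; the host, port, and submission ID below are placeholders:
spark-submit --master spark://host:7077 --class com.example.Main app.jar      # SUBMIT
spark-submit --master spark://host:6066 --kill driver-20180101000000-0000     # KILL
spark-submit --master spark://host:6066 --status driver-20180101000000-0000   # REQUEST_STATUS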
Let's first look at the submit operation.
The submit method first uses the arguments passed in from the CLI to set up the appropriate classpath, system properties, and application arguments for the chosen mode, and then uses that environment to invoke the application's main method.
The details are as follows:
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) // prepare the submit environment
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
As we can see, submit calls doRunMain, and doRunMain in turn calls runMain, which triggers the application's main method.
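To make this concrete, here is a hedged sketch (in comment form) of roughly what prepareSubmitEnvironment produces for a simple PySpark application submitted in client mode; the exact values depend on the arguments and the environment:
// hypothetical invocation: spark-submit --master local[2] my_app.py 100
// childMainClass -> "org.apache.spark.deploy.PythonRunner"
// childArgs      -> roughly Seq(<local path of my_app.py>, <py-files or null>, "100")
// sysProps       -> includes "spark.master" -> "local[2]",
//                   "spark.submit.deployMode" -> "client", "SPARK_SUBMIT" -> "true"
// runMain then loads PythonRunner and reflectively invokes its main method with childArgs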
The runMain method is as follows:
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
val loader = // choose the class loader, honoring spark.driver.userClassPathFirst
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) { // add each jar to the classpath
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) { // set the system properties
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass) // load the main class
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) { // warn if the main class extends scala.App (SPARK-4170)
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) // look up main(Array[String]); it must be static (checked below)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
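The core of runMain is plain Java reflection: load the class by name, look up its static main(Array[String]) method, and invoke it. A stripped-down, hedged sketch of the same pattern outside of Spark (the class name in the usage comment is hypothetical):
import java.lang.reflect.Modifier

def invokeMain(className: String, args: Array[String]): Unit = {
  // load the class through the current thread's context class loader
  val clazz = Class.forName(className, true, Thread.currentThread.getContextClassLoader)
  // look up main(String[]) and require it to be static, as runMain does
  val mainMethod = clazz.getMethod("main", classOf[Array[String]])
  require(Modifier.isStatic(mainMethod.getModifiers), "The main method must be static")
  // a static method is invoked with a null receiver; the array is passed as a single argument
  mainMethod.invoke(null, args)
}
// e.g. invokeMain("com.example.Main", Array("100"))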
Next, let's look at the kill operation.
The kill operation uses the submission ID and master passed in from the CLI to cancel an existing submission via a POST over the REST protocol. It applies only to standalone and Mesos cluster modes.
private def kill(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.killSubmission(args.submissionToKill)
}
Finally, let's look at the requestStatus operation.
This operation uses the submission ID and master passed in from the CLI to retrieve the submission's status via a GET over the REST protocol. It applies only to standalone and Mesos cluster modes.
private def requestStatus(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
}
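Both operations go through RestSubmissionClient, which talks to the master's REST submission endpoint (port 6066 by default on a standalone master). A hedged sketch of its use; the client is private[spark], so code like this only compiles inside Spark itself, and the master URL and submission ID are placeholders:
import org.apache.spark.deploy.rest.RestSubmissionClient

val client = new RestSubmissionClient("spark://master-host:6066")
// kill a previously submitted driver by its submission id
client.killSubmission("driver-20180101000000-0000")
// or ask for its current state
client.requestSubmissionStatus("driver-20180101000000-0000")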
(4) Finally, the full SparkSubmit.scala source is attached below (the Chinese annotations are my own translations and understanding, while the English comments come from the source itself; there may be inaccuracies, and corrections and better interpretations are welcome).
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io.{File, IOException}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.nio.file.Files
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
/**
* Whether to submit, kill, or request the status of an application.是否提交、殺死或請求應用程式的狀態
* The latter two operations are currently supported only for standalone and Mesos cluster modes.
*後兩種操作目前僅支援獨立模式和Mesos叢集模式
*/
private[deploy] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
}
/**
 * Main gateway of launching a Spark application.啟動Spark應用程式的主要關口。
*
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*該程式將使用相關的Spark依賴性設定類路徑classpath ,並提供Spark上支援的不同叢集管理器和部署模式的圖層。
*/
object SparkSubmit extends CommandLineUtils {
// Cluster managers叢集管理器
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
// Deploy modes部署模式
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// Special primary resource names that represent shells rather than application jars.
//表示shell而不是應用程式jars的特殊的主要資源名。
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// scalastyle:off println 列印版本資訊並退出
private[spark] def printVersionAndExit(): Unit = {
printStream.println("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))//spark的版本資訊
printStream.println("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))//Scala版本資訊,java虛擬機器名字,java版本
printStream.println("Branch %s".format(SPARK_BRANCH))
printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, SPARK_BUILD_DATE))
printStream.println("Revision %s".format(SPARK_REVISION))
printStream.println("Url %s".format(SPARK_REPO_URL))
printStream.println("Type --help for more information.")
exitFn(0)
}
// scalastyle:on println
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)//新建一個SparkSubmitArguments物件
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
/**
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
*使用REST協議殺死現有的提交。僅適合獨立和Mesos的叢集模式。
*/
private def kill(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.killSubmission(args.submissionToKill)
}
/**
* Request the status of an existing submission using the REST protocol.
* Standalone and Mesos cluster mode only.
*使用REST協議請求現有提交的狀態。僅適合獨立和Mesos的叢集模式。
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
}
/**
* Submit the application using the provided parameters.使用所提供的引數提交應用程式。
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*這是兩個步驟。首先,我們根據叢集管理器和部署模式所執行的子主類來設定正確的類路徑classpath、系統屬性和應用程式引數從而準備啟動環境適當的類路徑。
*第二,我們使用這個啟動環境來呼叫子主方法的主方法。
*/
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)//準備提交環境
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
/**
* Prepare the environment for submitting an application.為提交應用程式準備環境
*
* @param args the parsed SparkSubmitArguments used for environment preparation.用於準備環境解析過的SparkSubmitArguments
* @param conf the Hadoop Configuration, this argument will only be set in unit test.Hadoop配置,這個引數只會在hadoop的單元測試中設定
* @return a 4-tuple:
* (1) the arguments for the child process,關於子程序的引數
* (2) a list of classpath entries for the child,子類的類路徑classpath條目列表
* (3) a map of system properties, and系統屬性的對映
* (4) the main class for the child子程序的主類
*
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
// Set the cluster manager設定叢集管理器
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
-1
}
// Set the deploy mode; default is client mode設定部署模式;預設是客戶端模式
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
// Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
// the master and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
//由於棄用了通過 "yarn-cluster" 和"yarn-client" 方法來同時指定叢集管理器和部署模式,
//所以,如果在他們中只有一個指定的情況下,通過其中一個,我們使用了一些邏輯來從中獲取叢集管理器型別和部署模式
if (clusterManager == YARN) {
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn"
}
// Make sure YARN is included in our build if we're trying to use it如果我們要使用yarn那麼我們需要確保我們的build中包含了它
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
printErrorAndExit(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}
// Update args.deployMode if it is null. It will be passed down as a Spark property later.如果是空,則更新arg.deploymode。稍後它將作為Spark屬性傳遞。
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code如果有任何並新增類路徑到jars,就解決maven依賴關係。將它們新增到py-files ,對於包含Python程式碼的包也是如此
val exclusions: Seq[String] =
if (!StringUtils.isBlank(args.packagesExclusions)) {
args.packagesExclusions.split(",")
} else {
Nil
}
// Create the IvySettings, either load from file or build defaults建立IvySettings,要麼從檔案中載入,要麼構建預設值
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}
val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
//通過使用--jars 或者 --packages.安裝任意R包
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
// assure a keytab is available from any place in a JVM 確保在JVM中的任何位置都可以使用keytab
if (clusterManager == YARN || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}
// In client mode, download remote files.在客戶端模式下,下載遠端檔案
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
var localFiles: String = null
if (deployMode == CLIENT) {
val hadoopConf = conf.getOrElse(new HadoopConfiguration())
localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
}
// Require all python files to be local, so we can add them to the PYTHONPATH 要求所有的python檔案都是本地的,因此我們可以將它們新增到PYTHONPATH中
// In YARN cluster mode, python files are distributed as regular files, which can be non-local.在yarn叢集模式下,將python檔案作為常規檔案分發,那麼它可以是非本地檔案。
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.在Mesos叢集模式中,Mesos將自動下載非本地的python檔案。
if (args.isPython && !isYarnCluster && !isMesosCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
}
}
// Require all R files to be local要求所有的R檔案都是本地的
if (args.isR && !isYarnCluster && !isMesosCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local R files are supported: ${args.primaryResource}")
}
}
// The following modes are not supported or applicable以下模式不受支援或不適用
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}
// If we're running a python app, set the main class to our specific python runner如果我們執行的是一個python應用程式,將主類設定為特定的python執行器
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.如果提供了一個python檔案,將它新增到子引數和要部署的檔案列表中。
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.yarn後端以不同的方式分配主檔案,所以不要合併它。
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.yarn後端處理python檔案的方式不同,所以不要合併列表
args.files = mergeFileLists(args.files, args.pyFiles)
}
if (localPyFiles != null) {
sysProps("spark.submit.pyFiles") = localPyFiles
}
}
// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
//在yarn模式中執行一個R應用程式,新增SparkR包資源並且R包資源需要包含所有built R相關的庫。這樣他們才可以跟job一起被分發出去
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
// Distribute the SparkR package.分發SparkR包。
// Assigns a symbol link name "sparkr" to the shipped package.將一個名為“sparkr”的符號連結分配給裝運的包。
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
// Distribute the R package archive containing all the built R packages.分發R包
if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)//將所有的R相關資源打成zip包
if (!rPackageFile.exists()) {
printErrorAndExit("Failed to zip all the built R packages.")
}
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbol link name "rpkg" to the shipped package.將一個名為“sparkr”的符號連結分配給裝運的包。
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
}
}
// TODO: Support distributing R packages with standalone cluster支援獨立叢集分發R包
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
}
// TODO: Support distributing R packages with mesos cluster支援使用mesos叢集分發R包
if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with mesos cluster is not supported.")
}
// If we're running an R app, set the main class to our specific R runner如果我們正在執行一個R應用程式,則將主類設定為我們制定的Rrunner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
args.mainClass = "org.apache.spark.api.r.RBackend"
} else {
// If an R file is provided, add it to the child arguments and list of files to deploy.如果提供了一個R檔案,將它新增到子引數和要部署的檔案列表中。
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (isYarnCluster && args.isR) {
// In yarn-cluster mode for an R app, add primary resource to files
// that can be distributed with the job
//在 yarn-cluster模式中執行一個R應用程式,將主資源新增到檔案中,這樣他們才可以跟job一起被分發出去
args.files = mergeFileLists(args.files, args.primaryResource)
}
// Special flag to avoid deprecation warnings at the client在客戶端設定一個特殊標誌來避免棄用警告
sysProps("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
//在每個部署模式中將一個規則列表對映到系統屬性或命令列選項上;我們通過下面的操作進行迭代
val options = List[OptionAssigner](
// All cluster managers所有叢集管理器
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),//叢集模式
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),//部署模式
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),//應用名稱
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),//Ivy相關jar包,Ivy是一個跟蹤管理專案直接以來關係的工具
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),//驅動程式使用的記憶體,在client模式下,不要在應用中的SparkConf中直接設定,--driver-memory命令設定
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraClassPath"),//驅動程式需要用到的其他類路徑,在client模式下,不要在應用中的SparkConf中直接設定
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraJavaOptions"),//驅動程式需要的其他的JVM選項,在client模式下,不要在應用中的SparkConf中直接設定
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),//驅動程式需要的其他庫路徑,在client模式下,不要在應用中的SparkConf中直接設定
// Yarn only只是yarn叢集模式
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),//以逗號分隔的jar包,他們被放置在每個執行者的工作目錄中。
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),//以逗號分隔的檔案,他們被放置在每個執行者的工作目錄中。
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),//逗號分隔的檔案列表,他們被放置在每個執行者的工作目錄中。
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),//主要用於登入KDC,安全執行HDFS。
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
// Other options其他選項
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),//每個執行器上的核心數量
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),//每個執行器上使用的記憶體大小
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),//以逗號分隔的檔案列表,他們被放置在每個執行者的工作目錄中。
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),//以逗號分隔的本地jar,包括在驅動程式和執行程式類路徑中的。
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),//驅動程式使用的記憶體,在client模式下,不要在應用中的SparkConf中直接設定,--driver-memory命令設定
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.cores"),//驅動程式的核心數,只在叢集模式下使用
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
// An internal option used only for spark-shell to add user jars to repl's classloader,一個內部選項,僅用於spark-shell將使用者的jar包新增到repl的類載入器中,
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to以前它使用"spark.jars" or "spark.yarn.dist.jars" 來設定。
// remote jars, so adding a new option to only specify local jars for spark-shell internally.現在可能指向遠端jars,所以新增一個新的選項來指定用在spark-shell內部的本地jar包
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
)
// In client mode, launch the application main class directly 在客戶端模式下,直接啟動應用程式主類
// In addition, add the main application jar and any added jars (if any) to the classpath 此外,將主應用程式jar和任何新增的jar(如果有的話)新增到類路徑中
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
//新增主應用程式jar和任何新增的jar新增到類路徑中,如果YARN客戶端需要這些jar。
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
// added to the classpath of YARN client.
//這個假設primaryResource和使用者jar包都是當地的jar包,否則它將不會被新增到YARN的客戶端類路徑中。
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
}
if (deployMode == CLIENT) {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
// Map all arguments to command-line options or system properties for our chosen mode
//將所有命令列引數或者系統屬性對映到我們選擇的模式中
for (opt <- options) {
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
}
}
// Add the application jar automatically so the user doesn't have to call sc.addJar自動新增應用程式jar包,這樣使用者就不必呼叫sc.addjar了
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"對於yarn叢集模式,jar包通過“app.jar”已經分佈在每個節點上。
// For python and R files, the primary resource is already distributed as a regular file對於python和R檔案,主資源已經作為常規檔案分發了
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).在獨立叢集模式下,使用REST客戶端提交應用程式(Spark 1.3+)
// All Spark parameters are expected to be passed to the client through system properties.所有的Spark引數都將通過系統屬性傳遞給客戶端。
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
//在遺留的獨立叢集模式中,使用客戶機作為使用者類的包裝器
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.讓yarn知道它是一個pyspark應用程式,因此它可以分發需要的庫
if (clusterManager == YARN) {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class在 yarn-cluster模式下,使用yarn.Client作為使用者類的包裝器
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
if (args.isPython) {
// Second argument is main class第二個引數是主類
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
} else if (args.isR) {
// Second argument is main class第二個引數是主類
childArgs += (args.primaryResource, "")
} else {
childArgs += (args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Load any properties specified through --conf and the default properties file通過-conf和預設屬性檔案載入指定的任何屬性
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
// Ignore invalid spark.driver.host in cluster modes.在叢集模式中忽略無效的spark.driver.host。
if (deployMode == CLUSTER) {
sysProps -= "spark.driver.host"
}
// Resolve paths in certain spark properties在某些spark屬性中解析路徑
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist如果存在已解析的uri,則用剛解析出的替換舊的uri
sysProps.get(config).foreach { oldValue =>
sysProps(config) = Utils.resolveURIs(oldValue)
}
}
// Resolve and format python file paths properly before adding them to the PYTHONPATH.在將它們新增到PYTHONPATH之前,正確地解析和格式化python檔案路徑。
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
//在--py-files情況下,解析部分就是冗餘的,但是在使用者通過在他的預設配置檔案中設定了`spark.submit.pyFiles`時則很有必要了
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// support dealing with remote python files, they could distribute and add python files
// locally.
//在yarn和mesos叢集模式下,忽略格式化python路徑。這兩種模式支援處理遠端python檔案,他們可以分發且將python檔案新增到本地。
resolvedPyFiles
}
sysProps("spark.submit.pyFiles") = formattedPyFiles
}
(childArgs, childClasspath, sysProps, childMainClass)
}
/**
* Run the main method of the child class using the provided launch environment.使用所提供的啟動環境執行子類的主方法。
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*注意:如果我們執行的是叢集部署模式或python應用程式,這個主類將不是使用者提供的那個。
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
//sysProps可能包含敏感資訊,所以在列印前要重新編輯
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
val loader =//獲取類路徑
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {//將jar包新增到類路徑中
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {//設定相關屬性
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)//找到主方法
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {//如果沒找到應用程式的主方法則給出警告
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)//確保main方法為靜態方法
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)//解析本地jar包的uri
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
printWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
printWarning(s"Skip remote jar $uri.")
}
}
/**
* Return whether the given primary resource represents a user jar.
*返回:給定的主資源是否表示一個使用者的jar包。
*/
private[deploy] def isUserJar(res: String): Boolean = {
!isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
}
/**
* Return whether the given primary resource represents a shell.
*返回:給定的主資源是否表示一個shell。
*/
private[deploy] def isShell(res: String): Boolean = {
(res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)
}
/**
* Return whether the given main class represents a sql shell.
*返回:給定的主類是否表示一個sql shell。
*/
private[deploy] def isSqlShell(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}
/**
* Return whether the given main class represents a thrift server.
*返回:給定的主類是否表示thrift伺服器。
*/
private def isThriftServer(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
}
/**
* Return whether the given primary resource requires running python.
*返回:給定的主資源是否需要執行python。
*/
private[deploy] def isPython(res: String): Boolean = {
res != null && res.endsWith(".py") || res == PYSPARK_SHELL
}
/**
* Return whether the given primary resource requires running R.
*返回:給定的主資源是否需要執行R
*/
private[deploy] def isR(res: String): Boolean = {
res != null && res.endsWith(".R") || res == SPARKR_SHELL
}
private[deploy] def isInternal(res: String): Boolean = {
res == SparkLauncher.NO_RESOURCE
}
/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*在一個只用逗號分隔的字串中合併的一系列以逗號分隔的檔案列表,其中一些可能是null表示沒有檔案。
*/
private def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(_.split(","))
.mkString(",")
if (merged == "") null else merged
}
/**
* Download a list of remote files to temp local files. If the file is local, the original file
* will be returned.
*將遠端檔案的列表下載為本地臨時檔案。如果檔案是本地的,原始檔案將被返回。
* @param fileList A comma separated file list.以逗號分割的檔案列表
* @return A comma separated local files list.以逗號分割的本地檔案集列表
*/
private[deploy] def downloadFileList(
fileList: String,
hadoopConf: HadoopConfiguration): String = {
require(fileList != null, "fileList cannot be null.")
fileList.split(",").map(downloadFile(_, hadoopConf)).mkString(",")
}
/**
* Download a file from the remote to a local temporary directory. If the input path points to
* a local path, returns it with no operation.
*從遠端下載一個檔案到本地的一個臨時目錄中。如果輸入路徑指向一個本地路徑,則進行無操作返回。
*/
private[deploy] def downloadFile(path: String, hadoopConf: HadoopConfiguration): String = {
require(path != null, "path cannot be null.")
val uri = Utils.resolveURI(path)
uri.getScheme match {
case "file" | "local" =>
path
case _ =>
val fs = FileSystem.get(uri, hadoopConf)
val tmpFile = new File(Files.createTempDirectory("tmp").toFile, uri.getPath)
// scalastyle:off println
printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
// scalastyle:on println
fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
Utils.resolveURI(tmpFile.getAbsolutePath).toString
}
}
}
/** Provides utility functions to be used inside SparkSubmit. */
//提供在SparkSubmit內部使用的方法
private[spark] object SparkSubmitUtils {
// Exposed for testing
var printStream = SparkSubmit.printStream
/**
* Represents a Maven Coordinate 表示一個Maven座標
* @param groupId the groupId of the coordinate Maven座標的groupId,一般與專案名字相同
* @param artifactId the artifactId of the coordinate Maven座標的artifactId,模組
* @param version the version of the coordinate e Maven座標的version,版本資訊
*/
private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
override def toString: String = s"$groupId:$artifactId:$version"
}
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
從以逗號分隔的字串中提取maven座標。座標應提供為“groupId:artifactId:version”或“groupId/artifactId:version”的格式。
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates 返回Maven座標序列
*/
def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
s"be whitespace. The groupId provided is: ${splits(0)}")
require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
}
/** Path of the local Maven cache. 本地Maven快取路徑*/
private[spark] def m2Path: File = {
if (Utils.isTesting) {
// test builds delete the maven cache, and this can cause flakiness
new File("dummy", ".m2" + File.separator + "repository")
} else {
new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
}
}
/**
* Extracts maven coordinates from a comma-delimited string
* @param defaultIvyUserDir The default user path for Ivy
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("spark-list")
val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
val localIvy = new FileSystemResolver
val localIvyRoot = new File(defaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
"ivys", "ivy.xml").mkString(File.separator)
localIvy.addIvyPattern(ivyPattern)
val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]",
"[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)
localIvy.addArtifactPattern(artifactPattern)
localIvy.setName("local-ivy-cache")
cr.add(localIvy)
// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
br.setName("central")
cr.add(br)
val sp: IBiblioResolver = new IBiblioResolver
sp.setM2compatible(true)
sp.setUsepoms(true)
sp.setRoot("http://dl.bintray.com/spark-packages/maven")
sp.setName("spark-packages")
cr.add(sp)
cr
}
/**
* Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
* (will append to jars in SparkSubmit).
*輸出一個以逗號分隔的下載下來的jar路徑列表,將他們新增到類路徑classpath中 (將把jars新增到SparkSubmit中)。
* @param artifacts Sequence of dependencies that were resolved and retrieved
* @param cacheDirectory directory where jars are cached
* @return a comma-delimited list of paths for the dependencies
*/
def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): String = {
artifacts.map { artifactInfo =>
val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
cacheDirectory.getAbsolutePath + File.separator +
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
}.mkString(",")
}
/** Adds the given maven coordinates to Ivy's module descriptor. */
def addDependenciesToIvy(
md: DefaultModuleDescriptor,
artifacts: Seq[MavenCoordinate],
ivyConfName: String): Unit = {
artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
// scalastyle:off println
printStream.println(s"${dd.getDependencyId} added as a dependency")
// scalastyle:on println
md.addDependency(dd)
}
}
/** Add exclusion rules for dependencies already included in the spark-assembly */
def addExclusionRules(
ivySettings: IvySettings,
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and
// other spark-streaming utility components. Underscore is there to differentiate between
// spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x
val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
components.foreach { comp =>
md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
ivyConfName))
}
}
/**
* Build Ivy Settings using options with default resolvers
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return An IvySettings object
*/
def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
val ivySettings: IvySettings = new IvySettings
processIvyPathArg(ivySettings, ivyPath)
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)
processRemoteRepoArg(