Spark提交應用程序之Spark-Submit分析
1.提交應用程序
在提交應用程序的時候,用到 spark-submit 腳本。我們來看下這個腳本:
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
可以看出來 spark-class 腳本才是真正的提交程序的,spark-submit 腳本只是在其上封裝一層,並傳參 org.apache.spark.deploy.SparkSubmit 給它。這樣做的目的是分層管理和方便維護的作用。符合計算機中“遇到問題,往上加多一層解決問題的思想(通過加層解決不了問題,唯一的原因就是層次太多,無法再加了)”
下面來看下真正的提交程序腳本 spark-class :
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh #導入Spark運行的環境變量,獲取和驗證Spark Scala的版本,
# Find the java binary #獲取java的路徑,用於後面的用戶程序運行。因為Saprk雖然是用Scala寫的,但是其實跑在JVM上的。
if [ -n "${JAVA_HOME} " ]; then
RUNNER="${JAVA_HOME}/bin/java" #如果 JAVA_HOME 環境變量已設置,直接獲取java腳本路徑
else
if [ `command -v java` ]; then
RUNNER="java" #如果 JAVA_HOME 環境變量沒設置,通過 command -v java 命令獲得
else
echo "JAVA_HOME is not set" >&2 #如果沒有找到java命令,那麽直接退出,把信息重定向到標準錯誤輸出
exit 1
fi
fi
# Find assembly jar #獲取Spark運行相關jar包路徑
SPARK_ASSEMBLY_JAR =
if [ -f "${SPARK_HOME}/RELEASE" ]; then
ASSEMBLY_DIR="${SPARK_HOME}/lib" #如果RELEASE文件存在,表明Spark已結合hadoop編譯生成的jar包在lib。建議使用cdh5的生態圈,不用自己編譯,解決版本沖突的問題
else
ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION" #註意,在Spark2.0版本後,這個目錄已經沒有了,把一個大的jar包分為若幹個jar包
fi
#ls -l lib目錄,並通過正則匹配 spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar(我這裏以spark-1.6.0-cdh5.7.0為例子,並統計文件行數)
GREP_OPTIONS= num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
#如果上面的num_jars統計文件行數為0,或上面的$SPARK_ASSEMBLY_JAR長度為0,或$SPARK_PREPEND_CLASSES不存在,打印log並退出
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; thenecho "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then #如果上面的$ASSEMBLY_DIR找到
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then #如果有多個num_jars,則打印log並退出,防止運行時因為Spark版本出錯
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}" #獲取spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar(我這裏以spark-1.6.0-cdh5.7.0為例子)
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR" #簡單賦值,更換變量是為了方便維護
# Add the launcher build dir to the classpath if requested. #若有需要,把啟動程序構建目錄添加到classpath
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR" #導出$SPARK_ASSEMBLY_JAR,相當於作為全局變量,供其他程序訪問
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
#獲取命令行傳進來的全部參數,賦給啟動程序
CMD=()
while IFS= read -d ‘‘ -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
通過對Spark-class的解讀,可以知道應用程序提交要做的事情,總的來說,就是導入Spark運行的環境變量,獲取java命令路徑,相關與Spark運行相關的jar包,獲取命令行的參數等。最終通過 org.apache.spark.launcher.Main Main.class這個類來啟動應用程序。
2.添加應用程序的依賴項
--jar 添加自定義的jar包
--py-files 添加.py .zip .egg等文件。如果是多個.py文件,需要壓縮成.zip文件
3.spark-submit啟動應用程序
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
——class:應用程序的入口點(如org.apache.spark.examples.SparkPi)
——master:主集群的URL(如spark://192.168.1.100:7077)
——deploy-mode:是否將驅動程序部署到工作節點(集群)或本地作為外部客戶端(客戶端)(默認:客戶端)
——conf:鍵=值格式的任意Spark配置屬性。對於包含空格的值,用引號括起“key = value”
application-jar:綁定jar的路徑,包括應用程序和所有依賴項。URL必須在集群內部全局可見,例如,一個hdfs://路徑或文件://在所有節點上存在的路徑
[application-arguments]:傳遞給主類主方法的參數
【對於Python應用程序,比如test.py 只需在<application-JAR>不提交jar,而是通過 --py-files 添加Python文件, 比如.zip, .egg或.py文件到此參數】
3.Master URLs
local 本地單線程方式運行
local[K] 本地多線程方式運行
local[K,F] 本地多線程方式運行,並指定最大失敗次數
local[*] 本地有多少core就有多少線程運行
local[*,F] 本地有多少core就有多少線程運行,並指定最大失敗次數
spark://HOST:PORT standalone模式運行,PORT默認值為7077
spark://HOST1:PORT1,HOST2:PORT2 standalone高可用模式運行,指定多個master的host和port,port默認值為7077
mesos://HOST:PORT mesos集群方式運行,分粗粒度模式和細粒度模式
yarn yarn模式運行。
4.從文件加載配置(spark-defaults.conf)
除了在Spark-Submit提交程序時通過-conf "key=value"方式傳入Spark參數之外,在這裏可以設定大量的參數來調優化,主要涉及到環境參數,應用運行參數,內存管理,網絡IO,任務調度,動態分配,安全,身份驗證等方面。由於篇幅問題,把官網翻譯的全部參數弄成表格放到github上。從此設置參數,優化Spark,媽媽再也不用擔心我了。由於水平問題,難免翻譯不準確,歡迎大家補充。
5.依賴管理
在使用spark-submit時,應用程序jar和包含的任何jar包都將自動上傳到集群。之後提供的url-jar必須用逗號分隔。該列表包含在driver和excutor類路徑中。目錄擴展不能用-jar。
file:-絕對路徑和 file:/uri 由driver的HTTP文件服務器提供,每個excutor從driver HTTP服務器獲取文件。
hdfs:,http:, https:, ftp: 從URI中提取文件和jar
本地: local:/ 將作為每個工作節點上的本地文件存在。這意味著不會產生任何網絡IO,並且適用於被推到每個worker上的大文件/jar,或者通過NFS、GlusterFS共享。
Spark提交應用程序之Spark-Submit分析