
Spark - 6: Running Spark on YARN

Translated from: http://spark.apache.org/docs/latest/running-on-yarn.html

Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0, and improved in subsequent releases.

Launching Spark on YARN

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client-side) configuration files for the Hadoop cluster.

These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. If the configuration references Java system properties or environment variables not managed by YARN, they should also be set in the Spark application's configuration (driver, executors, and the AM when running in client mode).
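For example, a minimal client-side setup could look like the sketch below; the /etc/hadoop/conf path and MY_ENV_VAR are illustrative assumptions, not values prescribed by this document:

# Point Spark at the Hadoop client configs before submitting
export HADOOP_CONF_DIR=/etc/hadoop/conf
# A variable not managed by YARN has to be passed to the AM and the executors explicitly
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster \
    --conf spark.yarn.appMasterEnv.MY_ENV_VAR=value \
    --conf spark.executorEnv.MY_ENV_VAR=value \
    <app jar> [app options]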

Two deploy modes can be used to launch Spark applications on YARN.

In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

Unlike Spark standalone and Mesos modes, in which the master's address is specified in the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.
To launch a Spark application in cluster mode:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

For example:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10

The above starts a YARN client program which starts the default Application Master. SparkPi will then run as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Debugging your Application" section below for how to see driver and executor logs.
To launch a Spark application in client mode, do the same, but replace cluster with client. The following shows how you can run spark-shell in client mode:
$ ./bin/spark-shell --master yarn --deploy-mode client

Adding Other JARs
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won't work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.
$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2
Preparations
Running Spark on YARN requires a build of Spark with YARN support. Binary distributions can be downloaded from the downloads page of the project website. To build Spark yourself, refer to Building Spark.
To make Spark runtime jars accessible from the YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.
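A rough sketch of the archive approach; the HDFS path is an assumption, and any location readable by the cluster works:

# Bundle the local Spark jars into one archive with the jars at its root
$ jar cv0f spark-libs.jar -C $SPARK_HOME/jars/ .
# Upload it to a shared location (path is illustrative)
$ hadoop fs -mkdir -p /spark/jars
$ hadoop fs -put spark-libs.jar /spark/jars/

Then point spark.yarn.archive at it, e.g. in conf/spark-defaults.conf:

spark.yarn.archive hdfs:///spark/jars/spark-libs.jar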
Configuration
Most of the configs are the same for Spark on YARN as for other deployment modes. See the configuration page for more information on those. The properties listed further below are specific to Spark on YARN.
Debugging your Application

In YARN terminology, executors and application masters run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command.

yarn logs -applicationId <app ID>

will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the Executors tab. You need to have both the Spark history server and the MapReduce history server running and configure yarn.log.server.url in yarn-site.xml properly. The log URL on the Spark history server UI will redirect you to the MapReduce history server to show the aggregated logs.
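For reference, a minimal yarn-site.xml sketch of the aggregation-related settings mentioned above; the history-server host is an assumption:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://historyserver.example.com:19888/jobhistory/logs</value>
</property>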

When log aggregation isn't turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. The logs are also available on the Spark Web UI under the Executors tab and do not require running the MapReduce history server.

To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container. This process is useful for debugging classpath problems in particular. (Note that enabling this requires admin privileges on the cluster settings and a restart of all node managers; thus, it is not applicable to hosted clusters.)
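A yarn-site.xml sketch for that debug setting (admin-only, as noted above):

<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>36000</value>
</property>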

To use a custom log4j configuration for the application master or executors, here are the options:

1. Upload a custom log4j.properties using spark-submit, by adding it to the --files list of files to be uploaded with the application.

2. Add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions (for the driver) or spark.executor.extraJavaOptions (for executors). Note that if using a file, the file: protocol should be explicitly provided, and the file needs to exist locally on all the nodes.

3. Update the $SPARK_CONF_DIR/log4j.properties file and it will be automatically uploaded along with the other configurations. Note that if multiple options are specified, the other two options have higher priority than this one.

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).
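A minimal sketch of the first option; the local path is illustrative, and the uploaded file keeps its base name log4j.properties so it is picked up from each container's working directory:

$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --files /path/to/custom/log4j.properties \
    my-main-jar.jar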

If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example,

log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log

For streaming applications, configuring RollingFileAppender and setting the file location to YARN's log directory will avoid disk overflow caused by large log files, and the logs can be accessed with YARN's log utility.
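A log4j.properties sketch along those lines; the size limit and backup count are illustrative assumptions:

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.rolling.MaxFileSize=50MB
log4j.appender.rolling.MaxBackupIndex=5
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n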

To use a custom metrics.properties for the application master and executors, update the $SPARK_CONF_DIR/metrics.properties file. It will automatically be uploaded with the other configurations, so you don't need to specify it manually with --files.
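A minimal metrics.properties sketch, modeled on the entries in Spark's metrics.properties.template; enabling the console sink here is just an example:

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds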

Spark Properties

Property Name Default Meaning
spark.yarn.am.memory 512m Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use spark.driver.memory instead. Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.

spark.yarn.am.cores 1 Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead.
spark.yarn.am.waitTime 100s In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it.
spark.yarn.submit.file.replication The default HDFS replication (usually 3) HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
spark.yarn.stagingDir Current user's home directory in the filesystem Staging directory used while submitting applications.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.
spark.yarn.scheduler.heartbeat.interval-ms 3000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN's configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms.
spark.yarn.scheduler.initial-allocation.interval 200ms The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will double on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached.
spark.yarn.max.executor.failures numExecutors * 2, with minimum of 3 The maximum number of executor failures before failing the application.
spark.yarn.historyServer.address (none) The address of the Spark history server, e.g. host.com:18080. The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080.
spark.yarn.dist.archives (none) Comma separated list of archives to be extracted into the working directory of each executor.
spark.yarn.dist.files (none) Comma-separated list of files to be placed in the working directory of each executor.
spark.yarn.dist.jars (none) Comma-separated list of jars to be placed in the working directory of each executor.
spark.executor.instances 2 The number of executors for static allocation. With spark.dynamicAllocation.enabled, the initial set of executors will be at least this large.
spark.yarn.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
spark.yarn.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
spark.yarn.am.memoryOverhead AM memory * 0.10, with minimum of 384 Same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode.
spark.yarn.am.port (random) Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
spark.yarn.queue default The name of the YARN queue to which the application is submitted.
spark.yarn.jars (none) List of libraries containing Spark code to distribute to YARN containers. By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to jars on HDFS, for example, set this configuration to hdfs:///some/path. Globs are allowed.
spark.yarn.archive (none) An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars and the archive is used in all the application's containers. The archive should contain jar files in its root directory. Like with the previous option, the archive can also be hosted on HDFS to speed up file distribution.
spark.yarn.access.namenodes (none) A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032, webhdfs://nn3.com:50070. The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these to set multiple environment variables. In cluster mode this controls the environment of the Spark driver, and in client mode it only controls the environment of the executor launcher.
spark.yarn.containerLauncherMaxThreads 25 The maximum number of threads to use in the YARN Application Master for launching executor containers.
spark.yarn.am.extraJavaOptions (none) A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead. Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap size settings can be set with spark.yarn.am.memory.
spark.yarn.am.extraLibraryPath (none) Set a special library path to use when launching the YARN Application Master in client mode.
spark.yarn.maxAppAttempts yarn.resourcemanager.am.max-attempts in YARN The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration.
spark.yarn.am.attemptFailuresValidityInterval (none) Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
spark.yarn.executor.failuresValidityInterval (none) Defines the validity interval for executor failure tracking. Executor failures which are older than the validity interval will be ignored.
spark.yarn.submit.waitAppCompletion true In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive reporting the application's status. Otherwise, the client process will exit after submission.
spark.yarn.am.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.executor.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.tags (none) Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
spark.yarn.keytab (none) The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master)
spark.yarn.principal (none) Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master)
spark.yarn.config.gatewayPath (none) A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes.

The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers).

For example, if the gateway node has Hadoop libraries installed on /disk1/hadoop, and the location of the Hadoop install is exported by YARN as the HADOOP_HOME environment variable, setting this value to /disk1/hadoop and the replacement path to $HADOOP_HOME will make sure that paths used to launch remote processes properly reference the local YARN configuration.

spark.yarn.config.replacementPath (none) See spark.yarn.config.gatewayPath.
spark.yarn.security.credentials.${service}.enabled true Controls whether to obtain credentials for services when security is enabled. By default, credentials for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run. For further details please see [Running in a Secure Cluster](running-on-yarn.html#running-in-a-secure-cluster)
spark.yarn.rolledLog.includePattern (none) Java Regex to filter the log files which match the defined include pattern; those log files will be aggregated in a rolling fashion. This will be used with YARN's rolling log aggregation; to enable this feature on the YARN side, yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds should be configured in yarn-site.xml. This feature can only be used with Hadoop 2.6.1+. The Spark log4j appender needs to be changed to use FileAppender or another appender that can handle the files being removed while it is running. Based on the file name configured in the log4j configuration (like spark.log), the user should set the regex (spark*) to include all the log files that need to be aggregated.
spark.yarn.rolledLog.excludePattern (none) Java Regex to filter the log files which match the defined exclude pattern and those log files will not be aggregated in a rolling fashion. If the log file name matches both the include and the exclude pattern, this file will be excluded eventually.
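Any of the properties above can be set cluster-wide in conf/spark-defaults.conf or per job with --conf on spark-submit; a small sketch with illustrative values (these are assumptions, not defaults):

spark.yarn.queue                  thequeue
spark.yarn.am.memory              1g
spark.yarn.historyServer.address  historyhost.example.com:18080
spark.yarn.archive                hdfs:///spark/jars/spark-libs.jar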

Important notes:

1. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.

2. In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN, while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.

3. The --files and --archives options support specifying file names with # similar to Hadoop. For example, --files localtest.txt#appSees.txt uploads the file locally named localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use appSees.txt to reference it when running on YARN.

4. The --jars option allows the SparkContext.addJar function to work if you are using it with local files and running in cluster mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.

Running in a Secure Cluster

As covered in Security, Kerberos is used in a secure Hadoop cluster to authenticate principals associated with services and clients. This allows clients to make requests of these authenticated services, and the services to grant rights to the authorized principals.

Hadoop services issue hadoop tokens to grant access to the services and data. Clients must first acquire tokens for the services they will access and pass them along with their application as it is launched in the YARN cluster.

For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens using the Kerberos credentials of the user launching the application, that is, the principal whose identity will become that of the launched Spark application.

This is normally done at launch time: in a secure cluster Spark will automatically obtain a token for the cluster's HDFS filesystem, and potentially for HBase and Hive.

An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares the application is secure (i.e. hbase-site.xml sets hbase.security.authentication to kerberos), and spark.yarn.security.credentials.hbase.enabled is not set to false.

Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration includes a URI of the metadata store in hive.metastore.uris, and spark.yarn.security.credentials.hive.enabled is not set to false.

If an application needs to interact with other secure HDFS clusters, the tokens needed to access these clusters must be explicitly requested at launch time. This is done by listing them in the spark.yarn.access.namenodes property:

spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/

Spark supports integrating with other security-aware services through the Java Services mechanism (see java.util.ServiceLoader). To do that, implementations of org.apache.spark.deploy.yarn.security.ServiceCredentialProvider should be made available to Spark by listing their names in the corresponding file in the jar's META-INF/services directory. These plugins can be disabled by setting spark.yarn.security.tokens.{service}.enabled to false, where {service} is the name of the credential provider.
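For illustration, the ServiceLoader provider-configuration file would look like the sketch below; com.example.MyCredentialProvider is a hypothetical implementation class, not something shipped with Spark:

# Contents of META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
# (ServiceLoader provider files allow '#' comments)
com.example.MyCredentialProvider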

Configuring the External Shuffle Service

To start the Spark Shuffle Service on each NodeManager in your YARN cluster, follow these instructions:

  1. Build Spark with the YARN profile. Skip this step if you are using a pre-packaged distribution.
  2. Locate the spark-<version>-yarn-shuffle.jar. If you built Spark yourself, this should be under $SPARK_HOME/common/network-yarn/target/scala-<version>; if you are using a pre-packaged distribution, it ships with the distribution.
  3. Add this jar to the classpath of all NodeManagers in your cluster.
  4. In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService (see the yarn-site.xml sketch after this list).
  5. Increase the NodeManager's heap size by setting YARN_HEAPSIZE (1000 by default) in etc/hadoop/yarn-env.sh to avoid garbage collection issues during shuffle.
  6. Restart all NodeManagers in your cluster.
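A yarn-site.xml sketch for step 4; mapreduce_shuffle is kept here on the assumption that it was already configured, so adjust the list to your existing aux-services:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>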

The following extra configuration options are available when the shuffle service is running on YARN:

Property Name Default Meaning
spark.yarn.shuffle.stopOnFailure false Whether to stop the NodeManager when there's a failure in the Spark Shuffle Service's initialization. This prevents application failures caused by running containers on NodeManagers where the Spark Shuffle Service is not running.

Launching your application with Apache Oozie

Apache Oozie can launch Spark applications as part of a workflow. In a secure cluster, the launched application will need the relevant tokens to access the cluster's services. If Spark is launched with a keytab, this is automatic. However, if Spark is to be launched without a keytab, the responsibility for setting up security must be handed over to Oozie. The details of configuring Oozie for secure clusters and obtaining credentials for a job can be found on the Oozie web site, in the "Authentication" section of the documentation for the specific release.

For Spark applications, the Oozie workflow must be set up for Oozie to request all tokens which the application needs, including:

  • The YARN resource manager.
  • The local HDFS filesystem.
  • Any remote HDFS filesystems used as a source or destination of I/O.
  • Hive —if used.
  • HBase —if used.
  • The YARN timeline server, if the application interacts with this.

To avoid Spark attempting —and then failing— to obtain Hive, HBase and remote HDFS tokens, the Spark configuration must be set to disable token collection for the services.

The Spark configuration must include the lines:

spark.yarn.security.tokens.hive.enabled   false
spark.yarn.security.tokens.hbase.enabled  false

The configuration option spark.yarn.access.namenodes must be unset.


Troubleshooting Kerberos

Debugging Hadoop/Kerberos problems can be "difficult". One useful technique is to enable extra logging of Kerberos operations in Hadoop by setting the HADOOP_JAAS_DEBUG environment variable.

export HADOOP_JAAS_DEBUG=true

The JDK classes can be configured to enable extra logging of their Kerberos and SPNEGO/REST authentication via the system properties sun.security.krb5.debug and sun.security.spnego.debug=true

-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

All these options can be enabled in the Application Master:

spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true

Finally, if the log level for org.apache.spark.deploy.yarn.Client is set to DEBUG, the log will include a list of all tokens obtained, and their expiry details.
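A one-line log4j.properties sketch for that last step, assuming the standard $SPARK_CONF_DIR/log4j.properties is in use:

log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG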