The problem of temporary directories created by Spark running in Standalone mode
阿新 · Published 2019-02-18
While running, Spark jobs create a large number of temporary directories, which ended up filling one partition's disk. The root cause is that Spark's temporary directories default to /tmp/spark*.
Versions used in this project:
Hadoop: 2.7.1
Spark: 1.6.0
JDK: 1.8.0
1. Operations requirement
On the production Spark cluster, the /tmp/spark-* files keep filling up the / partition; the recommendation is to optimize the applications or move these files to a subdirectory under /home/.
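Before changing anything, it helps to quantify how much space these directories are taking on an affected node. A minimal check (a sketch; /tmp/spark-* is the default location mentioned above) might be:

df -h /                                            # how full the / partition is
du -sh /tmp/spark-* 2>/dev/null | sort -h | tail   # the largest Spark temp directories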
2. Solutions
2.1 Option 1 (not recommended)
A crontab entry could periodically run rm -rf /tmp/spark*. The drawback: if a Spark job is running when the cron job fires, the job's freshly created /tmp/spark* files are deleted out from under it, the files can no longer be found, and the job fails.
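For completeness, the discouraged cron-based cleanup would look roughly like this (a sketch only; it suffers from exactly the race condition described above):

# /etc/cron.d/spark-tmp-clean (sketch, not recommended):
# wipes Spark temp dirs nightly; any job still running at 03:00 loses its shuffle/RDD files.
0 3 * * * root rm -rf /tmp/spark-*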
2.2 Option 2 (recommended: configure the parameter in spark-env.sh, not in spark-defaults.conf)
Configure spark.local.dir in the Spark environment; the corresponding environment variable is SPARK_LOCAL_DIRS: storage directories to use on this node for shuffle and RDD data.
Either conf/spark-defaults.conf or conf/spark-env.sh can be modified; below we verify which of the two actually works.
(1) In spark-defaults.conf, change Spark's runtime temporary-directory setting by adding the following line:
spark.local.dir /diskb/sparktmp,/diskc/sparktmp,/diskd/sparktmp,/diske/sparktmp,/diskf/sparktmp,/diskg/sparktmp
Note: multiple directories can be configured, separated by ",".
(2) In spark-env.sh, add:
export SPARK_LOCAL_DIRS=/diskb/sparktmp,/diskc/sparktmp,/diskd/sparktmp,/diske/sparktmp,/diskf/sparktmp,/diskg/sparktmp
If both spark-env.sh and spark-defaults.conf are configured, SPARK_LOCAL_DIRS overrides spark.local.dir.
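A simple way to confirm which setting actually takes effect is to run a test job and see where the spark-*/blockmgr-* directories appear, or to grep an executor log for the directory that DiskBlockManager reports (a sketch; the log path is a placeholder):

# While a test job runs, check which configured location is actually written to.
ls -d /diskb/sparktmp/spark-* /tmp/spark-* 2>/dev/null
# Or look for the "Created local directory" line in an executor's stderr:
grep "Created local directory" /path/to/executor/stderr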
In production we proceeded along these lines, starting by adding the following line to spark-defaults.conf:
spark.local.dir /home/hadoop/data/sparktmp
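One operational detail worth stating (our assumption, not something spelled out in the original write-up): the chosen directory must already exist and be writable by the user that runs Spark on every node, for example:

# Run on every node; 'hadoop' is the user used throughout this post.
mkdir -p /home/hadoop/data/sparktmp
chown -R hadoop:hadoop /home/hadoop/data/sparktmp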
(2) The relevant method in SparkConf.scala
This method tells us that setting spark.local.dir in spark-defaults.conf has been deprecated since Spark 1.0.
/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
if (contains("spark.local.dir")) {
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set b y " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
logWarning(msg)
}
val executorOptsKey = "spark.executor.extraJavaOptions"
val executorClasspathKey = "spark.executor.extr
...
}
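The warning above goes to the driver's log output, so a quick way to confirm that this code path fired is to search the captured spark-submit output for it (a sketch; the log file name is a placeholder):

# Search the captured driver/spark-submit output for the deprecation warning
# emitted by SparkConf.validateSettings.
grep "spark.local.dir will be overridden" /path/to/driver.log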
(3) The relevant method in Utils.scala
Analyzing the code below shows that when SPARK_LOCAL_DIRS is not set in spark-env.sh, the local directories fall back to conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(","), and the directories are then created from whatever that resolves to, which is what produces the error shown in the executor logs.
Setting SPARK_LOCAL_DIRS directly in spark-env.sh therefore resolves the problem, so that is what we configure in spark-env.sh:
export SPARK_LOCAL_DIRS=/home/hadoop/data/sparktmp
/**
* Return the configured local directories where Spark can write files. This
* method does not create any directories on its own, it only encapsulates the
* logic of locating the local directories according to deployment mode.
*/
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
getYarnLocalDirs(conf).split(",")
} else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
} else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
conf.getenv("SPARK_LOCAL_DIRS").split(",")
} else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
// Mesos already creates a directory per Mesos task. Spark should use that directory
// instead so all temporary files are automatically cleaned up when the Mesos task ends.
// Note that we don't want this if the shuffle service is enabled because we want to
// continue to serve shuffle files after the executors that wrote them have already exited.
Array(conf.getenv("MESOS_DIRECTORY"))
} else {
if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
"spark.shuffle.service.enabled is enabled.")
}
// In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
// configuration to point to a secure directory. So create a subdirectory with restricted
// permissions under each listed directory.
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
}
}
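Because the branch we rely on reads the SPARK_LOCAL_DIRS environment variable, it is worth confirming after restarting the cluster that the variable really is present in the Worker process's environment (a sketch; assumes Linux and that spark-env.sh was edited before the restart):

# On a worker node: check that the running Worker process has SPARK_LOCAL_DIRS set.
WORKER_PID=$(pgrep -f org.apache.spark.deploy.worker.Worker | head -n 1)
tr '\0' '\n' < /proc/$WORKER_PID/environ | grep SPARK_LOCAL_DIRS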
Watching the log output in the terminal, and in particular the 'Deleting directory' lines, we can see that the path has indeed changed; it finally works:
16/09/08 14:56:19 INFO ui.SparkUI: Stopped Spark web UI at http://10.4.1.1:4040
16/09/08 14:56:19 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
16/09/08 14:56:19 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
16/09/08 14:56:19 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/09/08 14:56:19 INFO storage.MemoryStore: MemoryStore cleared
16/09/08 14:56:19 INFO storage.BlockManager: BlockManager stopped
16/09/08 14:56:19 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/09/08 14:56:19 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/09/08 14:56:19 INFO spark.SparkContext: Successfully stopped SparkContext
16/09/08 14:56:19 INFO util.ShutdownHookManager: Shutdown hook called
16/09/08 14:56:19 INFO util.ShutdownHookManager: Deleting directory /home/hadoop/data/sparktmp/spark-a72435b2-71e7-4c07-9d60-b0dd41b71ecc
16/09/08 14:56:19 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/09/08 14:56:19 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/09/08 14:56:19 INFO util.ShutdownHookManager: Deleting directory /home/hadoop/data/sparktmp/spark-a72435b2-71e7-4c07-9d60-b0dd41b71ecc/httpd-7cd8762c-85b6-4e62-8e91-be668830b0a7
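As an extra check while a job is running (a sketch; the paths are the ones used in this post):

ls -d /tmp/spark-* 2>/dev/null               # should show nothing new being created
ls -d /home/hadoop/data/sparktmp/spark-*     # the per-application temp dirs now land here
df -h /                                      # the / partition should stop filling up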
Then we verify by running the following command:
bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://10.4.1.1:7077 \
--total-executor-cores 4 \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 1 \
lib/spark-examples*.jar 10
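On a standalone cluster, each executor's stdout/stderr lives under the worker's work directory, which is where the errors below were found (a sketch; $SPARK_HOME stands for the Spark installation directory on the worker):

# On a worker node: list the applications and inspect an executor's stderr.
ls $SPARK_HOME/work/
tail -n 100 $SPARK_HOME/work/app-*/*/stderr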
After the run completes, the executor logs under some workers' work directories contain errors like the following:
16/09/08 15:55:53 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 50212.
16/09/08 15:55:53 ERROR storage.DiskBlockManager: Failed to create local dir in . Ignoring this directory.
java.io.IOException: Failed to create a temp directory (under ) after 10 attempts!
at org.apache.spark.util.Utils$.createDirectory(Utils.scala:217)
at org.apache.spark.storage.DiskBlockManager$$anonfun$createLocalDirs$1.apply(DiskBlockManager.scala:135)
at org.apache.spark.storage.DiskBlockManager$$anonfun$createLocalDirs$1.apply(DiskBlockManager.scala:133)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.spark.storage.DiskBlockManager.createLocalDirs(DiskBlockManager.scala:133)
at org.apache.spark.storage.DiskBlockManager.<init>(DiskBlockManager.scala:45)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:76)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:186)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:151)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:253)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
16/09/08 15:55:53 ERROR storage.DiskBlockManager: Failed to create any local dir.
We analyze the cause of this error through the source code.
(1) The following method in the DiskBlockManager class
The logs let us pinpoint this code as the place where the error occurs.
/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
private def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")