A Brief Introduction to Utils, a Commonly Used Utility Class in Spark
For the preface of the book 深入理解Spark:核心思想與源代碼分析 (roughly, "Understanding Spark in Depth: Core Ideas and Source Code Analysis"), see the post announcing the book's official publication.
For Chapter 1 of the book, see the post "Chapter 1: Environment Preparation".
For Chapter 2 of the book, see the post "Chapter 2: Spark Design Philosophy and Basic Architecture".
For Chapter 3, part 1, see the post "Initialization of SparkContext (Part 1)".
For Chapter 3, part 2, see the post "Initialization of SparkContext (Part 2)".
For Chapter 3, part 3, see the post "Initialization of SparkContext (Part 3)".
For Chapter 3, part 4, see the post "Initialization of SparkContext (Part 4)".
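Utils is one of the most frequently used utility classes in Spark. You can ignore how it is implemented without losing much of your understanding of Spark, but for anyone new to Scala or Spark, reading the implementation of Utils is a good way to get started.
The sections below go through the commonly used methods of Utils one by one.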
1.localHostName
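Description: returns the host name of the local machine. A custom host name is used if one has been set; otherwise the name derived from the local IP address is returned.

    def localHostName(): String = {
      customHostname.getOrElse(localIpAddressHostname)
    }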
2.getDefaultPropertiesFile
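Description: returns the path of the default Spark properties file. It looks for spark-defaults.conf under SPARK_CONF_DIR, falling back to the conf directory under SPARK_HOME, and returns null if no such file exists.

    def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
      env.get("SPARK_CONF_DIR")
        .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
        .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
        .filter(_.isFile)
        .map(_.getAbsolutePath)
        .orNull
    }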
3.loadDefaultSparkProperties
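Description: loads the Spark properties from the given file. If no file is given, the properties are loaded from the default Spark properties file. Only keys that start with "spark." are used, and a value already present in the SparkConf or in the system properties is not overwritten.

    def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
      val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
      Option(path).foreach { confFile =>
        getPropertiesFromFile(confFile).filter { case (k, v) =>
          k.startsWith("spark.")
        }.foreach { case (k, v) =>
          conf.setIfMissing(k, v)
          sys.props.getOrElseUpdate(k, v)
        }
      }
      path
    }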
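The filter-then-set-if-missing behaviour can be tried without a Spark dependency. The following is only a minimal sketch: the object name `PropsSketch`, the method `mergeSparkProps`, and the use of a `mutable.Map` in place of `SparkConf` are assumptions made for illustration, and `getOrElseUpdate` stands in for `setIfMissing`.

```scala
import scala.collection.mutable

object PropsSketch {
  // Hypothetical stand-in: `conf` plays the role of SparkConf.
  // Only keys starting with "spark." are considered, and a key that
  // is already present keeps its existing value.
  def mergeSparkProps(
      conf: mutable.Map[String, String],
      fileProps: Map[String, String]): Unit = {
    fileProps
      .filter { case (k, _) => k.startsWith("spark.") }
      .foreach { case (k, v) => conf.getOrElseUpdate(k, v) }
  }
}
```

With a conf that already holds spark.master, merging a file that sets spark.master, spark.app.name and an unrelated key should leave spark.master untouched, add spark.app.name, and ignore the unrelated key.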
4.getCallSite
Description: captures the current call stack as seen from the current SparkContext. The trace is walked from the most recent frame. Each frame whose class belongs to Spark or Scala core overwrites callStack(0), so the last such frame before user code ends up on top of callStack, and its method name (or its simple class name, for a constructor) is stored in lastSparkMethod. The first user-class frame is appended to callStack, its line number is stored in firstUserLine, and its file name in firstUserFile. The returned case class CallSite holds a short form and a long form whose depth defaults to 20 frames. In the JavaWordCount example, the result is:
- Short form: JavaSparkContext at JavaWordCount.java:44
- Long form: org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61), followed by org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:44)

    def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
      val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
        ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
      }
      var lastSparkMethod = "<unknown>"
      var firstUserFile = "<unknown>"
      var firstUserLine = 0
      var insideSpark = true
      var callStack = new ArrayBuffer[String]() :+ "<unknown>"
      for (el <- trace) {
        if (insideSpark) {
          if (skipClass(el.getClassName)) {
            lastSparkMethod = if (el.getMethodName == "<init>") {
              el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
            } else {
              el.getMethodName
            }
            callStack(0) = el.toString // Put last Spark method on top of the stack trace.
          } else {
            firstUserLine = el.getLineNumber
            firstUserFile = el.getFileName
            callStack += el.toString
            insideSpark = false
          }
        } else {
          callStack += el.toString
        }
      }
      val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
      CallSite(
        shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
        longForm = callStack.take(callStackDepth).mkString("\n"))
    }
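To see how the short form comes about, the same walk can be run over a hand-built trace. This is a sketch under stated assumptions, not Spark's code: `CallSiteSketch`, `callSiteOf` and `isSparkOrScala` are names invented here, `isSparkOrScala` is only a rough stand-in for `coreExclusionFunction`, and the loop is expressed with `span` instead of mutable variables.

```scala
object CallSiteSketch {
  final case class CallSite(shortForm: String, longForm: String)

  // Hypothetical stand-in for Spark's coreExclusionFunction.
  def isSparkOrScala(className: String): Boolean =
    (className.startsWith("org.apache.spark.") &&
      !className.startsWith("org.apache.spark.examples.")) ||
      className.startsWith("scala.")

  def callSiteOf(
      trace: Seq[StackTraceElement],
      skipClass: String => Boolean = isSparkOrScala,
      depth: Int = 20): CallSite = {
    // Leading frames inside Spark/Scala, then everything from the first user frame on.
    val (sparkFrames, userFrames) = trace.span(el => skipClass(el.getClassName))
    val lastSpark = sparkFrames.lastOption
    val lastSparkMethod = lastSpark.map { el =>
      if (el.getMethodName == "<init>") {
        // Constructors are reported by simple class name.
        el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
      } else {
        el.getMethodName
      }
    }.getOrElse("<unknown>")
    val firstUser = userFrames.headOption
    val file = firstUser.map(_.getFileName).getOrElse("<unknown>")
    val line = firstUser.map(_.getLineNumber).getOrElse(0)
    // The last Spark frame sits on top, followed by all user frames.
    val callStack = lastSpark.map(_.toString).getOrElse("<unknown>") +: userFrames.map(_.toString)
    CallSite(s"$lastSparkMethod at $file:$line", callStack.take(depth).mkString("\n"))
  }
}
```

Feeding it the two frames from the JavaWordCount example (built with the four-argument `StackTraceElement` constructor) should reproduce the short form quoted above.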
5.startServiceOnPort
Description: as in many scripting languages, functions in Scala can be passed around as values. This method starts a service by calling back the startService function and returns the service and port that startService returned.
If startup fails because of a port bind collision, it retries on the next port until the maximum number of retries, maxRetries, is reached.

    def startServiceOnPort[T](
        startPort: Int,
        startService: Int => (T, Int),
        conf: SparkConf,
        serviceName: String = ""): (T, Int) = {
      require(startPort == 0 || (1024 <= startPort && startPort < 65536),
        "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
      val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
      val maxRetries = portMaxRetries(conf)
      for (offset <- 0 to maxRetries) {
        val tryPort = if (startPort == 0) {
          startPort
        } else {
          ((startPort + offset - 1024) % (65536 - 1024)) + 1024
        }
        try {
          val (service, port) = startService(tryPort)
          logInfo(s"Successfully started service$serviceString on port $port.")
          return (service, port)
        } catch {
          case e: Exception if isBindCollision(e) =>
            if (offset >= maxRetries) {
              val exceptionMessage =
                s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
              val exception = new BindException(exceptionMessage)
              exception.setStackTrace(e.getStackTrace)
              throw exception
            }
            logWarning(s"Service$serviceString could not bind on port $tryPort. " +
              s"Attempting port ${tryPort + 1}.")
        }
      }
      throw new SparkException(s"Failed to start service$serviceString on port $startPort")
    }
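The port arithmetic and the callback-with-retry pattern are easier to follow in isolation. Below is a minimal sketch assuming a plain `BindException` as the only retryable failure; `PortRetrySketch`, `tryPortFor` and `startWithRetries` are hypothetical names, and logging and `SparkConf` are left out.

```scala
import java.net.BindException

object PortRetrySketch {
  // Same arithmetic as the listing: port 0 means "any free port";
  // otherwise the candidate wraps around within [1024, 65535].
  def tryPortFor(startPort: Int, offset: Int): Int =
    if (startPort == 0) 0
    else ((startPort + offset - 1024) % (65536 - 1024)) + 1024

  // Calls startService on successive ports. A BindException is swallowed
  // while retries remain; on the last attempt it propagates to the caller.
  def startWithRetries[T](startPort: Int, maxRetries: Int)(
      startService: Int => (T, Int)): (T, Int) = {
    @annotation.tailrec
    def attempt(offset: Int): (T, Int) = {
      val port = tryPortFor(startPort, offset)
      val result =
        try Some(startService(port))
        catch { case _: BindException if offset < maxRetries => None }
      result match {
        case Some(serviceAndPort) => serviceAndPort
        case None => attempt(offset + 1)
      }
    }
    attempt(0)
  }
}
```

For instance, starting from port 65535 with offset 1 wraps around to 1024, and a service that can only bind from 8082 upward is reached on the third attempt when starting from 8080.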
6.createDirectory
Description: creates a temporary directory named "spark-" followed by a UUID. If creation fails it retries, up to 10 attempts (MAX_DIR_CREATION_ATTEMPTS).

    def createDirectory(root: String, namePrefix: String = "spark"): File = {
      var attempts = 0
      val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
      var dir: File = null
      while (dir == null) {
        attempts += 1
        if (attempts > maxAttempts) {
          throw new IOException("Failed to create a temp directory (under " + root + ") after " +
            maxAttempts + " attempts!")
        }
        try {
          dir = new File(root, "spark-" + UUID.randomUUID.toString)
          if (dir.exists() || !dir.mkdirs()) {
            dir = null
          }
        } catch { case e: SecurityException => dir = null; }
      }
      dir
    }
7.getOrCreateLocalRootDirs
Description: determines the root directories for local files from the spark.local.dir setting (the SPARK_LOCAL_DIRS environment variable takes precedence, and inside a YARN container the YARN local directories are used instead). Before any first-level or second-level directory is created, the root directory must exist; createDirectory is then called to create the first-level directory under each root.

    private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
      if (isRunningInYarnContainer(conf)) {
        getYarnLocalDirs(conf).split(",")
      } else {
        Option(conf.getenv("SPARK_LOCAL_DIRS"))
          .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
          .split(",")
          .flatMap { root =>
            try {
              val rootDir = new File(root)
              if (rootDir.exists || rootDir.mkdirs()) {
                val dir = createDirectory(root)
                chmod700(dir)
                Some(dir.getAbsolutePath)
              } else {
                logError(s"Failed to create dir in $root. Ignoring this directory.")
                None
              }
            } catch {
              case e: IOException =>
                logError(s"Failed to create local root dir in $root. Ignoring this directory.")
                None
            }
          }
          .toArray
      }
    }
8.getLocalDir
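Description: returns the first-level directory for Spark's local files, that is, the first entry returned by getOrCreateLocalRootDirs.

    def getLocalDir(conf: SparkConf): String = {
      getOrCreateLocalRootDirs(conf)(0)
    }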
9.createTempDir
Description: creates a temporary directory under the given root (java.io.tmpdir by default, per the signature) and registers it in shutdownDeletePaths: scala.collection.mutable.HashSet[String].

    def createTempDir(
        root: String = System.getProperty("java.io.tmpdir"),
        namePrefix: String = "spark"): File = {
      val dir = createDirectory(root, namePrefix)
      registerShutdownDeleteDir(dir)
      dir
    }
10.registerShutdownDeleteDir
Description: registers a directory in shutdownDeletePaths: scala.collection.mutable.HashSet[String] so that it is deleted when the process exits.

    def registerShutdownDeleteDir(file: File) {
      val absolutePath = file.getAbsolutePath()
      shutdownDeletePaths.synchronized {
        shutdownDeletePaths += absolutePath
      }
    }
11.hasRootAsShutdownDeleteDir
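Description: determines whether a file lies under one of the paths scheduled for deletion on shutdown. shutdownDeletePaths: scala.collection.mutable.HashSet[String] stores the files and directories to delete when the process shuts down. A path that is itself registered does not count as a match; only paths that start with a registered path do.

    def hasRootAsShutdownDeleteDir(file: File): Boolean = {
      val absolutePath = file.getAbsolutePath()
      val retval = shutdownDeletePaths.synchronized {
        shutdownDeletePaths.exists { path =>
          !absolutePath.equals(path) && absolutePath.startsWith(path)
        }
      }
      if (retval) {
        logInfo("path = " + file + ", already present as root for deletion.")
      }
      retval
    }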
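The registration set and the prefix check can be exercised together on plain strings. The sketch below is an illustration only: `ShutdownPathsSketch` and `register` are hypothetical names, paths are passed as strings rather than `File` objects, and no shutdown hook is installed.

```scala
import scala.collection.mutable

object ShutdownPathsSketch {
  // Paths scheduled for deletion at shutdown; guarded by its own monitor.
  private val shutdownDeletePaths = mutable.HashSet[String]()

  def register(path: String): Unit = shutdownDeletePaths.synchronized {
    shutdownDeletePaths += path
  }

  // True when `path` starts with a registered path but is not that path itself.
  def hasRootAsShutdownDeleteDir(path: String): Boolean =
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths.exists(root => path != root && path.startsWith(root))
    }
}
```

Note that the check is a plain string-prefix test. After registering /tmp/spark-1, the path /tmp/spark-1/blockmgr matches, but so does the sibling /tmp/spark-10, because no path separator is required after the prefix.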
12.deleteRecursively
Description: deletes a file, or a directory together with its subdirectories and files, and removes the directory from shutdownDeletePaths: scala.collection.mutable.HashSet[String].

    def deleteRecursively(file: File) {
      if (file != null) {
        try {
          if (file.isDirectory && !isSymlink(file)) {
            var savedIOException: IOException = null
            for (child <- listFilesSafely(file)) {
              try {
                deleteRecursively(child)
              } catch {
                case ioe: IOException => savedIOException = ioe
              }
            }
            if (savedIOException != null) {
              throw savedIOException
            }
            shutdownDeletePaths.synchronized {
              shutdownDeletePaths.remove(file.getAbsolutePath)
            }
          }
        } finally {
          if (!file.delete()) {
            if (file.exists()) {
              throw new IOException("Failed to delete: " + file.getAbsolutePath)
            }
          }
        }
      }
    }
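A stripped-down version of the recursion can be run against a throwaway directory. This is a sketch, not Spark's method: `DeleteSketch` is a hypothetical name, `Files.isSymbolicLink` and a null-safe `listFiles` are used in place of the `isSymlink` and `listFilesSafely` helpers (whose bodies are not shown in this post), and the shutdownDeletePaths bookkeeping and exception saving are omitted.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object DeleteSketch {
  def deleteRecursively(file: File): Unit = {
    if (file != null) {
      // Descend only into real directories, never through symlinks.
      if (file.isDirectory && !Files.isSymbolicLink(file.toPath)) {
        // listFiles() returns null on I/O error, so wrap it in Option.
        Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
      }
      // A failed delete is only an error if the file is still there.
      if (!file.delete() && file.exists()) {
        throw new IOException("Failed to delete: " + file.getAbsolutePath)
      }
    }
  }
}
```

Creating a temporary directory with a nested file and passing the top-level directory to `deleteRecursively` should leave nothing behind.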
13.getSparkClassLoader
Description: returns the ClassLoader that loaded the current class.

    def getSparkClassLoader = getClass.getClassLoader
14.getContextOrSparkClassLoader
Description: returns the context ClassLoader of the current thread, or, if none is set, the ClassLoader that loaded Spark.

    def getContextOrSparkClassLoader =
      Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
15.newDaemonCachedThreadPool
Description: creates a cached thread pool with Executors.newCachedThreadPool, using a thread factory built from the given name prefix.

    def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
      val threadFactory = namedThreadFactory(prefix)
      Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
    }
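The body of namedThreadFactory is not shown in this post, so the following sketch hand-rolls a factory with only java.util.concurrent to illustrate the idea. Everything in it is an assumption for illustration: the object name `DaemonPoolSketch`, the factory `namedDaemonThreadFactory`, and the `prefix-N` naming scheme are not taken from Spark.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPoolSketch {
  // Hypothetical factory: names threads "<prefix>-<n>" and marks them as daemons,
  // so that idle pool threads do not keep the JVM alive.
  def namedDaemonThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor =
    Executors.newCachedThreadPool(namedDaemonThreadFactory(prefix))
      .asInstanceOf[ThreadPoolExecutor]
}
```

A task submitted to such a pool runs on a daemon thread whose name carries the prefix, which makes the pool's threads easy to spot in a thread dump.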
16.doFetchFile
Description: downloads a file. For http, https and ftp URLs it uses URLConnection; for the file scheme it copies the local file; for any other scheme it reads the file through the Hadoop FileSystem.

    private def doFetchFile(
        url: String,
        targetDir: File,
        filename: String,
        conf: SparkConf,
        securityMgr: SecurityManager,
        hadoopConf: Configuration) {
      val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
      val targetFile = new File(targetDir, filename)
      val uri = new URI(url)
      val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
      Option(uri.getScheme).getOrElse("file") match {
        case "http" | "https" | "ftp" =>
          logInfo("Fetching " + url + " to " + tempFile)
          var uc: URLConnection = null
          if (securityMgr.isAuthenticationEnabled()) {
            logDebug("fetchFile with security enabled")
            val newuri = constructURIForAuthentication(uri, securityMgr)
            uc = newuri.toURL().openConnection()
            uc.setAllowUserInteraction(false)
          } else {
            logDebug("fetchFile not using security")
            uc = new URL(url).openConnection()
          }
          val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
          uc.setConnectTimeout(timeout)
          uc.setReadTimeout(timeout)
          uc.connect()
          val in = uc.getInputStream()
          downloadFile(url, in, tempFile, targetFile, fileOverwrite)
        case "file" =>
          val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
          copyFile(url, sourceFile, targetFile, fileOverwrite)
        case _ =>
          val fs = getHadoopFileSystem(uri, hadoopConf)
          val in = fs.open(new Path(uri))
          downloadFile(url, in, tempFile, targetFile, fileOverwrite)
      }
    }
17.fetchFile
Description: if the file is already cached locally, it is taken from the local cache; otherwise it is downloaded remotely with doFetchFile. Afterwards, archives ending in .tar.gz, .tgz or .tar are unpacked, and the file is given a+x permission through FileUtil.chmod.

    def fetchFile(
        url: String,
        targetDir: File,
        conf: SparkConf,
        securityMgr: SecurityManager,
        hadoopConf: Configuration,
        timestamp: Long,
        useCache: Boolean) {
      val fileName = url.split("/").last
      val targetFile = new File(targetDir, fileName)
      val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
      if (useCache && fetchCacheEnabled) {
        val cachedFileName = s"${url.hashCode}${timestamp}_cache"
        val lockFileName = s"${url.hashCode}${timestamp}_lock"
        val localDir = new File(getLocalDir(conf))
        val lockFile = new File(localDir, lockFileName)
        val raf = new RandomAccessFile(lockFile, "rw")
        val lock = raf.getChannel().lock()
        val cachedFile = new File(localDir, cachedFileName)
        try {
          if (!cachedFile.exists()) {
            doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
          }
        } finally {
          lock.release()
        }
        copyFile(
          url,
          cachedFile,
          targetFile,
          conf.getBoolean("spark.files.overwrite", false)
        )
      } else {
        doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
      }
      if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
        logInfo("Untarring " + fileName)
        Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
      } else if (fileName.endsWith(".tar")) {
        logInfo("Untarring " + fileName)
        Utils.execute(Seq("tar", "-xf", fileName), targetDir)
      }
      FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
    }
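The cache branch relies on a file lock so that several processes on the same machine do not download the same file at once. That pattern can be isolated as a small helper. This is a sketch under assumptions: `FileLockSketch` and `withFileLock` are hypothetical names, and unlike the listing above, the sketch also closes the RandomAccessFile when it is done.

```scala
import java.io.{File, RandomAccessFile}

object FileLockSketch {
  // Runs `body` while holding an exclusive lock on `lockFile`.
  // FileChannel.lock() blocks until the lock is acquired.
  def withFileLock[T](lockFile: File)(body: => T): T = {
    val raf = new RandomAccessFile(lockFile, "rw")
    try {
      val lock = raf.getChannel.lock()
      try body finally lock.release()
    } finally {
      raf.close()
    }
  }
}
```

In the setting of fetchFile, the body would check whether the cached file exists and fetch it if not. File locks are held on behalf of the whole JVM, so this guards against other processes rather than against other threads in the same process.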
18.executeAndGetOutput
Description: runs a command and returns its output. The method calls join on stdoutThread so that the current thread waits until stdoutThread has finished reading the output. A non-zero exit code results in a SparkException.

    def executeAndGetOutput(
        command: Seq[String],
        workingDir: File = new File("."),
        extraEnvironment: Map[String, String] = Map.empty): String = {
      val builder = new ProcessBuilder(command: _*)
        .directory(workingDir)
      val environment = builder.environment()
      for ((key, value) <- extraEnvironment) {
        environment.put(key, value)
      }
      val process = builder.start()
      new Thread("read stderr for " + command(0)) {
        override def run() {
          for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
            System.err.println(line)
          }
        }
      }.start()
      val output = new StringBuffer
      val stdoutThread = new Thread("read stdout for " + command(0)) {
        override def run() {
          for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
            output.append(line)
          }
        }
      }
      stdoutThread.start()
      val exitCode = process.waitFor()
      stdoutThread.join() // Wait for it to finish reading output
      if (exitCode != 0) {
        logError(s"Process $command exited with code $exitCode: $output")
        throw new SparkException(s"Process $command exited with code $exitCode")
      }
      output.toString
    }
19.memoryStringToMb
Description: converts a memory size string into an integer number of megabytes. The suffixes k, m, g and t are recognized (case-insensitively); a string without a suffix is treated as a number of bytes.

    def memoryStringToMb(str: String): Int = {
      val lower = str.toLowerCase
      if (lower.endsWith("k")) {
        (lower.substring(0, lower.length - 1).toLong / 1024).toInt
      } else if (lower.endsWith("m")) {
        lower.substring(0, lower.length - 1).toInt
      } else if (lower.endsWith("g")) {
        lower.substring(0, lower.length - 1).toInt * 1024
      } else if (lower.endsWith("t")) {
        lower.substring(0, lower.length - 1).toInt * 1024 * 1024
      } else { // no suffix, so it's just a number in bytes
        (lower.toLong / 1024 / 1024).toInt
      }
    }
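To check the unit arithmetic by hand, the same conversion can be written as a pattern match. This is an alternative sketch rather than the Spark method: the object name `MemorySketch` is hypothetical, the variant assumes a non-negative integer followed by an optional one-letter suffix, and it computes in Long before narrowing to Int.

```scala
object MemorySketch {
  def memoryStringToMb(str: String): Int = {
    // Split "512m" into the leading digits and whatever follows them.
    val (digits, suffix) = str.toLowerCase.span(_.isDigit)
    val n = digits.toLong // throws NumberFormatException if there are no digits
    val mb = suffix match {
      case "k" => n / 1024
      case "m" => n
      case "g" => n * 1024
      case "t" => n * 1024 * 1024
      case ""  => n / 1024 / 1024 // no suffix: the number is in bytes
      case other => throw new NumberFormatException(s"Unknown size suffix: $other")
    }
    mb.toInt
  }
}
```

Under these rules "512m" gives 512, "2g" gives 2048, "2048k" gives 2, and a bare "1048576" (bytes) gives 1. Both versions use integer division, so sizes below one megabyte round down to 0.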