Spark Checkpoint寫操作程式碼分析
scala> val data = sc.parallelize(List("www", "iteblog", "com")) data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:15 scala> sc.setCheckpointDir("/www/iteblog/com") scala> data.checkpoint scala> data.count
先是初始化好相關的RDD,因為checkpoint是將RDD中的資料寫到磁碟,所以需要指定一個checkpint目錄,也就是sc.setCheckpointDir("/www/iteblog/com"),這步執行完之後會在/www/iteblog/com路徑下建立相關的資料夾,比如:/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc;然後對data RDD進行checkpoint,整個程式碼執行完,會在/www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc生存相關的檔案:
現在來對checkpint的相關程式碼進行簡單介紹。首先就是設定checkpint的目錄,這個程式碼如下:Found 4 items -rw-r--r-- 3 iteblog iteblog 0 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00000 -rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00001 -rw-r--r-- 3 iteblog iteblog 9 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00002 -rw-r--r-- 3 iteblog iteblog 5 2015-11-25 15:05 /www/iteblog/com/ada54d92-eeb2-4cff-89fb-89a297edd4dc/rdd-2/part-00003
def setCheckpointDir(directory: String) { // If we are running on a cluster, log a warning if the directory is local. // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from // its own local file system, which is incorrect because the checkpoint files // are actually on the executor machines. if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { logWarning("Checkpoint directory must be non-local " + "if Spark is running on a cluster: " + directory) } checkpointDir = Option(directory).map { dir => val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } }
從上面註釋可以看出,如果是非local模式,directory要求是HDFS上的目錄。事實上,如果你是非local模式,但是指定的checkpint路徑是本地路徑,程式執行的時候會出現異常:
setCheckpointDir的過程主要是在指定的目錄下建立一個資料夾,這個資料夾會在後面用到。然後我們對RDD進行checkpoint,主要做的事情如下:
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
程式第一步就是判斷checkpointDir是否為空,如果為空直接丟擲異常,而這個checkpointDir是由上面的setCheckpointDir函式設定的。這裡我們應該設定了checkpointDir,所以直接判斷checkpointData.isEmpty是否成立,checkpointData是什麼東西呢?它的型別如下:private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
RDDCheckpointData類是和RDD一一對應的,儲存著一切和RDD checkpoint相關的所有資訊,而且具體的Checkpoint操作都是它(子類)進行的。而對RDD呼叫checkpoint函式主要就是初始化ReliableRDDCheckpointData物件,供以後進行checkpoint操作。從這段程式碼我們知道,對RDD呼叫checkpoint函式,其實就是初始化了checkpointData,並不立即執行checkpoint操作,你可以理解成這裡只是對RDD進行checkpoint標記操作。那什麼觸發真正的checkpoint操作?仔細看上面例子,執行data.count之後才會生成checkpoint檔案。是的,只有在Action觸發Job的時候才會進行checkpoint。Spark在執行完Job之後會判斷是否需要checkpoint:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
注意看最後一句程式碼rdd.doCheckpoint(),這個就是觸發RDD的checkpoint的,而doCheckpoint函式的實現如下:
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
又看到checkpointData了吧?這個就是在執行checkpint()函式定義的,所以如果你的RDD呼叫了checkpint()函式,那麼checkpointData.isDefined肯定是true的。而如果你的父RDD呼叫了checkpint()函式,最後也會執行你父RDD的checkpointData.get.checkpoint()程式碼。我們來看看checkpointData中的checkpoint()是如何實現的,程式碼如下:final def checkpoint(): Unit = {
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the state of this RDDCheckpointData
RDDCheckpointData.synchronized {
if (cpState == Initialized) {
cpState = CheckpointingInProgress
} else {
return
}
}
val newRDD = doCheckpoint()
// Update our state and truncate the RDD lineage
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpState = Checkpointed
rdd.markCheckpointed()
}
}
為了防止多個執行緒對同一個RDD進行checkpint操作,首先是把checkpint的狀態由Initialized變成CheckpointingInProgress,所以如果另一個執行緒發現checkpint的狀態不是Initialized就直接return了。最後就是doCheckpoint實現了:
protected override def doCheckpoint(): CheckpointRDD[T] = {
// Create the output path for the checkpoint
val path = new Path(cpDir)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException(s"Failed to create checkpoint path $cpDir")
}
// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
new SerializableConfiguration(rdd.context.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)
val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
if (newRDD.partitions.length != rdd.partitions.length) {
throw new SparkException(
s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
}
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
newRDD
}
首先是建立寫RDD的目錄,然後啟動一個Job去寫Checkpoint檔案,主要由ReliableCheckpointRDD.writeCheckpointFile來實現寫操作。
def writeCheckpointFile[T: ClassTag](
path: String,
broadcastedConf: Broadcast[SerializableConfiguration],
blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
val finalOutputPath = new Path(outputDir, finalOutputName)
val tempOutputPath =
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
if (fs.exists(tempOutputPath)) {
throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
}
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
Utils.tryWithSafeFinally {
serializeStream.writeAll(iterator)
} {
serializeStream.close()
}
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo(s"Deleting tempOutputPath $tempOutputPath")
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: " +
s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
fs.delete(tempOutputPath, false)
}
}
}
寫完Checkpoint檔案之後,會返回newRDD,並最後賦值給cpRDD,並將Checkpoint的狀態變成Checkpointed。最後將這個RDD的依賴全部清除(markCheckpointed())
private[spark] def markCheckpointed(): Unit = {
clearDependencies()
partitions_ = null
deps = null // Forget the constructor argument for dependencies too
}
整個寫操作就完成了。
相關推薦
Spark Checkpoint寫操作程式碼分析
上次我對Spark RDD快取的相關程式碼《Spark RDD快取程式碼分析》進行了簡要的介紹,本文將對Spark RDD的checkpint相關的程式碼進行相關的介紹。先來看看怎麼使用checkpont:scala> val data = sc.paralleliz
徹底學會使用epoll(四)——ET的寫操作例項分析
首先,看程式四的例子。 l 程式四 點選(此處)摺疊或開啟 #include <unistd.h> #include <iostream> #include <sys/epoll.h> using namespace s
ReentrantReadWriteLock 可重入讀寫鎖程式碼分析與簡單例項
個人水平有限,如有錯誤,請各位看官指出。前面提到的ReentrantLock是排它鎖,這種鎖在同一時刻下只允許一個執行緒進行訪問,無論是公平模式還是非公平模式,效能都不是很高。ReentrantReadWriteLock是讀寫鎖,同一時刻允許多個執行緒讀,但是在寫執行緒訪問時
spark shuffle寫操作三部曲之UnsafeShuffleWriter
前言 在前兩篇文章 spark shuffle的寫操作之準備工作 中引出了spark shuffle的三種實現,spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於
spark shuffle寫操作之SortShuffleWriter
提出問題 1. spark shuffle的預聚合操作是如何做的,其中底層的資料結構是什麼?在資料寫入到記憶體中有預聚合,在讀溢位檔案合併到最終的檔案時是否也有預聚合操作? 2. shuffle資料的排序是如何做的? 分割槽內的資料是否是有序的?若有序,spark 內部是按照什麼排序演算法來排序每一個分割
2018/10/03-字串指令(重複指令、操作資料緩衝區指令)、rep與movx指令-《惡意程式碼分析實戰》
重複指令是一組操作資料緩衝區的指令。資料緩衝區通常是一個位元組陣列的形式,也可以是單字或者雙字。(Intel'稱這些指令為字串指令) 最常見的資料緩衝區操作指令是movsx、cmps、stosx和scasx,其中x可以是b、w後者d,分別表示位元組、字和雙字。這些指令對任何形式的資料都有效。
改進的中科院分詞系統NLPIR程式碼(加入使用者詞典,去停用詞,檔案讀寫)+情感分析字典包+工具包+論文包
NLPIR分詞,加入使用者詞典,去停用詞,檔案讀寫等 原始碼下載地址 優化的分詞系統程式碼 原始碼下載地址 NLPIR分詞系統 優化的分詞系統程式碼 以下是核心程式碼 完整程式碼可以直接執行分詞,點我跳轉 public cl
Apache Spark 讀寫Apache ignite 程式碼實戰
package com.zhw.bigdata.ignite; import org.apache.ignite.spark.IgniteDataFrameSettings; import org.apache.spark.sql.Dataset; import org.apache.spar
原始碼分析關於排序操作的程式碼分析
2tcfh1燙痺己耐蒙頓《http://baobao.baidu.com/question/9e1d9b752f3b53a42e60006a4143ebe5?gJ》 zz1kkl壕瀉嗆床獻炮《http://baobao.baidu.com/question/c50e9ff68
在Linux命令列終端中寫python程式碼的簡單操作
Linux終端中的操作均是使用命令列來進行的。因此,對於小白來說,熟記幾個基本的命令列和使用方法能夠較快的在Linux命令列環境中將python用起來。 開啟命令列視窗 開啟命令列視窗的快捷鍵如下: Ctrl + Alt + t 關閉名命令列視窗 關閉命令列視窗的快捷鍵如下:
Nginx0.7.61程式碼分析(一)--寫在前面的話以及程序模型分析
寫在前面的話大概一年多以前,我看了一些ligty的程式碼,並且在這裡給出了一些自己的分析,這部分應該到了狀態機部分,後來由於我沒有繼續跟進ligty的程式碼,或者說,不再像最初那樣對它感興趣,所以也就沒有再跟進了.最近,我開始看一些nginx的程式碼,和當初閱讀ligty一樣,我不知道我會看到哪兒,分
java操作spark讀寫mongodb
首先要引入mongodb-spark-connector的maven依賴,具體的可見這個api網址:https://docs.mongodb.com/spark-connector/current/java-api/,然後基本上就可以按照api上面的內容來進行spark操作
Spark2.0機器學習系列之3:決策樹及Spark 2.0-MLlib、Scikit程式碼分析
概述 分類決策樹模型是一種描述對例項進行分類的樹形結構。 決策樹可以看為一個if-then規則集合,具有“互斥完備”性質 。決策樹基本上都是 採用的是貪心(即非回溯)的演算法,自頂向下遞迴分治構造。 生成決策樹一般包含三個步驟: 特徵選擇 決策樹生成 剪枝
C# XML操作 程式碼大全(讀XML,寫XML,更新,刪除節點,與dataset結合等)
using System; using System.Data; using System.Xml; using System.Windows.Forms; //*************************************** // 作者: ∮明天去要飯 // QICQ: 305
微信短連結秒進支付寶拆紅包的逆向分析與程式碼獲取(不用寫任何程式碼)
最近支付寶紅包風靡全國,真的是誰的群多並且發的比其他人早就能很賺一筆,目前想要拿到紅包有以下兩種途徑 複製別人的邀請碼,開啟支付寶 用支付寶掃描別人的紅包二維碼 有人感覺很麻煩於是就做了在微信裡點選短連結即可自動跳轉到支付寶領紅包的方式,理想情況下真的是
呼叫JAVA API對HDFS檔案進行檔案的讀寫、上傳下載、刪除等操作程式碼詳解
Hadoop檔案系統 基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方
【spark 讀寫資料】資料來源的讀寫操作
通用的 Load/Save 函式 在最簡單的方式下,預設的資料來源(parquet 除非另外配置通過spark.sql.sources.default)將會用於所有的操作。 Parquet 是一個列式儲存格式的檔案,被許多其他資料處理系統所支援。Spark
Hbase-0.98.6原始碼分析--Put寫操作Client端流程
客戶端程式寫資料通過HTable和Put進行操作,我們從客戶端程式碼開始分析寫資料的流程: 可以看到,客戶端寫資料最終的呼叫了HTableInterface的put()方法,因為HTableInterface只是一個介面,所以最終呼叫的是它的
spark原始碼閱讀一-spark-mongodb程式碼分析
原始碼的github地址https://github.com/mongodb/mongo-spark,是mongodb釋出的spark connection介面庫,可以方便的使用spark讀寫mongodb資料 1.rdd寫入mongodb 兩種方式將生成的rdd寫入mon
spark常見操作系列(3)--spark讀寫hbase(2)
接著上一篇, 問題(2): scan有 scan.setCaching(10000) scan.setCacheBlocks(true) 等設定.setCaching ,個人感覺不夠用.hbase 預設是在記憶體裡面放一塊資料用來讀取,所以讀取效率比較高,可是,