Spark-RDD-DAG Analysis
阿新 • Published: 2020-08-08
1. How it works
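Directed acyclic graph: if a directed graph has no vertex from which you can follow one or more edges and arrive back at that same vertex, the graph is a directed acyclic graph (DAG).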
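The definition can be checked mechanically. The sketch below is only an illustration and is not taken from Spark: the object name `DagCheck`, the `Graph` alias and the adjacency-list representation are assumptions made for this example. It uses Kahn's algorithm, which repeatedly removes vertices that have no incoming edges; if every vertex can be removed this way, no vertex can reach itself.

```scala
object DagCheck {
  // Adjacency list: each vertex maps to the vertices its outgoing edges point to.
  type Graph = Map[String, List[String]]

  /** Returns true when no vertex can reach itself by following one or more edges. */
  def isAcyclic(graph: Graph): Boolean = {
    val vertices = graph.keySet ++ graph.values.flatten

    // Count the incoming edges of every vertex.
    var inDegree = vertices.map(v => v -> 0).toMap
    for (targets <- graph.values; t <- targets)
      inDegree = inDegree.updated(t, inDegree(t) + 1)

    // Vertices with no incoming edges can be removed right away.
    var ready = inDegree.collect { case (v, 0) => v }.toList
    var removed = 0
    while (ready.nonEmpty) {
      val v = ready.head
      ready = ready.tail
      removed += 1
      // Removing v also removes its outgoing edges.
      for (t <- graph.getOrElse(v, Nil)) {
        val d = inDegree(t) - 1
        inDegree = inDegree.updated(t, d)
        if (d == 0) ready = t :: ready
      }
    }
    // Any vertex left over sits on a cycle or behind one.
    removed == vertices.size
  }
}
```

For example, `DagCheck.isAcyclic(Map("A" -> List("B"), "B" -> List("C")))` is true, while adding an edge from C back to A makes it false.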
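Spark queues the work of a job and organizes it as a DAG: each vertex is a task and each edge is a dependency between tasks.
The DAG lets Spark optimize the computation, for example by merging operations that run on a single node into one step, and by splitting the steps that involve a shuffle into separate stages.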
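To get a feel for what merging operations on a single node means, here is a small sketch in plain Scala with no Spark involved. The object `PipelineSketch` and its `trace` buffer are hypothetical names used only for this illustration. It chains `flatMap` and `map` on an ordinary `Iterator`, which is lazy, so each element flows through both functions before the next line is read. Narrow operations inside one stage are chained in a similar spirit.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  /** Runs flatMap then map over one "partition" and records the order of evaluation. */
  def run(partition: Iterator[String]): (List[(String, Int)], List[String]) = {
    val trace = ListBuffer.empty[String]

    // Both steps are lazy: defining them does not process any data yet.
    val words = partition.flatMap { line =>
      trace += s"split($line)"
      line.split(" ").iterator
    }
    val pairs = words.map { w =>
      trace += s"pair($w)"
      (w, 1)
    }

    // Consuming the iterator drives both steps in a single pass.
    val result = pairs.toList
    (result, trace.toList)
  }
}
```

Calling `PipelineSketch.run(Iterator("a b", "c"))` shows in the trace that the words of the first line are paired before the second line is split, so no intermediate collection of all words is built.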
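The heart of DAG construction is dividing the job into Stages. The division is driven by RDD dependencies: every wide (shuffle) dependency cuts a new Stage.
The code that builds the DAG lives in the DAGScheduler class (DAGScheduler.scala):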
/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
 * tasks that can run right away based on the data that's already on the cluster (e.g. map output
 * files from previous stages), though it may fail if this data becomes unavailable.
 *
 * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
 * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
 * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
 * set of map output files, and another to read those files after a barrier). In the end, every
 * stage will have only shuffle dependencies on other stages, and may compute multiple operations
 * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
 * various RDDs (MappedRDD, FilteredRDD, etc).
 *
 * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 *
 * When looking through this code, there are several key concepts:
 *
 *  - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
 *    For example, when the user calls an action, like count(), a job will be submitted through
 *    submitJob. Each Job may require the execution of multiple stages to build intermediate data.
 *
 *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
 *    task computes the same function on partitions of the same RDD. Stages are separated at shuffle
 *    boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
 *    fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
 *    executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
 *    Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
 *
 *  - Tasks are individual units of work, each sent to one machine.
 *
 *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
 *    and likewise remembers which shuffle map stages have already produced output files to avoid
 *    redoing the map side of a shuffle.
 *
 *  - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
 *    on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
 *
 *  - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
 *    to prevent memory leaks in a long-running application.
 *
 * To recover from failures, the same stage might need to run multiple times, which are called
 * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
 * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
 * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
 * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
 * stage(s) that compute the missing tasks. As part of this process, we might also have to create
 * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
 * tasks from the old attempt of a stage could still be running, care must be taken to map any
 * events received in the correct Stage object.
 *
 * Here's a checklist to use when making or reviewing changes to this class:
 *
 *  - All data structures should be cleared when the jobs involving them end to avoid indefinite
 *    accumulation of state in long-running programs.
 *
 *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
 *    include the new structure. This will help to catch memory leaks.
 */
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  /**
   * Get or create the list of parent stages for a given RDD. The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

  /**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors. For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }
  // (the rest of the class is not shown in this excerpt)
}
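The traversal in getShuffleDependencies is easier to experiment with outside of Spark. The following is a minimal standalone sketch of the same idea. The types `Node`, `Narrow` and `Shuffle` are hypothetical stand-ins invented for this example, not Spark classes, and an immutable list plays the role of the stack. The walk follows narrow dependencies and stops at the first shuffle dependency on each path, which is why only the immediate shuffle parents come back.

```scala
import scala.collection.mutable

object ShuffleDepsSketch {
  // Hypothetical stand-ins for Spark's RDD and Dependency types, for illustration only.
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep] = Nil)

  /** Collects the shuffle dependencies reachable from `node` through narrow dependencies only. */
  def immediateShuffleDeps(node: Node): Set[Shuffle] = {
    val parents = mutable.Set.empty[Shuffle]
    val visited = mutable.Set.empty[Node]
    var waiting = List(node) // used as a stack

    while (waiting.nonEmpty) {
      val current = waiting.head
      waiting = waiting.tail
      if (visited.add(current)) { // add returns false when the node was already seen
        current.deps.foreach {
          case s: Shuffle => parents += s           // record it and do not walk past the shuffle
          case Narrow(p)  => waiting = p :: waiting // same stage, keep walking
        }
      }
    }
    parents.toSet
  }
}
```

With the chain A <-- B <-- C from the comment above, where both arrows are shuffle dependencies, calling `immediateShuffleDeps` on C returns only the dependency on B.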
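2. Worked example
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Demo1").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val lines = sc.textFile("/Users/jordan95225/IdeaProjects/MyLearning/Spark/src/main/resources/data.txt")
// Operation 1: split each line into words
val words: RDD[String] = lines.flatMap(line => line.split(" "))
// Operation 2: pair each word with a count of 1
val pairs: RDD[(String, Int)] = words.map(word => (word, 1))
// Operation 3: sum the counts per word (this is the step that needs a shuffle)
val wordCounts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println)
sc.stop()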
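Before the program runs, the DAG scheduler first treats the whole flow as a single Stage containing 3 operations and 5 RDDs (file read, flatMap, map, and the local and shuffle phases of reduceByKey). It then walks the flow backwards from the final RDD. Between the ShuffledRDD and the MapPartitionsRDD it finds a shuffle, so it cuts there and gets two Stages. Continuing backwards it finds no further shuffle, so everything earlier stays in the same Stage. When the walk finishes, the DAG is complete and contains two Stages.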
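The backward walk can be mimicked with a few lines of plain Scala. This is a deliberately simplified sketch, not how Spark represents a job: the `Step` class and the `splitStages` function are hypothetical, and the lineage is modelled as a straight list of operation names rather than a graph of RDDs. Each step records whether it reaches its parent through a shuffle, and the fold starts from the last step, just like the walk described above.

```scala
object StageSplitSketch {
  // Hypothetical, simplified lineage entry: an operation name plus a flag saying
  // whether it depends on the previous step through a shuffle.
  final case class Step(name: String, shuffleFromParent: Boolean)

  /** Walks the lineage from the last step backwards and cuts a new stage at every shuffle. */
  def splitStages(lineage: List[Step]): List[List[String]] = {
    // foldRight visits the final step first, mirroring the backward walk.
    lineage.foldRight(List(List.empty[String])) { (step, stages) =>
      val current = step.name :: stages.head
      if (step.shuffleFromParent) Nil :: current :: stages.tail // cut: open a new parent stage
      else current :: stages.tail                               // narrow: stay in the same stage
    }.filter(_.nonEmpty)
  }
}
```

Feeding it the word-count flow, with only `reduceByKey` marked as a shuffle, gives two groups: `textFile`, `flatMap` and `map` in the first stage, and `reduceByKey` alone in the second.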