Spark 原始碼簡單跟蹤
本文介紹下Spark 到底是如何執行sc.TextFile(...).map(....).count() 這種程式碼的,從driver端到executor端。
另外還有pid,iter都是哪來的呢? 如果你照著原始碼點進去你會很困惑。為莫名其妙怎麼就有了這些iterator呢?
Transform 和Action的來源
一般剛接觸Spark 的同學,都會被告知這兩個概念。Transform就是RDD的轉換,從一個RDD轉化到另一個RDD(也有多個的情況)。 Action則是出發實際的執行動作。
標題中的map就是一個典型的tansform操作,看原始碼,無非就是從當前的RDD構建了一個新的MapPartitionsRDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
這個新的RDD 接受了this
作為引數,也就記住了他的父RDD。同時接受了一個匿名函式:
(context, pid, iter) => iter.map(cleanF))
至於這個context,pid,iter是怎麼來的,你當前是不知道的。你只是知道這個新的RDD,有這麼一個函式。至於什麼時候這個函式會被呼叫,我們下面會講解到。
而一個Action是什麼樣的呢?我們看看count:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
發現不一樣了,要真的開始run Job了。sparkContext 的runJob 有很多種形態,這裡你看到的是接受當前這個RDD 以及一個函式(Utils.getIteratorSize _)。
當然,這裡的Utils.getItteratorSize 是一個已經實現好的函式:
def getIteratorSize[T](iterator: Iterator[T]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count }
它符合 sc.runJob 需要接受的簽名形態:
func: Iterator[T] => U
Driver端的工作
這裡你會見到一些熟悉的身影,比如dagScheduler,TaskScheduler,SchedulerBackend等。我們慢慢分解。
我們深入runJob,你馬上就可以看到了dagScheduler了。
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
這裡的cleanedFunc 就是前面那個 func: Iterator[T] => U
函式。在我們的例子裡,就是一個計數的函式。
這樣我們就順利的離開SparkContext 進入DAGScheduler的王國了。
dagScheduler會進一步提交任務。
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
請記住上面第二個引數,func其實就是前面的 Utils.getItteratorSize 函式,不過簽名略有改變,添加了context,變成了這種形態:
(TaskContext, Iterator[_]) => _
接著會變成一個事件,發到事件佇列裡,其中 func2 還是上面的func,只是被改了名字而已。
eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
dag會通過handleJobSubmitted
函式處理這個事件。在這裡完成Stage的拆分。這個不是我們這次關注的主題,所以不詳細討論。最後,會把Stage進行提交:
submitMissingTasks(finalStage)
提交到哪去了呢?會根據Stage的型別,生成實際的任務,然後序列化。序列化後通過廣播機制傳送到所有節點上去。
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
case stage: ResultStage =>
closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
}
taskBinary = sc.broadcast(taskBinaryBytes)
然後生成tasks物件,ShuffleMapTask
或者ResultTask
,我們這裡的count是ResultTask,通過下面的方式提交:
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
現在我們進入 TaskSchedulerImpl 的地盤了。在submitTasks裡我們呼叫了backend.我們接著就進入到CoarseGrainedSchedulerBackend.DriverEndpoint
裡。這個DriverEndPoint做完應該怎麼把Task分配到哪些Executor的計算後,最後會去做真正的launchTask的工作:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
把序列化好的任務傳送到Executor 上。到這裡,Driver端的工作就完整了。
有一點你可能會比較好奇,為什麼要做兩次序列化,傳送兩次的? 也就是前面的taskBinary,還有serializedTask。 taskBinany 包括一些RDD,函式等資訊。而serializedTask 這是整個Task的任務資訊,比如對應的那個分割槽號等。後面我們還會看到taskBinary的身影。
Executor端
Executor 的入口是org.apache.spark.executor. Executor
類。你可以看到夢寐以求的launchTask 方法
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
核心你看到了,是TaskRunner
方法。進去看看,核心程式碼如下:
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
這個task(ResultTask).run裡是我們最後的核心,真正的邏輯呼叫發生在這裡:
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
前面通過taskBinary 還原出RDD,func。 而這裡的func就是我們那個經過改良
的Utils.getItteratorSize函式,前面在driver端就被改造成func(context, rdd.iterator(partition, context))
這種形態了。但是函式體還是下面的
def getIteratorSize[T](iterator: Iterator[T]): Long = {
var count = 0L
while (iterator.hasNext) {
count += 1L
iterator.next()
}
count
}
也就是是一個計數函式。引數iterator則是通過rdd.iterator(partition, context)拿到了。
總結
到此,我們完成了整個程式碼的流轉過程。之所以很多人看到這些地會比較疑惑,是因為看到的程式碼都是在driver端的。但是最後這些任務都要被序列化傳送到Executor端。所以一般我們看到的流程不是連續的。