1. 程式人生 > >Spark 計算過程分析

Spark 計算過程分析

Spark是一個分散式的記憶體計算框架,其特點是能處理大規模資料,計算速度快。Spark延續了Hadoop的MapReduce計算模型,相比之下Spark的計算過程保持在記憶體中,減少了硬碟讀寫,能夠將多個操作進行合併後計算,因此提升了計算速度。同時Spark也提供了更豐富的計算API。

MapReduce是Hadoop和Spark的計算模型,其特點是Map和Reduce過程高度可並行化;過程間耦合度低,單個過程的失敗後可以重新計算,而不會導致整體失敗;最重要的是資料處理中的計算邏輯可以很好的轉換為Map和Reduce操作。對於一個數據集來說,Map對每條資料做相同的轉換操作,Reduce可以按條件對資料分組,然後在分組上做操作。除了Map和Reduce操作之外,Spark還延伸出瞭如filter,flatMap,count,distinct等更豐富的操作。

RDD的是Spark中最主要的資料結構,可以直觀的認為RDD就是要處理的資料集。RDD是分散式的資料集,每個RDD都支援MapReduce類操作,經過MapReduce操作後會產生新的RDD,而不會修改原有RDD。RDD的資料集是分割槽的,因此可以把每個資料分割槽放到不同的分割槽上進行計算,而實際上大多數MapReduce操作都是在分割槽上進行計算的。Spark不會把每一個MapReduce操作都發起運算,而是儘量的把操作累計起來一起計算。Spark把操作劃分為轉換(transformation)和動作(action),對RDD進行的轉換操作會疊加起來,直到對RDD進行動作操作時才會發起計算。這種特性也使Spark可以減少中間結果的吞吐,可以快速的進行多次迭代計算。

系統結構

Spark自身只對計算負責,其計算資源的管理和排程由第三方框架來實現。常用的框架有YARN和Mesos。本文以YARN為例進行介紹。先看一下Spark on YARN的系統結構圖:

Spark on YARN系統結構圖

圖中共分為三大部分:Spark Driver, Worker, Cluster manager。其中Driver program負責將RDD轉換為任務,並進行任務排程。Worker負責任務的執行。YARN負責計算資源的維護和分配。

Driver可以執行在使用者程式中,或者執行在其中一個Worker上。Spark中的每一個應用(Application)對應著一個Driver。這個Driver可以接收RDD上的計算請求,每個動作(Action)型別的操作將被作為一個Job進行計算。Spark會根據RDD的依賴關係構建計算階段(Stage)的有向無環圖,每個階段有與分割槽數相同的任務(Task)。這些任務將在每個分割槽(Partition)上進行計算,任務劃分完成後Driver將任務提交到運行於Worker上的Executor中進行計算,並對任務的成功、失敗進行記錄和重啟等處理。

Worker一般對應一臺物理機,每個Worker上可以執行多個Executor,每個Executor都是獨立的JVM程序,Driver提交的任務就是以執行緒的形式執行在Executor中的。如果使用YARN作為資源排程框架的話,其中一個Worker上還會有Executor launcher作為YARN的ApplicationMaster,用於向YARN申請計算資源,並啟動、監測、重啟Executor。

計算過程

這裡我們從RDD到輸出結果的整個計算過程為主線,探究Spark的計算過程。這個計算過程可以分為:

  • RDD構建:構建RDD之間的依賴關係,將RDD轉換為階段的有向無環圖。
  • 任務排程:根據空閒計算資源情況進行任務提交,並對任務的執行狀態進行監測和處理。
  • 任務計算:搭建任務執行環境,執行任務並返回任務結果。
  • Shuffle過程:兩個階段之間有寬依賴時,需要進行Shuffle操作。
  • 計算結果收集:從每個任務收集並彙總結果。

在這裡我們用一個簡潔的CharCount程式為例,這個程式把含有a-z字元的列表轉化為RDD,對此RDD進行了Map和Reduce操作計算每個字母的頻數,最後將結果收集。其程式碼如下:

CharCount例子程式

RDD構建和轉換

RDD按照其作用可以分為兩種型別,一種是對資料來源的封裝,可以把資料來源轉換為RDD,這種型別的RDD包括NewHadoopRDD,ParallelCollectionRDD,JdbcRDD等。另一種是對RDD的轉換,從而實現一種計算方法,這種型別的RDD包括MappedRDD,ShuffledRDD,FilteredRDD等。資料來源型別的RDD不依賴於其他RDD,計算類的RDD擁有自己的RDD依賴。

RDD有三個要素:分割槽,依賴關係,計算邏輯。分割槽是保證RDD分散式的特性,分割槽可以對RDD的資料進行劃分,劃分後的分割槽可以分佈到不同的Executor中,大部分對RDD的計算都是在分割槽上進行的。依賴關係維護著RDD的計算過程,每個計算型別的RDD在計算時,會將所依賴的RDD作為資料來源進行計算。根據一個分割槽的輸出是否被多分割槽使用,Spark還將依賴分為窄依賴和寬依賴。RDD的計算邏輯是其功能的體現,其計算過程是以所依賴的RDD為資料來源進行的。

例子中共產生了三個RDD,除了第一個RDD之外,每個RDD與上級RDD有依賴關係。

  • spark.parallelize(data, partitionSize)方法將產生一個數據源型的ParallelCollectionRDD,這個RDD的分割槽是對列表資料的切分,沒有上級依賴,計算邏輯是直接返回分割槽資料。
  • map函式將會建立一個MappedRDD,其分割槽與上級依賴相同,會有一個依賴於ParallelCollectionRDD的窄依賴,計算邏輯是對ParallelCollectionRDD的資料做map操作。
  • reduceByKey函式將會產生一個ShuffledRDD,分割槽數量與上面的MappedRDD相同,會有一個依賴於MappedRDD的寬依賴,計算邏輯是Shuffle後在分割槽上的聚合操作。

RDD的依賴關係

Spark在遇到動作類操作時,就會發起計算Job,把RDD轉換為任務,併發送任務到Executor上執行。從RDD到任務的轉換過程是在DAGScheduler中進行的。其總體思路是根據RDD的依賴關係,把窄依賴合併到一個階段中,遇到寬依賴則劃分出新的階段,最終形成一個階段的有向無環圖,並根據圖的依賴關係先後提交階段。每個階段按照分割槽數量劃分為多個任務,最終任務被序列化並提交到Executor上執行。

RDD到Task的構建過程

當RDD的動作類操作被呼叫時,RDD將呼叫SparkContext開始提交Job,SparkContext將呼叫DAGScheduler把RDD轉化為階段的有向無環圖,然後首先將有向無環圖中沒有未完成的依賴的階段進行提交。在階段被提交時,每個階段將產生與分割槽數量相同的任務,這些任務稱之為一個TaskSet。任務的型別分為 ShuffleMapTask和ResultTask,如果階段的輸出將用於下個階段的輸入,也就是需要進行Shuffle操作,則任務型別為ShuffleMapTask。如果階段的輸入即為Job結果,則任務型別為ResultTask。任務建立完成後會交給TaskSchedulerImpl進行TaskSet級別的排程執行。

任務排程

在任務排程的分工上,DAGScheduler負責總體的任務排程,SchedulerBackend負責與Executors通訊,維護計算資源資訊,並負責將任務序列化並提交到Executor。TaskSetManager負責對一個階段的任務進行管理,其中會根據任務的資料本地性選擇優先提交的任務。TaskSchedulerImpl負責對TaskSet進行排程,通過排程策略確定TaskSet優先順序。同時是一箇中介者,其將DAGScheduler,SchedulerBackend和TaskSetManager聯結起來,對Executor和Task的相關事件進行轉發。

在任務提交流程上,DAGScheduler提交TaskSet到TaskSchedulerImpl,使TaskSet在此註冊。TaskSchedulerImpl通知SchedulerBackend有新的任務進入,SchedulerBackend呼叫makeOffers根據註冊到自己的Executors資訊,確定是否有計算資源執行任務,如有資源則通知TaskSchedulerImpl去分配這些資源。 TaskSchedulerImpl根據TaskSet排程策略優先分配TaskSet接收此資源。TaskSetManager再根據任務的資料本地性,確定提交哪些任務。最終任務的閉包被SchedulerBackend序列化,並傳輸給Executor進行執行。

Spark的任務排程

根據以上過程,Spark中的任務排程實際上分了三個層次。第一層次是基於階段的有向無環圖進行Stage的排程,第二層次是根據排程策略(FIFO,FAIR)進行TaskSet排程,第三層次是根據資料本地性(Process,Node,Rack)在TaskSet內進行排程。

任務計算

任務的計算過程是在Executor上完成的,Executor監聽來自SchedulerBackend的指令,接收到任務時會啟動TaskRunner執行緒進行任務執行。在TaskRunner中首先將任務和相關資訊反序列化,然後根據相關資訊獲取任務所依賴的Jar包和所需檔案,完成準備工作後執行任務的run方法,實際上就是執行ShuffleMapTask或ResultTask的run方法。任務執行完畢後將結果傳送給Driver進行處理。

在Task.run方法中可以看到ShuffleMapTask和ResultTask有著不同的計算邏輯。ShuffleMapTask是將所依賴RDD的輸出寫入到ShuffleWriter中,為後面的Shuffle過程做準備。ResultTask是在所依賴RDD上應用一個函式,並返回函式的計算結果。在這兩個Task中只能看到資料的輸出方式,而看不到應有的計算邏輯。實際上計算過程是包含在RDD中的,呼叫RDD. Iterator方法獲取RDD的資料將觸發這個RDD的計算動作(RDD. Iterator),由於此RDD的計算過程中也會使用所依賴RDD的資料。從而RDD的計算過程將遞歸向上直到一個數據源型別的RDD,再遞歸向下計算每個RDD的值。需要注意的是,以上的計算過程都是在分割槽上進行的,而不是整個資料集,計算完成得到的是此分割槽上的結果,而不是最終結果。

從RDD的計算過程可以看出,RDD的計算過程是包含在RDD的依賴關係中的,只要RDD之間是連續窄依賴,那麼多個計算過程就可以在同一個Task中進行計算,中間結果可以立即被下個操作使用,而無需在程序間、節點間、磁碟上進行交換。

RDD計算過程

Shuffle過程

Shuffle是一個對資料進行分組聚合的操作過程,原資料將按照規則進行分組,然後使用一個聚合函式應用於分組上,從而產生新資料。Shuffle操作的目的是把同組資料分配到相同分割槽上,從而能夠在分割槽上進行聚合計算。為了提高Shuffle效能,還可以先在原分割槽對資料進行聚合(mapSideCombine),然後再分配部分聚合的資料到新分割槽,第三步在新分割槽上再次進行聚合。

在劃分階段時,只有遇到寬依賴才會產生新階段,才需要Shuffle操作。寬依賴與窄依賴取決於原分割槽被新分割槽的使用關係,只要一個原分割槽會被多個新分割槽使用,則為寬依賴,需要Shuffle。否則為窄依賴,不需要Shuffle。

以上也就是說只有階段與階段之間需要Shuffle,最後一個階段會輸出結果,因此不需要Shuffle。例子中的程式會產生兩個階段,第一個我們簡稱Map階段,第二個我們簡稱Reduce階段。Shuffle是通過Map階段的ShuffleMapTask與Reduce階段的ShuffledRDD配合完成的。其中ShuffleMapTask會把任務的計算結果寫入ShuffleWriter,ShuffledRDD從ShuffleReader中讀取資料,Shuffle過程會在寫入和讀取過程中完成。以HashShuffle為例,HashShuffleWriter在寫入資料時,會決定是否在原分割槽做聚合,然後根據資料的Hash值寫入相應新分割槽。HashShuffleReader再根據分割槽號取出相應資料,然後對資料進行聚合。

Spark的Shuffle過程

計算結果收集

ResultTask任務計算完成後可以得到每個分割槽的計算結果,此時需要在Driver上對結果進行彙總從而得到最終結果。

RDD在執行collect,count等動作時,會給出兩個函式,一個函式在分割槽上執行,一個函式在分割槽結果集上執行。例如collect動作在分割槽上(Executor中)執行將Iterator轉換為Array的函式,並將此函式結果返回到Driver。Driver 從多個分割槽上得到Array型別的分割槽結果集,然後在結果集上(Driver中)執行合併Array的操作,從而得到最終結果。

總結

Spark對於RDD的設計是其精髓所在。用RDD操作資料的感覺就一個字:爽!。想到RDD背後是幾噸重的大資料集,而我們隨手呼叫下map(), reduce()就可以把它轉換來轉換去,一種半兩撥千斤的感覺就會油然而生。我想是以下特性給我們帶來了這些:

  • RDD把不同來源,不同型別的資料進行了統一,使我們面對RDD的時候就會產生一種信心,就會認為這是某種型別的RDD,從而可以進行RDD的所有操作。
  • 對RDD的操作可以疊加到一起計算,我們不必擔心中間結果吞吐對效能的影響。
  • RDD提供了更豐富的資料集操作函式,這些函式大都是在MapReduce基礎上擴充的,使用起來很方便。
  • RDD為提供了一個簡潔的程式設計介面,背後複雜的分散式計算過程對開發者是透明的。從而能夠讓我們把關注點更多的放在業務上。