1. 程式人生 > >平行計算框架

平行計算框架

概念

框架與引擎

處理框架和處理引擎負責對資料系統中的資料進行計算。雖然“引擎”和“框架”之間的區別沒有什麼權威的定義,但大部分時候可以將前者定義為實際負責處理資料操作的元件,後者則可定義為承擔類似作用的一系列元件。例如Apache Hadoop可以看作一種以MapReduce作為預設處理引擎的處理框架。引擎和框架通常可以相互替換或同時使用。例如另一個框架Apache Spark可以納入Hadoop並取代MapReduce。

批處理框架

批處理主要操作大容量靜態資料集,並在計算過程完成後返回結果。批處理模式中使用的資料集特徵:

  • 有界:批處理資料集代表資料的有限集合
  • 持久:資料通常始終儲存在某種型別的持久儲存位置中
  • 大量:批處理操作通常是處理極為海量資料集的唯一方法

批處理非常適合需要訪問全套記錄才能完成的計算工作。例如在計算總數和平均數時,必須將資料集作為一個整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計算進行過程中資料維持自己的狀態。大量資料的處理需要付出大量時間,因此批處理不適合對處理時間要求較高的場合。

批處理框架應用:Apache Hadoop

流處理框架

流處理會對隨時進入系統的資料進行計算。相比批處理模式,這是一種截然不同的處理方式。流處理方式無需針對整個資料集執行操作,而是對通過系統傳輸的每個資料項執行操作。

流處理中的資料集是“無邊界”的,這就產生了幾個重要的影響:

  • 完整資料集只能代表截至目前已經進入到系統中的資料總量。
  • 工作資料集也許更相關,在特定時間只能代表某個單一資料項。
  • 處理工作是基於事件的,除非明確停止否則沒有“盡頭”。處理結果立刻可用,並會隨著新資料的抵達繼續更新。

流處理系統可以處理幾乎無限量的資料,但同一時間只能處理一條(真正的流處理)或很少量(微批處理,Micro-batch Processing)資料,不同記錄間只維持最少量的狀態。雖然大部分系統提供了用於維持某些狀態的方法,但流處理主要針對副作用更少,更加功能性的處理(Functional processing)進行優化。

功能性操作主要側重於狀態或副作用有限的離散步驟。有近實時處理需求的任務很適合使用流處理模式。分析、伺服器或應用程式錯誤日誌,以及其他基於時間的衡量指標是最適合的型別,因為對這些領域的資料變化做出響應對於業務職能來說是極為關鍵的。流處理很適合用來處理必須對變動或峰值做出響應,並且關注一段時間內變化趨勢的資料。

流處理框架應用:Apache Storm,Apache Samza

混合處理框架

一些處理框架可同時處理批處理和流處理工作負載。這些框架可以用相同或相關的元件和API處理兩種型別的資料,藉此讓不同的處理需求得以簡化。

雖然側重於某一種處理型別的專案會更好地滿足具體用例的要求,但混合框架意在提供一種資料處理的通用解決方案。這種框架不僅可以提供處理資料所需的方法,而且提供了自己的整合項、庫、工具,可勝任圖形分析、機器學習、互動式查詢等多種任務。

混合處理框架應用:Apache Spark,Apache Flink

MapReduce

MapReduce是Google提出的一個軟體架構,用於大規模資料集的並行運算。

MapReduce的處理過程分為兩個步驟:map(對映)和reduce(歸納)。每個階段的輸入輸出都是key-value的形式,型別可以自行指定。map階段對切分好的資料進行並行處理,處理結果傳輸給reduce,由reduce函式完成最後的彙總。Reduce又可以作為一個Map為下一級Reduce作準備,以此迭代。

MapReduce程序間的通訊純粹是用檔案去聯絡的,每個程序做的事情就是去讀取上一級程序生成的資料,然後處理後寫入磁碟讓下一級程序進行讀取。這個特性使得MapReduce有著良好的容錯性,當某一級的某一個程序出錯了,JobMaster會重新排程這個程序到另外一個機器上重新執行。壞處是每當Map-Reduce的某一個步驟執行完後,需要重新排程下一級任務,排程產生的開銷會非常的大(網路傳輸,檔案讀寫磁碟IO)。

MapReduce通過把對資料集的大規模操作分發給網路上的每個節點實現可靠性;每個節點會週期性的把完成的工作和狀態的更新報告回來。如果一個節點保持沉默超過一個預設的時間間隔,主節點記錄下這個節點狀態為死亡,並把分配給這個節點的資料發到別的節點。每個操作使用命名檔案的不可分割操作以確保不會發生並行執行緒間的衝突;當檔案被改名的時候,系統可能會把他們複製到任務名以外的另一個名字上去。(避免副作用)。

歸納操作工作方式很類似,但是由於歸納操作在並行能力較差,主節點會盡量把歸納操作排程在一個節點上,或者離需要操作的資料儘可能近的節點上了。

Hadoop

Apache Hadoop是一款支援資料密集型分散式應用程式的開源批處理框架,包含多個元件,即多個層,通過配合使用可處理批資料:

  • HDFS:(Hadoop Distributed File System分散式檔案系統層)可對叢集節點間的儲存和複製進行協調。HDFS確保了無法避免的節點故障發生後資料依然可用,可將其用作資料來源,可用於儲存中間態的處理結果,並可儲存計算的最終結果。
  • YARN:(Yet Another Resource Negotiator另一個資源管理器)可充當Hadoop堆疊的叢集協調元件。該元件負責協調並管理底層資源和排程作業的執行。通過充當叢集資源的介面,YARN使得使用者能在Hadoop叢集中使用比以往的迭代方式執行更多型別的工作負載。
  • MapReduce:Hadoop的原生批處理引擎。

基本處理過程

  • 從HDFS檔案系統讀取資料集
  • 將資料集拆分成小塊並分配給所有可用節點
  • 針對每個節點上的資料子集進行計算(計算的中間態結果會重新寫入HDFS)
  • 重新分配中間態結果並按照鍵進行分組
  • 通過對每個節點計算的結果進行彙總和組合對每個鍵的值進行“Reducing”
  • 將計算而來的最終結果重新寫入 HDFS

優勢和侷限

由於每個任務需要多次執行讀取和寫入操作,因此速度相對較慢。但另一方面由於磁碟空間通常是伺服器上最豐富的資源,這意味著MapReduce可以處理非常海量的資料集。同時也意味著相比其他類似技術,Hadoop的MapReduce通常可以在廉價硬體上執行,因為該技術並不需要將一切都儲存在記憶體中。MapReduce具備極高的縮放潛力,生產環境中曾經出現過包含數萬個節點的應用。與其他框架和引擎的相容與整合能力使得Hadoop可以成為使用不同技術的多種工作負載處理平臺的底層基礎。

Spark

Apache Spark是一種包含流處理能力的下一代批處理框架。與Hadoop的MapReduce引擎基於各種相同原則開發而來的Spark主要側重於通過完善的記憶體計算和處理優化機制加快批處理工作負載的執行速度。Spark可作為獨立叢集部署(需要相應儲存層的配合),或可與Hadoop整合並取代MapReduce引擎。

Spark的批處理模式

與MapReduce不同,Spark的資料處理工作全部在記憶體中進行,只在一開始將資料讀入記憶體,以及將最終結果持久儲存時需要與儲存層互動。所有中間態的處理結果均儲存在記憶體中。

雖然記憶體中處理方式可大幅改善效能,Spark在處理與磁碟有關的任務時速度也有很大提升,因為通過提前對整個任務集進行分析可以實現更完善的整體式優化。為此Spark可建立代表所需執行的全部操作,需要操作的資料,以及操作和資料之間關係的Directed Acyclic Graph(有向無環圖),即DAG,藉此處理器可以對任務進行更智慧的協調。

為了實現記憶體中批計算,Spark會使用一種名為Resilient Distributed Dataset(彈性分散式資料集),即RDD的模型來處理資料。這是一種代表資料集,只位於記憶體中,永恆不變的結構。針對RDD執行的操作可生成新的RDD。每個RDD可通過世系(Lineage)回溯至父級RDD,並最終回溯至磁碟上的資料。Spark可通過RDD在無需將每個操作的結果寫回磁碟的前提下實現容錯。

Spark的流處理模式

流處理能力是由Spark Streaming實現的。Spark本身在設計上主要面向批處理工作負載,為了彌補引擎設計和流處理工作負載特徵方面的差異,Spark實現了一種叫做微批(Micro-batch)*的概念。在具體策略方面該技術可以將資料流視作一系列非常小的“批”,藉此即可通過批處理引擎的原生語義進行處理。

Spark Streaming會以亞秒級增量對流進行緩衝,隨後這些緩衝會作為小規模的固定資料集進行批處理。這種方式的實際效果非常好,但相比真正的流處理框架在效能方面依然存在不足。

優勢和侷限

使用Spark而非Hadoop MapReduce的主要原因是速度。在記憶體計算策略和先進的DAG排程等機制的幫助下,Spark可以用更快速度處理相同的資料集。

Spark的另一個重要優勢在於多樣性。既可作為獨立叢集部署,亦可與現有Hadoop叢集整合,可執行批處理和流處理,執行一個叢集即可處理不同型別的任務。

除了引擎自身的能力外,圍繞Spark還建立了包含各種庫的生態系統,可為機器學習、互動式查詢等任務提供更好的支援。相比MapReduce,Spark任務更是“眾所周知”地易於編寫,因此可大幅提高生產力。

為流處理系統採用批處理的方法,需要對進入系統的資料進行緩衝。緩衝機制使得該技術可以處理非常大量的傳入資料,提高整體吞吐率,但等待緩衝區清空也會導致延遲增高。這意味著Spark Streaming可能不適合處理對延遲有較高要求的工作負載。

由於記憶體通常比磁碟空間更貴,因此相比基於磁碟的系統,Spark成本更高。然而處理速度的提升意味著可以更快速完成任務,在需要按照小時數為資源付費的環境中,這一特性通常可以抵消增加的成本。

Spark記憶體計算這一設計的另一個後果是,如果部署在共享的叢集中可能會遇到資源不足的問題。相比Hadoop MapReduce,Spark的資源消耗更大,可能會對需要在同一時間使用叢集的其他任務產生影響。

總結

Spark是多樣化工作負載處理任務的最佳選擇。Spark批處理能力以更高記憶體佔用為代價提供了無與倫比的速度優勢。對於重視吞吐率而非延遲的工作負載,則比較適合使用Spark Streaming作為流處理解決方案。

MPI

MPI(Message Passing Interface 訊息傳遞介面)。是一個跨語言的平行計算介面,可以被fortran,c,c++等呼叫,常在超級電腦、電腦簇等分散式記憶體環境應用。MPI的目標是高效能,大規模性,和可移植性。目前MPI的實現非常多,開源的有Open MPI和MPICH。

MPI的優點

  • 允許靜態任務排程,程式的排程是一次性的,就是比如開始申請了50個程序,那這50個程序就會一起跑,同生同死。
  • MPI的封裝,讓併發資料更操作變得非常的方便,顯示並行提供了良好的效能和移植性。
  • 由於MPI是基於訊息的,劃分計算任務,將任務對映到分散式程序集合中進行計算時,既可進行任務劃分,也可進行資料劃分,沒有任何限制。
  • 用 MPI 編寫的程式可直接在多核叢集上執行。叢集的各節點之間可以採用 MPI 程式設計模型進行程式設計,每個節點都有自己的記憶體,可以對本地的指令和資料直接進行訪問,各節點之間通過網際網路絡進行訊息傳遞。具有很好的可移植性,完備的非同步通訊功能,較強的可擴充套件性。

MPI的缺點

  • MPI都沒有提供GFS系統,這個讓大檔案的存放,讀取都成了一個問題,如果底層有一個GFS,再在上面搭一個MPI的系統,使用起來會非常的舒服。
  • MPI的容錯性一般不容易做,因為程式是同生同死的,某一個程序掛了,整個任務就掛了。
  • 並行化改進需要大量地修改原有的序列程式碼,除錯難度比較大。
  • 通訊會造成很大的開銷,為了最小化延遲,通常需要大的程式碼粒度,細粒度的並行會引發大量的通訊。
  • 動態負載平衡困難。

OpenMP

OpenMp是執行緒級別的,是針對單主機上多核/多CPU平行計算而設計的工具,支援目前所有平臺上的c,fortran等的共享記憶體式平行計算:
主執行緒(順序的執行指令)生成一系列的子執行緒,並將任務劃分給這些子執行緒進行執行。這些子執行緒並行的執行,由執行時環境將執行緒分配給不同的處理器。

OpenMp比較簡單,修改現有的大段程式碼也容易。基本上OpenMp只要在已有程式基礎上根據需要加並行語句即可。而MPI有時甚至需要從基本設計思路上重寫整個程式,除錯也困難得多,涉及到區域網通訊這一不確定的因素。不過,OpenMp雖然簡單卻只能用於單機多CPU/多核並行,MPI才是用於多主機超級計算機叢集的強悍工具,當然複雜。

CUDA

CUDA(Compute Unified Device Architecture)是一種由NVIDIA推出的通用平行計算架構,該架構使GPU能夠解決複雜的計算問題。它包含了CUDA指令集架構(ISA)以及GPU內部的平行計算引擎。

Cpu與Gpu

CPU擅長處理不規則資料結構和不可預測的存取模式,以及遞迴演算法、分支密集型程式碼和單執行緒程式。這類程式任務擁有複雜的指令排程、迴圈、分支、邏輯判斷以及執行等步驟。例如,作業系統、文書處理、互動性應用的除錯、通用計算、系統控制和虛擬化技術等系統軟體和通用應用程式等等。

GPU擅於處理規則資料結構和可預測存取模式。例如,光影處理、3D 座標變換、油氣勘探、金融分析、醫療成像、有限元、基因分析和地理資訊系統以及科學計算等方面的應用。顯示晶片通常具有更大的記憶體頻寬。具有更大量的執行單元。和高階 CPU 相比,顯示卡的價格較為低廉。

目前設計GPU+CPU架構平臺的指導思想是:讓CPU的更多資源用於快取,GPU的更多資源用於資料計算。

當代CPU的微架構是按照兼顧“指令並行執行”和“資料並行運算”的思路而設計,就是要兼顧程式執行和資料運算的並行性、通用性以及它們的平衡性。CPU的微架構偏重於程式執行的效率,不會一味追求某種運算極致速度而犧牲程式執行的效率。

GPU的微架構就是面向適合於矩陣型別的數值計算而設計的,大量重複設計的計算單元,這類計算可以分成眾多獨立的數值計算——大量數值運算的執行緒,而且資料之間沒有像程式執行的那種邏輯關聯性。

CUDA框架

CUDA 是 NVIDIA 的 GPGPU 模型,它使用 C 語言為基礎,可以直接以大多數人熟悉的 C 語言,寫出在顯示晶片上執行的程式,而不需要去學習特定的顯示晶片的指令或是特殊的結構。

從CUDA體系結構的組成來說,包含了三個部分:開發庫、執行期環境和驅動:

  • 開發庫是基於CUDA技術所提供的應用開發庫。
  • 執行期環境提供了應用開發介面和執行期元件,包括基本資料型別的定義和各類計算、型別轉換、記憶體管理、裝置訪問和執行排程等函式。
  • 驅動部分基本上可以理解為是CUDA-enable的GPU的裝置抽象層,提供硬體裝置的抽象訪問介面。
  • 應用領域例如遊戲、高清視訊、衛星成像等資料規模龐大的場景。

在 CUDA 的架構下,一個程式分為兩個部份:host 端和 device 端。Host 端是指在 CPU 上執行的部份,而 device 端則是在顯示晶片上執行的部份。Device 端的程式又稱為 “kernel”。通常 host 端程式會將資料準備好後,複製到顯示卡的記憶體中,再由顯示晶片執行 device 端程式,完成後再由 host 端程式將結果從顯示卡的記憶體中取回。

GraphLab

一般的機器學習類演算法有以下兩個特性:

  • 資料依賴性很強。運算過程中參與計算的各個機器之間經常需要交換大量的資料。
  • 流處理複雜。主要表現在整個處理過程需要反覆地迭代計算,資料處理分支很多,很難實現真正的並行。

而當前被廣泛使用的MapReduce 計算框架,Map階段叢集的各臺機器各自完成負載較重的計算過程,資料並行度高,適合完成類似矩陣運算、資料統計等資料獨立性強的計算,任務執行期間不需要相互之間進行資料通訊,所以MapReduce 不適合資料依賴性強的任務,而且MapReduce 平行計算模型也不能高效表達迭代型演算法。這種計算模型在處理如日誌分析、資料統計等資料獨立性的任務時具有明顯的優勢,但是在機器學習領域,MapReduce框架並不能很好地滿足機器學習計算任務。

另一個並行實現方案就是採用純MPI(Native MPI)的方式。純MPI實現通過精細的設計將並行任務按照MPI協議分配到叢集機器上,並根據具體應用,在計算過程中進行機器間的資料通訊和同步。純MPI的優點是,可以針對具體的應用,進行深度優化,從而達到很高的並行效能。但純MPI存在的問題是,針對不同的機器學習演算法,需要重寫其資料分配、通訊等實現細節,程式碼重用率低,機器拓展效能差,對程式設計開發人員的要求高,而且優化和除錯成本高。因而,純MPI不適合敏捷的網際網路應用。

為解決機器學習的流處理,Google提出了Pregel框架,Pregel是嚴格的BSP模型(Bulk Synchronous Parallel,整體同步平行計算模型),採用“計算-通訊-同步”的模式完成機器學習的資料同步和演算法迭代。Goolge曾稱其80%的程式使用MapReduce完成,20%的程式使用Pregel實現。因而,Pregel是很成熟的機器學習流處理框架,但Google一直沒有將Pregel的具體實現開源,外界對Pregel的模仿實現在效能和穩定性方面都未能達到工業級應用的標準。

2010年,CMU的Select實驗室提出了GraphLab框架,GraphLab 是一個基於影象處理模型的開源圖計算框架,框架使用C++語言開發實現。該框架是面向機器學習(ML)的流處理平行計算框架,可以執行在多處理機的單機系統、叢集等多種環境下。

GraphLab 自成立以來就是一個發展很迅速的開源專案,GraphLab的設計目標是,像MapReduce一樣高度抽象,可以高效執行與機器學習相關的、具有稀疏的計算依賴特性的迭代性演算法,並且保證計算過程中資料的高度一致性和高效的平行計算效能。該框架最初是為處理大規模機器學習任務而開發的,但是該框架也同樣適用於許多資料探勘方面的計算任務。在並行圖計算領域,該框架在效能上高出很多其他平行計算框架(例如,MapReduce、Mahout)幾個數量級。

GraphLab的優點

GraphLab 作為一個基於圖處理的平行計算框架,能夠高效地執行機器學習相關的資料依賴性強,迭代型演算法,其設計具有如下特點和優點。

  • 統一的API 介面。對於多核處理器和分散式環境,採用統一的API 介面,一次編寫程式即可高效地執行在共享記憶體環境或者分散式叢集上。
  • 高效能。優化C++執行引擎,在大量多執行緒操作和同步I/O 操作之間進行了很好的平衡。
  • 可伸縮性強。GraphLab 能夠智慧地選擇儲存和計算的節點,原因是GraphLab 對於資料的儲存與計算都使用了精心設計的優良演算法。
  • 整合HDFS。GraphLab 內建對HDFS 的支援,GraphLab 能夠直接從HDFS中讀資料或者將計算結果資料直接寫入到HDFS 中。
  • 功能強大的機器學習類工具集。GraphLab 在自身提供的API 介面之上實現了大量的開箱即用的工具集。

GraphLab和MapReduce的對比

GraphLab 的出現不是對MapReduce 演算法的替代,相反,GraphLab 借鑑了MapReduce 的思想,將MapReduce 平行計算模型推廣到了對資料重疊性、資料依賴性和迭代型演算法適用的領域。本質上,GraphLab 填補了高度抽象的MapReduce 平行計算模型和底層訊息傳遞、多執行緒模型(如MPI 和PThread)之間的空隙。

GraphLab 模擬了MapReduce 中的抽象過程:

  • 對MapReduce的map操作,通過稱為更新函式(Update Function)的過程進行模擬,更新函式能夠讀取和修改使用者定義的圖結構資料集。使用者提供的資料圖代表了程式在記憶體中和圖的頂點、邊相關聯的記憶體狀態,更新函式能夠遞迴地觸發更新操作,從而使更新操作作用在其他圖節點上進行動態的迭代式計算。GraphLab 提供了強大的控制原語,以保證更新函式的執行順序。
  • 對MapReduce的reduce操作,通過稱為同步操作(Sync Operation)的過程進行模擬。同步操作能夠在後臺計算任務進行的過程中執行合併(Reductions),和GraphLab 提供的更新函式一樣,同步操作能夠同時並行處理多條記錄,這也保證了同步操作能夠在大規模獨立環境下執行。

GraphLab並行框架

GraphLab將資料抽象成Graph結構,將演算法的執行過程抽象成Gather、Apply、Scatter三個步驟。其並行的核心思想是對頂點的切分。

Graph的構造

  • 頂點是其最小並行粒度和通訊粒度,邊是機器學習演算法中資料依賴性的表現方式。
  • 對於某個頂點,其被部署到多臺機器,一臺機器作為master頂點,其餘機器上作為mirror。Master作為所有mirror的管理者,負責給mirror安排具體計算任務;mirror作為該頂點在各臺機器上的代理執行者,與master資料的保持同步。
  • 對於某條邊,GraphLab將其唯一部署在某一臺機器上,而對邊關聯的頂點進行多份儲存,解了邊資料量大的問題。
  • 同一臺機器上的所有edge和vertex構成local graph,在每臺機器上,存在本地id到全域性id的對映表。
  • vertex是一個程序上所有執行緒共享的,在平行計算過程中,各個執行緒分攤程序中所有頂點的gather->apply->scatter操作。

GraphLab的執行模型

每個頂點每一輪迭代經過gather->apple->scatter三個階段。

  1. Gather階段:工作頂點的邊 (可能是所有邊,也有可能是入邊或者出邊)從領接頂點和自身收集資料,記為gather_data_i,各個邊的資料graphlab會求和,記為sum_data。這一階段對工作頂點、邊都是隻讀的。
  2. Apply階段 :Mirror將gather計算的結果sum_data傳送給master頂點,master進行彙總為total。Master利用total和上一步的頂點資料,按照業務需求進行進一步的計算,然後更新master的頂點資料,並同步mirror。Apply階段中,工作頂點可修改,邊不可修改。
  3. Scatter階段:工作頂點更新完成之後,更新邊上的資料,並通知對其有依賴的鄰結頂點更新狀態。這scatter過程中,工作頂點只讀,邊上資料可寫。

在執行模型中,graphlab通過控制三個階段的讀寫許可權來達到互斥的目的。在gather階段只讀,apply對頂點只寫,scatter對邊只寫。平行計算的同步通過master和mirror來實現,mirror相當於每個頂點對外的一個介面人,將複雜的資料通訊抽象成頂點的行為。