Mars 是什麼、能做什麼、如何做的——記 Mars 在 PyCon China 2018 上的分享
最近,在 PyCon China 2018 的北京主會場、成都和杭州分會場都分享了我們最新的工作 Mars,基於矩陣的統一計算框架。本文會以文字的形式對 PyCon 中國上的分享再進行一次闡述。
聽到 Mars,很多第一次聽說的同學都會靈魂三問:Mars 是什麼,能做什麼,怎麼做的。今天我們就會從背景,以及一個例子出發,來回答這幾個問題。
背景
首先是 scipy 技術棧的全景圖,numpy 是基礎,它提供了多維陣列的資料結構,並提供了它上面的各種計算。再往上,重要的有 scipy,主要面向各種科學計算的操作;pandas,其中核心的概念是 DataFrame,他提供對錶型別資料的處理、清洗等功能。往上一層,比較經典的庫,有 scikit-learn,它是最知名的機器學習框架之一。最上面一層,是各種垂直領域的庫,如 astropy 主要面向天文,biopython 面向生物領域等。
從 scipy 技術棧可以看出,numpy 是一個核心的地位,大量上層的庫都使用了 numpy 的資料結構和計算。
我們真實世界的資料,並不只是表這種二維型別資料那麼簡單,很多時候,我們要面對的往往是多維資料,比如我們常見的圖片處理,首先我們有圖片的個數,然後有圖片的長寬,以及 RGBA 通道,這就是四維的資料;這樣的例子不勝列舉。有這樣多維的處理能力,就有處理各種更加複雜,甚至是科學領域的能力;同時,由於多維資料本身包含二維資料,所以,我們也因此具備表型別資料的處理能力。
另外,如果我們需要探究資料的內在,光靠對錶資料進行一些統計等操作是絕對不夠的,我們需要更深層的“數學” 的方法,比如運用矩陣乘法、傅立葉變換等等的能力,來對資料進行更深層次的分析。而 numpy 由於是數值計算的庫,加上各種上層的庫,我們認為它們很適合用來提供這方面的能力。
為什麼要做 Mars,從一個例子開始
那麼,為什麼要做 Mars 這個專案呢?我們不妨從一個例子來看。
我們試圖用蒙特卡洛方法來求解 pi,蒙特卡洛方法其實很簡單,就是用隨機數的方法來解決特定的問題。如圖,這裡我們有個半徑為1的圓和邊長為2的正方形,我們生成很多隨機點的方式,通過右下角的公式,我們就可以計算出 pi 的值為 4 乘以落在圓裡點的個數除以總的點個數。隨機生成的點越多,計算出來的 pi 就越精確。
用純 Python 實現非常簡單,我們只要遍歷 N 次,生成 x 和 y 點,計算是不是落在圓內即可。執行1千萬個點,需要超過10秒的時間。
Cython 是常見加速 Python 程式碼的方式,Cython 定義了 Python 語言的超集,把這個語言翻譯到 c/c++,然後再進行編譯來加速執行。這裡,我們增加了幾個變數的型別,可以看到比純 Python 提升了 40% 的效能。
Cython 現在已經成為 Python 專案的標配,核心的 Python 三方庫基本都使用 Cython 來加速 Python 程式碼的效能。
我們這個例子裡的資料都是一個型別,我們可以想到用專門的數值計算的庫,通過向量化的方式,能極快加速這個任務的效能。numpy 就是當仁不讓的選擇了,使用 numpy,我們需要的是面向 array 的思維方式,我們應當減少使用迴圈。這裡先用 numpy.random.uniform
來生成 N*2 的一個二維陣列,然後 data ** 2
會對該數組裡的所有資料做平方操作,然後 sum(axis=1)
,會對 axis=1 也就是行方向上求和,這個時候,得到的是長度為 N 的 vector,然後我們用 numpy.sqrt
來對這個 vector 的每一個值求開方,<1 會得到一個布林值的 vector,即每個點是不是都是落在圓裡,最後接一個 sum,就可以求出來總共的點的個數。初次上手 numpy 可能會不太習慣,但是用多了以後,就會發現這種寫法的方便,它其實是非常符合直覺的。
可以看到,通過使用 numpy,我們寫出了更簡單的程式碼,但是效能確大幅提升,比純 Python 的寫法效能提升超過 10 倍。
那麼 numpy 的程式碼還能夠優化麼,答案是肯定的,我們通過一個叫 numexpr 的庫,來將 numpy 的多個操作合併成一個操作執行,來加速 numpy 的執行。
可以看到,通過 numexpr 優化的程式碼,效能比純 Python 程式碼提升超過 25 倍。
此時的程式碼執行已經相當快了,如果我們手上有 GPU,那麼我們可以利用硬體來加速任務執行。
這裡必須要安利一個庫,叫 cupy,他提供了和 numpy 一致的 API,通過簡單的 import 替換,就能讓 numpy 程式碼跑在英偉達的顯示卡之上。
這時可以看到,效能大幅提升超過 270 倍。真的非常誇張了。
為了讓蒙特卡洛方法計算的結果更加精確,我們把計算量擴大 1000 倍。會碰到什麼情況呢?
沒錯,這就是大家不時碰到的,OutOfMemory,記憶體溢位。更慘的是,在 jupyter 裡,有時候記憶體溢位導致程序被殺,甚至會導致之前跑的全部結果都丟失。
蒙特卡洛方法還是比較容易處理的,我把問題分解成 1000 個,每個求解1千萬資料就好了嘛,寫個迴圈,做個彙總。但此時,整個計算的時間來到了12分鐘多,太慢了。
此時我們會發現,整個執行過程中,其實只有一個 CPU 在幹活,其他核都在原地吆喝。那麼,我們怎麼讓 numpy 並行化呢?
首先,numpy 裡有一些操作是能並行的,比如 tensordot 來做矩陣乘法,其他大部分操作都不能利用多核。那麼,要將 numpy 並行化,我們可以:
- 採用多執行緒/多程序編寫任務
- 分散式化
蒙特卡洛方法算 pi 改寫成多執行緒和多程序實現還是非常容易的,我們寫一個函式來處理1千萬的資料,我們把這個函式通過 concurrent.futures 的 ThreadPoolExecutor 和 ProcessPoolExecutor 來分別提交函式 1000 遍用多執行緒和多程序執行即可。可以看到效能能提升到 2倍和3倍。
但是呢,蒙特卡洛求解 pi 本身就很容易手寫並行,考慮更復雜的情況。
import numpy as np
a = np.random.rand(100000, 100000)
(a.dot(a.T) - a).std()
這裡建立了 10萬*10萬的矩陣 a,輸入就有大概 75G,我們拿 a 矩陣乘 a 的轉置,再減去 a 本身,最終求標準差。這個任務的輸入資料就很難塞進記憶體,後續的手寫並行就更加困難。
這裡問題就引出來了,我們需要什麼樣框架呢?
- 提供熟悉的介面,像 cupy 這樣,通過簡單的 import 替換,就能讓原來 numpy 寫的程式碼並行。
- 具備可擴充套件性。小到單機,也可以利用多核並行;大到一個很大的叢集,支援上千臺機器的規模來一起分散式處理任務。
- 支援硬體加速,支援用 GPU 等硬體來加速任務執行。
- 支援各種優化,比如操作合併,能利用到一些庫來加速執行合併的操作。
- 我們雖然是記憶體計算的,但不希望單機或者叢集記憶體不足,任務就會失敗。我們應當讓暫時用不到的資料 spill 到磁碟等等儲存,來保證即使記憶體不夠,也能完成整個計算。
Mars 是什麼,能做什麼事
Mars 就是這樣一個框架,它的目標就是解決這幾個問題。目前 Mars 包括了 tensor :分散式的多維矩陣計算。
100億大小的蒙特卡洛求解 pi的問題規模是 150G,它會導致 OOM。通過 Mars tensor API,只需要將 import numpy as np
替換成 import mars.tensor as mt
,後續的計算完全一致。不過有一個不同,mars tensor 需要通過 execute
觸發執行,這樣做的好處是能夠對整個中間過程做盡量多的優化,比如操作合併等等。不過這種方式對 debug 不太友好,後續我們會提供 eager mode,來對每一步操作都觸發計算,這樣就和 numpy 程式碼完全一致了。
可以看到這個計算時間和手寫並行時間相當,峰值記憶體使用也只是 1個多G,因此可以看到 Mars tensor 既能充分並行,又能節省記憶體的使用 。
目前,Mars 實現了 70% 的常見 numpy 介面,完整列表見 這裡。我們一致在努力提供更多 numpy 和 scipy 的介面,我們最近剛剛完成了對逆矩陣計算的支援。
Mars tensor 也提供了對 GPU 和稀疏矩陣的支援。eye
是建立單位對角矩陣,它只有對角線上有值為1,如果用稠密的方式儲存會浪費儲存。不過目前,Mars tensor 還只支援二維稀疏矩陣。
Mars 怎麼做到並行和更省記憶體
和所有的 dataflow 的框架一樣,Mars 本身也有計算圖的概念,不同的是,Mars 包含粗粒度圖和細粒度圖的概念,使用者寫的程式碼在客戶端生成粗粒度圖,在提交到服務端後,會有 tile 的過程,將粗粒度圖 tile 成細粒度圖,然後我們會排程細粒度圖執行。
這裡,使用者寫下的程式碼,在記憶體裡會表達成 Tensor 和 Operand 構成的粗粒度圖。
當用戶呼叫 execute
方法時,粗粒度的圖會被序列化到服務端,反序列化後,我們會把這個圖 tile 成細粒度圖。對於輸入 10002000 的矩陣,假設指定每個維度上的 chunk 大小都是 500,那它會被 tile 成 24 一共 8 個chunk。
後續,我們會對每個我們實現的 operand 也就是運算元提供 tile 的操作,將一個粗粒度的圖 tile 成細粒度圖。這時,我們可以看到,在單機,如果有8個核,那麼我們就可以並行執行整個細粒度圖;另外給定 1/8 大小的記憶體,我們就可以完成整個圖的計算。
不過,我們在真正執行前,會對整個圖進行 fuse 也就是操作合併的優化,這裡的三個操作真正執行的時候,會被合併成一個運算元。針對執行目標的不同,我們會使用 numexpr 和 cupy 的 fuse 支援來分別對 CPU 和 GPU 進行操作合併執行。
上面的例子,都是我們造出來很容易並行的任務。如我們先前提到的例子,通過 tile 之後生成的細粒度圖其實是非常複雜的。真實世界的計算場景,這樣的任務其實是很多的。
為了將這些複雜的細粒度圖能夠充分排程執行,我們必須要滿足一些基本的準則,才能讓執行足夠高效。
首先,初始節點的分配非常重要。比如上圖,假設我們有兩個 worker,如果我們把 1和3 分配到一個 worker,而將 2和4 分配到另一個 worker,這時當 5 或者 6 被排程的時候,他們就需要觸發遠端資料拉取,這樣執行效率會大打折扣。如果我們一開始將 1和2 分配到一個 worker,將 3和4 分配到另一個 worker,這時執行就會非常高效。初始節點的分配對整體的執行影響是很大的,這就需要我們對整個細粒度的圖有個全域性的掌握,我們才能做到比較好的初始節點分配。
另外,深度優先執行的策略也是相當重要的。假設這時,我們只有一個 worker,執行完 1和2 後,我們排程了 3 的話,就會導致 1和2 的記憶體不能釋放,因為 5 此時還沒有被觸發執行。但是,如果我們執行完 1和2 後,排程了 5 執行,那麼當 5 執行完後,1和2 的記憶體就可以釋放,這樣整個執行過程中的記憶體就會是最省的。
所以,初始節點分配,以及深度優先執行是兩個最基本的準則,光有這兩點是遠遠不夠的,mars 的整個執行排程中有很多具有挑戰的任務,這也是我們需要長期優化的物件。
Mars 分散式
所以,Mars 本質上其實是一個細粒度的,異構圖的排程系統。我們把細粒度的運算元排程到各個機器上,在真正執行的時候其實是呼叫 numpy、cupy、numexpr 等等的庫。我們充分利用了成熟的、高度優化的單機庫,而不是重複在這些領域造輪子。
在這個過程中,我們會遇到一些難點:
- 因為我們是 master-slave 架構,我們 master 如何避免單點?
- 我們的 worker 如何避免 Python 的 GIL(全域性直譯器鎖)的限制?
- Master 的控制邏輯交錯複雜,我們很容易寫出來高耦合的,又臭又長的程式碼,我們如何將程式碼解耦?
我們的解法是使用 Actor model。Actor模型定義了並行的方式,也就是一切皆 Actor,每個 Actor 維護一個內部狀態,它們都持有郵箱,Actor 之間通過訊息傳遞,訊息收到會放在郵箱中,Actor 從郵箱中取訊息進行處理,一個 Actor 同時只能處理一個訊息。Actor 就是一個最小的並行單元,由於一個 Actor 同時只能處理一個訊息,你完全不需要擔心併發的問題,併發應當是 Actor 框架來處理的。而所有 Actor 是不是在同一臺機器上,這在 Actor 模型裡也變得不重要,Actor 在不同機器上,只要能完成訊息的傳遞就可以了,這樣 Actor 模型也天然支援分散式系統。
因為 Actor 是最小的並行單元,我們在寫程式碼的時候,可以將整個系統分解成很多 Actor,每個 Actor 是單一職責的,這有點類似面向物件的思想,這樣讓我們的程式碼得以解耦。
另外,Master 解耦成 Actor 之後,我們可以讓這些 Actor 分佈在不同的機器上,這樣就讓 Master 不再成為單點。同時,我們讓這些 Actor 根據一致性雜湊來進行分配,後續如果有 scheduler 機器掛掉, Actor 可以根據一致性雜湊重新分配並重新建立來達到容錯的目的。
最後,我們的 actors 是跑在多程序上的,每個程序裡是很多的協程,這樣,我們的 worker 也不會受到 GIL 的限制。
像 Scala 或者 Java 這些 JVM 語言 可以使用 akka 這個 Actor 框架,對於 Python 來說,我們並沒有什麼標準做法,我們認為我們只是需要一個輕量的 Actor 框架就可以滿足我們使用,我們不需要 akka 裡面一些高階的功能。因此,我們開發了 Mars actors,一個輕量的 Actor 框架,我們 Mars 整個分散式的 schedulers 和 workers 都在 Mars actors 層之上。
這是我們 Mars actors 的架構圖,在啟動 Actor pool 的時候,我們子程序會根據併發啟動若干子程序。主程序上有 socket handler 來接受遠端 socket 連線傳遞訊息,另外主程序有個 Dispatcher 物件,用來根據訊息的目的地來進行分發。我們所有的 Actor 都在子程序上建立,當 Actor 收到一個訊息處理時,我們會通過協程呼叫 Actor.on_receive(message)
方法。
一個 Actor 傳送訊息到另一個 Actor,分三種情況。
- 它們在同一個程序,那麼直接通過協程呼叫即可。
- 它們在一臺機器不同程序,這個訊息會被序列化通過管道送到主程序的 Dispatcher,dispatcher 通過解開二進位制的頭部資訊得到目標的程序 ID,通過對應的管道送到對應子程序,子程序通過協程觸發相應 Actor 的訊息處理即可。
- 它們在不同機器,那麼當前子程序會通過 socket 把序列化的訊息傳送到對應機器的主程序,該機器再通過 Dispatcher 把訊息送到對應子程序。
由於使用協程作為子程序內的並行方式,而協程本身在 IO 處理上有很強的效能,所以,我們的 Actor 框架在 IO 方面也會有很好的效能。
上圖是裸用 Mars actors 來求解蒙特卡洛方法算 pi。這裡定義兩個 Actor,一個 Actor 是 ChunkInside,它接受一個 chunk 的大小,來計算落在圓內點的個數;另外一個 Actor 是 PiCaculator,它負責接受總的點個數,來建立 ChunkInside,這個例子就是直接建立 1000 個 ChunkInside,然後通過傳送訊息來觸發他們計算。create_actor
時指定 address 可以讓 Actor 分配在不同的機器上。
這裡可以看到,我們裸用 Mars actors 的效能是要快過多程序版本的。
這裡我們總結一下,通過使用 Mars actors,我們能不受 GIL 限制,編寫分散式程式碼變得非常容易,它讓我們 IO 變得高效,此外,因為 Actor 解耦,程式碼也變得更容易維護。
現在讓我們看下 Mars 分散式的完整執行過程。現在有1個 client,3個 scheduler 和 5個worker。使用者建立一個 session,在服務端會建立一個 SessionActor 物件,通過一致性雜湊,分配到 scheduler1 上。此時,使用者運行了一個 tensor,首先 SessionActor 會建立一個 GraphActor,它會 tile 粗粒度圖,圖上假設有三個節點,則會建立三個 OperandActor,分別分配到不同的 scheduler 上。每個 OperandActor 會控制 operand 的提交、任務狀態的監督和記憶體的釋放等操作。此時 1 和 2 的 OperandActor 發現沒有依賴,並且叢集資源充足,那麼他們會把任務提交到相應的 worker 執行,在執行完成後,向 3 通知任務完成,3 發現 1和2 都執行完成後,因為資料在不同 worker 執行,決定好執行 worker 後,先觸發資料拉取操作,然後再執行。客戶端這邊,通過輪詢 GraphActor 得知任務完成,則會觸發資料拉取到本地的操作。整個任務就完成了。
我們對 Mars 分散式做了兩個 benchmark,第一個是對 36 億資料的每個元素加一再乘以2,圖中紅色叉是 numpy 的執行時間,可以看到,我們比 numpy 有數倍提升,藍色的虛線是理論執行時間,可以看到我們真實加速非常接近理論時間加速。第二個 benchmark,我們增加了資料量,來到 144 億資料,對每個元素加1乘以2後,再求和,可以看到單機 numpy 已經不能完成任務了,此時,針對這個任務,我們也可以取得不錯的加速比。
未來計劃
Mars 已經在 Github 上原始碼,讓更多同學來一起參與共建 Mars:https://github.com/mars-project/mars 。
在後續 Mars 的開發計劃上,如上文說,我們會支援 eager mode,讓每一步觸發執行,提升對效能不敏感的任務開發以及 debug 時的使用體驗;我們會支援更多 numpy 和 scipy 介面;後續很重要的一個是,我們會提供 100% 相容 pandas 的介面,由於利用了 mars tensor 作為基礎,我們也可以提供 GPU 的支援;我們會提供相容 scikit-learn 的機器學習的支援;我們還會提供在細粒度圖上排程自定義函式和自定義類的功能,增強靈活性;最後,因為我們客戶端其實並不依賴 Python,任意語言都可以序列化粗粒度圖,所以我們完全可以提供多語言的客戶端版本,不過這點,我們會視需求而定。
總之,開源對我們是很重要的,龐大的 scipy 技術棧的並行化,光靠我們的力量是不夠的,需要大家來一起幫我們來共建。
現場圖片
最後再 po 一點現場圖片吧,現場觀眾對 Mars 的問題還是蠻多的。我大致總結下:
- Mars 在一些特定計算的效能,比如 SVD 分解,這裡我們有和使用者合作專案的一些測試資料,輸入資料是 8億*32的矩陣做 SVD 分解,分解完再矩陣乘起來和原矩陣做對比,這整個計算過程使用 100個 worker(8核),用7分鐘時間算完
- Mars 何時開源,我們已經開源:https://github.com/mars-project/mars
- Mars 開源後會不會閉源,答:不會
- Mars actors 的詳細工作原理
- Mars 是靜態圖還是動態圖,目前是靜態圖,eager mode 做完後可以支援動態圖
- Mars 會不會涉及深度學習,答:目前不會