漫談分散式計算框架
如果問 mapreduce 和 spark 什麼關係,或者說有什麼共同屬性,你可能會回答他們都是大資料處理引擎。如果問 spark 與 tensorflow 呢,就可能有點迷糊,這倆關注的領域不太一樣啊。但是再問 spark 與 MPI 呢?這個就更遠了。雖然這樣問多少有些不嚴謹,但是它們都有共同的一部分,這就是我們今天談論的一個話題,一個比較大的話題:分散式計算框架。
不管是 mapreduce,還是 spark 亦或 tensorflow,它們都是利用分散式的能力,執行某些計算,解決一些特定的問題。從這個 level 講,它們都定義了一種“分散式計算模型”,即提出了一種計算的方法,通過這種計算方法,就能夠解決大量資料的分散式計算問題。它們的區別在於提出的分散式計算模型不同。Mapreduce 正如其名,是一個很基本的 map-reduce 式的計算模型(好像沒說一樣)。Spark 定義了一套 RDD 模型,本質上是一系列的 map/reduce 組成的一個 DAG 圖。Tensorflow 的計算模型也是一張圖,但是 tensorflow 的圖比起 spark 來,顯得更“複雜”一點。你需要為圖中的每個節點和邊作出定義。根據這些定義,可以指導 tensorflow 如何計算這張圖。Tensorflow 的這種具體化的定義使它比較適合處理特定型別的的計算,對 tensorflow 來講就是神經網路。而 spark 的 RDD 模型使它比較適合那種沒有相互關聯的的資料並行任務。那麼有沒有一種通用的、簡單的、效能還高的分散式計算模型?我覺著挺難。通用往往意味著效能不能針對具體情形作出優化。而為專門任務寫的分散式任務又做不到通用,當然也做不到簡單。
插一句題外話,分散式計算模型有一塊伴隨的內容,就是排程。雖然不怎麼受關注,但這是分散式計算引擎必備的東西。mapreduce 的排程是 yarn,spark 的排程有自己內嵌的排程器,tensorflow 也一樣。MPI 呢?它的排程就是幾乎沒有排程,一切假設叢集有資源,靠 ssh 把所有任務拉起來。排程實際上應當分為資源排程器和任務排程器。前者用於向一些資源管理者申請一些硬體資源,後者用於將計算圖中的任務下發到這些遠端資源進行計算,其實也就是所謂的兩階段排程。近年來有一些 TensorflowOnSpark 之類的專案。這類專案的本質實際上是用 spark 的資源排程,加上 tensorflow 的計算模型。
當我們寫完一個單機程式,而面臨資料量上的問題的時候,一個自然的想法就是,我能不能讓它執行在分散式的環境中?如果能夠不加改動或稍加改動就能讓它分散式化,那就太好了。當然現實是比較殘酷的。通常情況下,對於一個一般性的程式,使用者需要自己手動編寫它的分散式版本,利用比如 MPI 之類的框架,自己控制資料的分發、彙總,自己對任務的失敗做容災(通常沒有容災)。如果要處理的目標是恰好是對一批資料進行批量化處理,那麼 可以用 mapreduce 或者 spark 預定義的 api。對於這一類任務,計算框架已經幫我們把業務之外的部分(腳手架程式碼)做好了。同樣的,如果我們的任務是訓練一個神經網路,那麼用 tensorflow pytorch 之類的框架就好了。這段話的意思是,如果你要處理的問題已經有了對應框架,那麼拿來用就好了。但是如果沒有呢?除了自己實現之外有沒有什麼別的辦法呢?
今天注意到一個專案,Ray,聲稱你只需要稍微修改一下你的程式碼,就能讓它變為分散式的(實際上這個專案早就釋出了,只是一直沒有刻意關注它)。當然這個程式碼僅侷限於 python,比如下面這個例子,
| **Basic Python** | **Distributed with Ray** |
+------------------------------------------------+----------------------------------------------------+
| | |
| # Execute f serially. | # Execute f in parallel. |
| | |
| | @ray.remote |
| def f(): | def f(): |
| time.sleep(1) | time.sleep(1) |
| return 1 | return 1 |
| | |
| | |
| | ray.init() |
| results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) |
+------------------------------------------------+----------------------------------------------------+
這麼簡單?這樣筆者想到了 openmp(注意不是 openmpi)。來看看,
#include<iostream>
#include"omp.h"
using namespace std;
void main() {
#pragma omp parallel for
for(int i = 0; i < 10; ++i) {
cout << "Test" << endl;
}
system("pause");
}
把標頭檔案匯入,新增一行預處理指令就可以了,這段程式碼立馬變為並行執行。當然 openmp 不是分散式,只是藉助編譯器將程式碼中需要並行化的部分編譯為多執行緒執行,本身還是一個程序,因此其並行度收到 CPU 執行緒數量所限。如果 CPU 是雙執行緒,那隻能 2 倍加速。在一些伺服器上,CPU 可以是單核 32 執行緒,自然能夠享受到 32 倍加速(被並行化的部分)。不過這些都不重要,在使用者看來,Ray 的這個做法和 openmp 是不是有幾分相似之處?你不需要做過多的程式碼改動,就能將程式碼變為分散式執行(當然 openmp 要更絕一點,因為對於不支援 openmp 的編譯器它就是一行註釋而已)。
那麼 Ray 是怎麼做到這一點的呢?其實 Ray 的做法說起來也比較簡單,就是定義了一些 API,類似於 MPI 中的定義的通訊原語。使用的時候,將這些 API “注入”到程式碼合適的位置,那麼程式碼就變成了使用者程式碼夾雜著一些 Ray 框架層的 API 呼叫,整個程式碼實際上就形成了一張計算圖。接下來的事情就是等待 Ray 把這張計算圖完成返回就好了。Ray 的論文給了個例子:
@ray.remote
def create_policy():
# Initialize the policy randomly.
return policy
@ray.remote(num_gpus=1)
class Simulator(object):
def __init__(self):
# Initialize the environment.
self.env = Environment()
def rollout(self, policy, num_steps):
observations = []
observation = self.env.current_state()
for _ in range(num_steps):
action = policy(observation)
observation = self.env.step(action)
observations.append(observation)
return observations
@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
# Update the policy.
return policy
@ray.remote
def train_policy():
# Create a policy.
policy_id = create_policy.remote()
# Create 10 actors.
simulators = [Simulator.remote() for _ in range(10)]
# Do 100 steps of training.
for _ in range(100):
# Perform one rollout on each actor.
rollout_ids = [s.rollout.remote(policy_id)
for s in simulators]
# Update the policy with the rollouts.
policy_id = update_policy.remote(policy_id, *rollout_ids)
return ray.get(policy_id)
生成的計算圖為
所以,使用者要做的事情,就是在自己的程式碼里加入適當的 Ray API 呼叫,然後自己的程式碼就實際上變成了一張分散式計算圖了。作為對比,我們再來看看 tensorflow 對圖的定義,
import tensorflow as tf
# 建立資料流圖:y = W * x + b,其中W和b為儲存節點,x為資料節點。
x = tf.placeholder(tf.float32)
W = tf.Variable(1.0)
b = tf.Variable(1.0)
y = W * x + b
with tf.Session() as sess:
tf.global_variables_initializer().run() # Operation.run
fetch = y.eval(feed_dict={x: 3.0}) # Tensor.eval
print(fetch) # fetch = 1.0 * 3.0 + 1.0
'''
輸出:
4.0
'''
可以看出,tensorflow 中是自己需要自己顯式的、明確的定義出圖的節點,placeholder Variable 等等(這些都是圖節點的具體型別),而 Ray 中圖是以一種隱式的方式定義的。我認為後者是一種更自然的方式,站在開發者的角度看問題,而前者更像是為了使用 tensorflow 把自己程式碼邏輯去適配這個輪子。
那麼 ray 是不是就我們要尋找的那個即通用、又簡單、還靈活的分散式計算框架呢?由於筆者沒有太多的 ray 的使用經驗,這個問題不太好說。從官方介紹來看,有限的幾個 API 確實是足夠簡單的。僅靠這幾個 API 能不能達成通用且靈活的目的還不好講。本質上來說,Tensorflow 對圖的定義也足夠 General,但是它並不是一個通用的分散式計算框架。由於某些問題不在於框架,而在於問題本身的分散式化就存在困難,所以試圖尋求一種通用分散式計算框架解決單機問題可能是個偽命題。
話扯遠了。假設 ray 能夠讓我們以一種比較容易的方式分散式地執行程式,那麼會怎麼樣呢?前不久 Databricks 開源了一個新專案,Koalas,試圖以 RDD 的框架並行化 pandas。由於 pandas 的場景是資料分析,和 spark 面對的場景類似,兩者的底層儲存結構、概念也是很相似的,因此用 RDD 來分散式化 pandas 也是可行的。我想,如果 ray 足夠簡單好用,在 pandas 里加一些 ray 的 api 呼叫花費的時間精力可能會遠遠小於開發一套 koalas。但是在 pandas 里加 ray 就把 pandas 繫結到了 ray 上,即便單機也是這樣,因為 ray 做不到像 openmp 那樣如果支援,很好,不支援也不影響程式碼執行。
囉嗦這麼多,其實就想從這麼多引擎的細節中跳出來,思考一下到底什麼是分散式計算框架,每種框架又是設計的,解決什麼問題,有什麼優缺點。最後拿大佬的一個觀點結束本文。David Patterson 在演講 “New Golden Age For Computer Architecture” 中提到,通用硬體越來越逼近極限,要想要達到更高的效率,我們需要設計面向領域的架構(Domain Specific Architectures)。這是一個計算架構層出不窮的時代,每種架構都是為了解決其面對的領域問題出現的,必然包含對其問題的特殊優化。通用性不是使用者解決問題的出發點,而更多的是框架設計者的“一廂情願”,使用者關注的永遠是領域問題。從這個意義上講,面向領域的計算架構應該才是正確的方向。
宣告:限於本人水平有限,文中陳述內容可能有誤。歡迎批評指正。
作者:EMR
本文為雲棲社群原創內容,未經