
Ray - Fast and Simple Distributed Computing

Ray

https://ray.io/

https://github.com/ray-project/ray

(1) The machine-learning ecosystem is largely built on Python, but Python has the drawback of a global interpreter lock (GIL), which limits how well a single machine's multiple cores can be used.

(2) At the same time, the data behind large-scale models has outgrown single machines, so clusters are needed, which introduces the need for distributed machine learning.

Without pulling in a heavier high-level application stack such as Spark, Ray builds on the Python ecosystem as a simple distributed computing framework.

Ray also ships with machine-learning applications.
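
A minimal sketch of point (1), using a made-up CPU-bound workload: the threaded version is serialized by the GIL, while the Ray version runs each task in its own worker process and can use multiple cores (the workload size and task count are illustrative, not from the original).

import time
import threading
import ray

ray.init()

def cpu_bound(n):
    # Pure-Python loop: the thread holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total

@ray.remote
def cpu_bound_task(n):
    # Same work, but each task runs in a separate worker process,
    # with its own interpreter and its own GIL.
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 10_000_000

# Threads: serialized by the GIL, roughly single-core wall time.
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(N,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threads:", time.time() - start)

# Ray tasks: scheduled onto separate processes, so they can use all cores.
start = time.time()
ray.get([cpu_bound_task.remote(N) for _ in range(4)])
print("ray tasks:", time.time() - start)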

Ray provides a simple, universal API for building distributed applications.

Ray is packaged with the following libraries for accelerating machine learning workloads:

  • Tune: Scalable Hyperparameter Tuning (see the sketch after this list)
  • RLlib: Scalable Reinforcement Learning
  • RaySGD: Distributed Training Wrappers
  • Ray Serve: Scalable and Programmable Serving
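
As a taste of these libraries, here is a minimal Tune sketch, written against the tune.run API as it existed around Ray 1.x; the toy objective and the lr values are made up for illustration.

from ray import tune

def trainable(config):
    # Toy objective: pretend that lr closer to 0.1 is better.
    score = (config["lr"] - 0.1) ** 2
    tune.report(score=score)  # report the metric back to Tune

analysis = tune.run(
    trainable,
    config={"lr": tune.grid_search([0.01, 0.1, 1.0])},  # one trial per value
)
print(analysis.get_best_config(metric="score", mode="min"))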

https://docs.ray.io/en/latest/index.html


Ray accomplishes this mission by:

  1. Providing simple primitives for building and running distributed applications.

  2. Enabling end users to parallelize single machine code, with little to zero code changes.

  3. Including a large ecosystem of applications, libraries, and tools on top of the core Ray to enable complex applications.

https://www.ctolib.com/topics-138457.html

Traditional programming relies on two core concepts: functions and classes. With these building blocks, countless applications can be constructed.

However, when we move an application to a distributed environment, these concepts usually have to change.

On one hand, tools such as OpenMPI, Python multiprocessing, and ZeroMQ provide low-level primitives for sending and receiving messages. These tools are powerful, but they expose a different abstraction, so a single-threaded application must be rewritten from scratch to use them.

On the other hand, there are domain-specific tools such as TensorFlow for model training, Spark for data processing with SQL support, and Flink for stream processing. These tools offer higher-level abstractions such as neural networks, datasets, and streams. But because these abstractions differ from the ones used in serial programming, an application again has to be rewritten from scratch to use them.

[Figure: tools for distributed computing]

Ray occupies a unique middle ground. It does not introduce new concepts; instead, it takes functions and classes and turns them into distributed tasks and actors. Ray can parallelize a serial application without requiring major modifications.
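
The demo code later in this post shows a function becoming a task; as the other half of that sentence, here is a minimal sketch of a class becoming an actor (the Counter class is an illustration, not from the original):

import ray

ray.init()

@ray.remote
class Counter:
    # An ordinary class becomes a distributed actor via @ray.remote.
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()  # starts a dedicated actor process
# Method calls execute one at a time on that actor, in submission order.
refs = [counter.increment.remote() for _ in range(5)]
print(ray.get(refs))  # [1, 2, 3, 4, 5]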

Source (paper)

https://arxiv.org/abs/1703.03924

Real-Time Machine Learning: The Missing Pieces

Machine learning applications are increasingly deployed not only to serve predictions using static models, but also as tightly-integrated components of feedback loops involving dynamic, real-time decision making. These applications pose a new set of requirements, none of which are difficult to achieve in isolation, but the combination of which creates a challenge for existing distributed execution frameworks: computation with millisecond latency at high throughput, adaptive construction of arbitrary task graphs, and execution of heterogeneous kernels over diverse sets of resources. We assert that a new distributed execution framework is needed for such ML applications and propose a candidate approach with a proof-of-concept architecture that achieves a 63x performance improvement over a state-of-the-art execution framework for a representative application.

Architecture

https://www.cnblogs.com/fanzhidongyzby/p/7901139.html

The architecture diagram in the paper does not show the Driver, so I made some modifications and extensions on top of it.

The components started on Ray's Driver node and on its Slave nodes are almost identical, with the following differences (a sketch of bringing up such a cluster follows this list):

  1. The Driver normally runs only one worker process, the DriverProcess, i.e. the Python shell started by the user. A Slave can create multiple WorkerProcesses as needed.
  2. The Driver can only submit tasks; it cannot receive tasks assigned by the global scheduler. A Slave can both submit tasks and receive tasks assigned by the global scheduler.
  3. The Driver can bypass the global scheduler and send actor-invocation tasks directly to a Slave (whether this design is reasonable is not discussed here). A Slave can only receive compute tasks assigned by the global scheduler.
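
For orientation, a minimal sketch of where the Driver sits in practice, using Ray's standard CLI; the head-node address 192.168.1.10 is hypothetical:

# On the head node (hypothetical address 192.168.1.10):
#   ray start --head --port=6379
# On each slave node:
#   ray start --address=192.168.1.10:6379
# The driver process (e.g. the user's Python shell) then attaches to the
# running cluster instead of starting a local Ray instance:
import ray

ray.init(address="auto")
print(ray.nodes())  # the nodes that have joined the cluster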

https://zhuanlan.zhihu.com/p/41875076

The underlying principle: code and data are serialized and stored in Redis as objects (an object can be understood as an efficient, immutable unit of data sharing), which enables asynchronous execution and data exchange of all kinds. Tasks are completed on the local node whenever possible; only when that cannot be done does the global scheduler dispatch them to other nodes (corrections and additions welcome).
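
A minimal sketch of the object store from the API side: ray.put stores a value once and returns an immutable ObjectRef that any task, on any node, can consume (the list and the total function are made-up illustrations):

import ray

ray.init()

# Put a value into the object store; the returned ObjectRef is an
# immutable handle to it.
data_ref = ray.put([1, 2, 3])

@ray.remote
def total(data):
    # Ray resolves the ObjectRef argument to the stored value, via
    # shared memory when the object already lives on the same node.
    return sum(data)

print(ray.get(total.remote(data_ref)))  # 6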

DEMO CODE

A single-machine example of distributed tasks.

@ray.remote declares a function as a task.

Calling .remote() dispatches the task to a worker process and executes it there.

import ray
ray.init()  # start Ray on the local machine

@ray.remote
def f(x):
    return x * x

# Each f.remote(i) returns an ObjectRef (a future) immediately; ray.get
# blocks until all four tasks finish and fetches their results.
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]

Refactoring a clustering workflow

https://github.com/fanqingsong/machine_learning_workflow_on_ray

from csv import reader
from sklearn.cluster import KMeans
import joblib
import ray


ray.init()


# Load a CSV file
def load_csv(filename):
    with open(filename, "rt") as file:
        dataset = list(reader(file))
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

def getRawIrisData():
    # Load iris dataset
    filename = 'iris.csv'
    dataset = load_csv(filename)
    print('Loaded data file {0} with {1} rows and {2} columns'.format(filename, len(dataset), len(dataset[0])))
    print(dataset[0])
    # convert string columns to float
    for i in range(4):
        str_column_to_float(dataset, i)
    # convert class column to int
    lookup = str_column_to_int(dataset, 4)
    print(dataset[0])
    print(lookup)

    return dataset

@ray.remote
def getTrainData():
    dataset = getRawIrisData()
    # keep only the four feature columns; drop the class label
    trainData = [ [one[0], one[1], one[2], one[3]] for one in dataset ]

    return trainData

@ray.remote
def getNumClusters():
    return 3

@ray.remote
def train(numClusters, trainData):
    print("numClusters=%d" % numClusters)

    model = KMeans(n_clusters=numClusters)
    model.fit(trainData)

    # Save the model for prediction. Note: this writes to the local disk of
    # whichever node runs the task; on a multi-node cluster this assumes a
    # shared filesystem (or that train and predict land on the same node).
    joblib.dump(model, 'model.kmeans')

    return trainData

@ray.remote
def predict(irisData):
    # Load the saved model (see the filesystem caveat in train above).
    model = joblib.load('model.kmeans')

    # cluster result
    labels = model.predict(irisData)

    print("cluster result")
    print(labels)

    return labels


def machine_learning_workflow_pipeline():
    # Each call returns an ObjectRef immediately; passing ObjectRefs as
    # arguments tells Ray about the dependencies, so train waits for
    # getTrainData and getNumClusters, and predict waits for train.
    trainData = getTrainData.remote()
    numClusters = getNumClusters.remote()
    trainData = train.remote(numClusters, trainData)
    result = predict.remote(trainData)

    result = ray.get(result)
    print("result=", result)



if __name__ == "__main__":
    machine_learning_workflow_pipeline()

Ray crash course

https://github.com/anyscale/academy/blob/master/ray-crash-course/00-Ray-Crash-Course-Overview.ipynb