1. 程式人生 > >機器學習框架Ray——Actor模型


1. Actors

Ray中的遠端函式被認為是功能性強和副作用低的框架。 僅限於遠端函式的情況下,可以為我們提供分散式函式程式設計,這對於許多使用情況非常有用,但在實踐中會受到一些限制。

Ray通過actor擴充套件了資料流模型。 一個actor本質上是一個有狀態的worker(或service)。 當一個新的actor被例項化時,一個新的worker被建立,並且該actor的方法被安排在該特定的worker上,並且可以訪問和改變該worker的狀態。


import ray

2. 定義和建立一個actor

考慮下面的簡單例子。裝飾器 ray.remote表明類Counter例項化後為actor。

class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


a1 = Counter.remote()
a2 = Counter.remote()


  1. 選擇叢集中的一個節點,並在該節點上(由該節點上的本地排程程式)建立一個worker,以便執行在該actor上呼叫的方法。
  2. 在該worker上建立一個Counter物件,並執行Counter建構函式。

3. 使用actor


a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1


  1. 任務被建立。
  2. 該任務被直接分配給負責該actor的本地排程程式(由驅動程式的本地排程器控制)。 因此,此排程過程繞過全域性排程程式。
  3. 一個物件ID被返回。


同樣,對a2.increment.remote()的呼叫將產生一個任務,該任務被排程到第二個Counter actor上。 由於這兩個任務在不同的actor上執行,它們可以並行執行(請注意,只有該actor上的方法才被排程分配到actor worker上,常規的遠端函式不會)。

另一方面,在同一個Counter actor上呼叫的方法按照它們被呼叫的順序依次執行。 因此它們可以相互共享狀態,如下所示。

# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]

4. 一個更加有趣的actor示例


Gym為許多模擬環境提供了一個介面,用於測試和訓練強化學習的agent。 這些模擬器是有狀態的,使用這些模擬器的任務肯定會改變它們的狀態。 我們可以使用actor來封裝這些模擬器的狀態。

import gym

class GymEnvironment(object):
    def __init__(self, name):
        self.env = gym.make(name)

    def step(self, action):
        return self.env.step(action)

    def reset(self):


pong = GymEnvironment.remote("Pong-v0")
pong.step.remote(0)  # Take action 0 in the simulator.

5. 在actor上使用GPU

一個常見的用例是actor包含一個神經網路。 例如,假設我們已經匯入了Tensorflow並建立了一個構建神經網路的方法。

import tensorflow as tf

def construct_network():
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 10])

    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)

    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    return x, y_, train_step, accuracy


import os

# Define an actor that runs on GPUs. If there are no GPUs, then simply use
# ray.remote without any arguments and no parentheses.
class NeuralNetOnGPU(object):
    def __init__(self):
        # Set an environment variable to tell TensorFlow which GPUs to use. Note
        # that this must be done before the call to tf.Session.
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()])
        with tf.Graph().as_default():
            with tf.device("/gpu:0"):
                self.x, self.y_, self.train_step, self.accuracy = construct_network()
                # Allow this to run on CPUs if there aren't any GPUs.
                config = tf.ConfigProto(allow_soft_placement=True)
                self.sess = tf.Session(config=config)
                # Initialize the network.
                init = tf.global_variables_initializer()

為了表明一個actor需要用到GPU,我們將num_gpus = 1傳遞給ray.remote。 請注意,為了實現這一點,Ray必須初始化時指定使用GPU,例如,通過ray.init(num_gpus = 2)。 否則,當你嘗試使用NeuralNetOnGPU.remote()例項化GPU版本時,會引發異常,說明系統中沒有足夠的GPU。

當actor建立時,它將有權通過ray.get_gpu_ids()得到可以使用的GPU的ID的列表。 這是一個整數列表,如[]或[1]或[2,5,6]。 由於我們傳入了ray.remote(num_gpus = 1),因此此列表將具有一個長度。


import os
import ray
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data


def construct_network():
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 10])

    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)

    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    return x, y_, train_step, accuracy

class NeuralNetOnGPU(object):
    def __init__(self, mnist_data):
        self.mnist = mnist_data
        # Set an environment variable to tell TensorFlow which GPUs to use. Note
        # that this must be done before the call to tf.Session.
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()])
        with tf.Graph().as_default():
            with tf.device("/gpu:0"):
                self.x, self.y_, self.train_step, self.accuracy = construct_network()
                # Allow this to run on CPUs if there aren't any GPUs.
                config = tf.ConfigProto(allow_soft_placement=True)
                self.sess = tf.Session(config=config)
                # Initialize the network.
                init = tf.global_variables_initializer()

    def train(self, num_steps):
        for _ in range(num_steps):
            batch_xs, batch_ys = self.mnist.train.next_batch(100)
            self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys})

    def get_accuracy(self):
        return self.sess.run(self.accuracy, feed_dict={self.x: self.mnist.test.images,
                                                       self.y_: self.mnist.test.labels})

# Load the MNIST dataset and tell Ray how to serialize the custom classes.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

# Create the actor.
nn = NeuralNetOnGPU.remote(mnist)

# Run a few steps of training and print the accuracy.
accuracy = ray.get(nn.get_accuracy.remote())
print("Accuracy is {}.".format(accuracy))

6. 圍繞actor控制代碼傳遞

Actor控制代碼可以傳遞到其他任務。 為了用一個簡單的例子來說明這一點,考慮一個簡單的actor定義。 此功能目前是實驗性的,並受以下所述的限制。

class Counter(object):
    def __init__(self):
        self.counter = 0

    def inc(self):
        self.counter += 1

    def get_counter(self):
        return self.counter


def f(counter):
    while True:


counter = Counter.remote()

# Start some tasks that use the actor.
[f.remote(counter) for _ in range(4)]

# Print the counter value.
for _ in range(10):

7. 當前actor的限制


  1. Actor生命週期管理:當前,當原始actor的控制代碼超出範圍時,會為該actor安排一個任務來殺死actor程序。這會產生一個問題,如果最初的actor控制代碼超出了作用域,但actor仍然被已經傳遞了actor控制代碼的任務使用。
  2. 返回actor控制代碼:Actor控制代碼當前不能從遠端函式或actor方法返回。同樣,ray.put不能在actor控制代碼上呼叫。如上,控制代碼counter由Counter.remote()在本地呼叫,而不是從遠端函式返回。
  3. 重建被丟棄的actor物件:如果在由actor方法建立的已丟棄物件上呼叫ray.get,則Ray當前不會重建該物件。
  4. 確定性重建丟失的actor:如果actor由於節點故障而丟失,則根據初始執行的順序在新節點上重構actor。然而,同時安排在actor身上的新任務可能會在重新執行的任務之間執行。如果你的應用程式對狀態一致性有嚴格的要求,這可能會成為問題。



