[原始碼解析] TensorFlow 分散式環境(1) --- 總體架構
[原始碼解析] TensorFlow 分散式環境(1) --- 總體架構
目錄在具體介紹 TensorFlow 分散式的各種 Strategy 之前,我們首先需要看看分散式的基礎:分散式環境。只有把基礎打紮實了,才能在以後的分析工作之中最大程度的掃清障礙,事半功倍。
本文程式碼使用的部分 API 不是最新,但因為我們的目的是瞭解其設計思想,舊的 API 反而會更加清晰(目前業界很多公司也依然基於較低版本的 TensroFlow,所以舊 API 也有相當的分析意義)。
這裡強烈推薦兩個大神:
-
[TensorFlow Internals] (
-
https://home.cnblogs.com/u/deep-learning-stacks/ 西門宇少,不僅僅是 TensorFlow,其公共號還有更多其他領域,業界前沿。
本系列其他文章是:
[翻譯] TensorFlow 分散式之論文篇 "Implementation of Control Flow in TensorFlow"
1. 總體架構
我們從幾個不同角度來對分散式模式進行拆分,如何劃分不是絕對的,這些角度也不是正交的,可能會彼此有部分包含,這麼劃分只是筆者覺得更容易從這些方面理解。
1.1 叢集角度
1.1.1 概念
我們首先從叢集和業務邏輯角度來拆分如下,有術語如下:
-
Cluster:TensorFlow 叢集定義。
- 一個 TensorFlow 叢集包含一個或者多個 TensorFlow 服務端,一個叢集一般會專注於一個相對高層的目標,比如用多臺機器並行地訓練一個神經網路。
- 訓練被切分為一系列 job,每個 job又會負責一系列 tasks。當叢集有多個 task 時候,需要使用tf.train.ClusterSpec 來指定每一個任務的機器。
-
Job:一個 job 包含一系列致力於完成某個相同目標的 task,一個 job 中的 tasks 通常會執行在不同的機器中。一般存在兩種 job:
- ps job:ps 是 parameter server 的縮寫,其負責處理儲存/更新變數相關的工作。
- worker job:用於承載那些計算密集型的無狀態節點,負責資料計算。
-
Task:一個 Task 會完成一個具體任務,一般會關聯到某個 TensorFlow 服務端的處理過程。
- Task 屬於一個特定的 job 並且在該 job 的任務列表中有唯一的索引 task_index。
- Task 通常與一個具體的 tf.train.Server 相關聯,執行在獨立的程序中。
- 可以在一個機器上執行一個或者多個 Task,比如單機多 GPU。
1.1.2 示意圖
我們給出以上三者的關係如下,Cluster 包含多個 Job,Job 包括 1 到多個 Task:
圖 1 角色之間關係
對於 Job 兩種角色,我們給出一幅經典的引數伺服器示意圖如下,下圖上方就是執行的 ps 叢集,中間運行了四個 worker。
圖 2 引數伺服器.
來源:"A Survey on Distributed Machine Learning"
1.1.3 建立
我們看看用低階 API 如何實現分散式訓練。
1.1.3.1 建立叢集
我們首先建立叢集,叢集包括兩種角色,引數伺服器 ps job 有三個任務(task),worker job 有兩個 task。這裡每一個 task 是一個機器,也可以在同一個機器之上執行多個 task(比如每個 task 控制不同的 GPU 裝置)。
ClusterSpec 以 Job 的方式組織,指定了叢集中 Task 如何部署,因為一個 Task 對應了一個程序,所以ClusterSpec 也描述了 TensorFlow 分散式執行時之中程序如何分佈。
ps_hosts = ["1.1.1.1:11", "2.2.2.2:22"]
worker_hosts = ["3.3.3.3:33", "4.4.4.4:44", "5.5.5.5:55"]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
1.1.3.2 建立任務
接下來啟動若干任務,使用者指令碼需要在每一個機器上都執行,一共執行 5 次(3 個 ps,2 個 worker)。每個任務之中,都需要使用同一個 tf.train.ClusterSpec 來了解叢集之中所有的任務。然後會啟動一個 tf.distribution.Server服務。
一個 tf.distribution.Server 例項封裝了一組裝置和一個 tf.compat.v1.Session 目標,可以參與分散式訓練。一個服務屬於一個叢集(由 tf.train.ClusterSpec 指定),並對應於一個指定作業中的特定任務。該服務可以與同一叢集中的任何其他服務通訊。
FLAGS = tf.app.flags.FLAGS
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
1.1.3.3 指定裝置
因為已經啟動了 Server,所以每個任務或者說節點的具體執行邏輯就不同了。程式碼之中根據指令碼執行的命令引數不同來決定這個Server執行的是哪個任務。
- 如果 FLAGS.job_name == "ps",程式就執行 join 操作,因為引數伺服器是引數更新的服務,只需要等待其他 worker 節點提交更新的引數即可。
- 如果 FLAGS.job_name == "worker",就執行後續的計算任務。TensorFlow 中計算/引數都可以分離,可以在裝置上分配計算節點,也可以在每個裝置上分配引數。在分散式環境下,依然會使用tf.device()函式將節點/操作放在當前任務下。tf.train.replica_device_setter 函式會依據 job 名,自動將計算分配到 worker 上。
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
1.2 分散式角度
1.2.1 概念
我們接下來從分散式業務邏輯/架構角度來具體分析一下。大家知道,Master-Worker 架構是分散式系統之中非常常見的一種架構組織形式,比如:GFS 之中有 Master,ChunkServer,Spanner 有 Zonemaster 和 Spanserver,Spark 有 driver和executor,Flink 有 JobManager 和 TaskManager。此架構下,Master 通常維護叢集元資訊,排程任務,Workers 則負責具體計算或者維護具體資料分片。
其實,TensorFlow 分散式也是採用了 Master-Worker 架構,為了更好的說明,我們給出一個官方的分散式 TensorFlow 的架構圖,圖上三個角色都是從邏輯視角來看。
- Client:前面的各種概念術語都是為了搭建一個分散式環境,Client 利用這個分散式環境進行計算。一個 client通常是一段構造 TensorFlow 計算圖的程式,通常情況下,客戶端通過迴圈呼叫 RPC 來讓 master 進行迭代計算(例如訓練)。
- Master:收到執行計算圖的命令之後,Master 負責協調排程,比如對計算圖進行剪枝,優化, 把計算圖拆分成多個子圖,每個子圖分配註冊給不同的 worker,觸發各個 worker 併發執行子圖。
- Worker:負責具體計算其收到的子圖。當接收到註冊子圖訊息之後,Worker 會將計運算元圖依據本地計算裝置進行二次切分,並把二次切分之後的子圖分配到各個裝置上,然後啟動計算裝置併發執行子圖。Worker 之間可能通過程序間通訊完成資料交換。圖中有兩個 worker,下方的 worker 的具體 Job 角色是引數伺服器,負責維護引數/更新引數等等,上面的 worker 會把梯度發給引數伺服器進行引數更新。
1.2.2 示意圖
圖上的叢集包括三個節點,每個節點上都執行一個 TensorFlow Server。這裡 Master,Worker 每一個都是 TensorFlow Server。
圖 3 叢集,來自 TensorFlow
1.3 系統角度
1.3.1 概念
我們接下來從具體軟體實現角度來剖析,在具體實現上可以分解為如下概念:
-
TensorFlow Server :Server 是執行 tf.train.Server 例項的程序,是一個叢集中的一員,Server 通常包括 Master Service 與一個 Worker Service。Server 可以和叢集中的其他 Server 進行通訊。
-
Master Service :一個 GRPC service,用於同一系列遠端的分散式裝置進行互動,用來協調排程多個 worker service。
- Master Service 對應了 "//tensorflow/core/protobuf/master_service.proto",其內部有 CreateSession,RunStep 等介面,所有的 TensorFlow Server 都實現了 Master Service。
- 客戶端可以與 Master Service 互動以執行分散式 TensorFlow 計算。客戶端一般通過 RPC 形式與一個 Master 之間保持互動式計算,客戶端建立一個客戶端會話,連線到某一個 master,該 master 建立一個 master session。
- 一個 Master Service 會包含多個 "主會話(master sessions)"並且維護其狀態。每個會話封裝了一個計算圖及其相關的狀態,這些 master session 通常對應於同一個 "客戶會話(client session)"(例如一個 tensorflow::Session例項)。
-
Master Session:一個主會話(master session)負責以下工作。
- 起到橋樑的作用,建立 client 與後端執行時的通道,比如可以將 Protobuf 格式的 GraphDef 傳送至分散式 Master。
- 使用佈局(placement)演算法將每個節點分配到一個裝置(本地或遠端)。放置演算法可能會根據從系統中的 worker 收集到的統計資料(例如,記憶體使用、頻寬消耗等)做出決定。
- 為了支援跨裝置和跨程序的資料流和資源管理,session 會在計算圖之中插入中間節點和邊。
- 向 worker 發出命令,讓其執行與本 worker 相關的子圖。
-
Worker Session: worker 通過 Worker Session 來標識一個執行序列(註冊計算圖,執行命令),Worker Session 屬於一個 Master Session。
-
Worker service:這是一個 GRPC service,代表 MasterService 在一組本地裝置上執行資料流計算圖。一個 worker service 會保持/跟蹤客戶計算圖的多個子圖,這些子圖對應了應該在這個 worker 上執行的節點,也包括那些程序間通訊所需的任何額外節點。Worker service 對應 worker_service.proto。所有的 TensorFlow server 也都實現了 worker service。
1.3.2 示意圖
我們現在知道,每個 Server 之上都會執行 MasterService 和 WorkerService 兩個服務,這意味著 server 可能同時扮演 Master 和 Worker 兩個角色,比如回到上圖,圖上的叢集包括三個節點,每個節點上都執行一個 TensorFlow Server。這裡 Master,Worker 每一個都是 TensorFlow Server,每個 server 之上都有兩種 service(MasterService 和 WorkerService),只不過在這個系統之中,目前實際有角色意義的分別是 MasterService(Master之上的) 和 WorkerService(兩個 worker 之上的),圖之中用下劃線表示。
圖 4 服務
我們接著看一些其他可能。
- 如果 Client 接入到了叢集之中的一個 Server A,則此 Server A 就扮演了 Master 角色,叢集其他 Server 則就是 Worker,但是 Server A 同時也可以扮演 Worker 角色。
- Client 可以和 Master 位於同一個程序之內,此時 Client 和 Master 可以直接使用函式呼叫來互動,避免 RPC 開銷。
- Master 可以和 Worker 位於同一個程序之內,此時 兩者可以直接使用函式呼叫來互動,避免 RPC 開銷。
- 可以有多個 Client 同時接入到一個叢集,比如下圖,此時叢集之中有兩個 Server 都可以扮演 Master/Worker 角色,兩個 Server 扮演 Worker 角色:
圖 5 多個Client 接入
1.4 圖操作角度
分散式執行的核心也是如何操作計算圖,但是計算功能被拆分為 Client,Master 和 Worker 三個角色。Client 負責構造計算圖,Worker 負責執行具體計算,但是 Worker 如何知道應該計算什麼?TensorFlow 在兩者之間插入了一個 Master 角色來負責協調,排程。
在分散式模式下,對於計算圖會進行分裂,執行操作。
- 從分裂角度看,TF 對於計算圖執行了二級分裂操作:
- MasterSession 生成 ClientGraph,然後通過 SplitByWorker 完成了一級分裂,得到多個 PartitionGraph,再把 PartitionGraph 列表註冊到 Worker 們之上。
- WorkerSession 通過 SplitByDevice 把自己得到的計算圖進行二級分裂,把分裂之後的 PartitionGraph 分配給每個裝置。
- 從執行角度來看,計算圖的具體執行只發生在 Worker 之上。
- Master 啟動各個 Worker 併發執行 PartitionGraph 列表。
- Worker 在每個裝置上啟動 Executor,執行 PartitionGraph。
因為執行是按照切分來的,所以我們這裡只演示切分如下:
圖 6 切分計算圖
1.5 通訊角度
最後,我們從通訊角度來對分散式模式進行分析。TF 的訊息傳輸的通訊元件叫做 Rendezvous,這是一個從生產者向消費者傳遞張量的抽象,一個 rendezvous 是一個通道(channels)的表(table)。生產者呼叫 Send() 方法,在一個命名的通道上傳送一個張量。消費者呼叫 Recv() 方法,從一個指定的通道接收一個張量。
在分散式模式之中,對跨裝置的邊會進行分裂,在邊的傳送端和接收端會分別插入 Send 節點和 Recv 節點。
- 程序內的 Send 和 Recv 節點通過 IntraProcessRendezvous 實現資料交換。
- 程序間的 Send 和 Recv 節點通過 GrpcRemoteRendezvous 實現資料交換。
比如下圖,左面是原始計算圖,右面是分裂之後的計算圖,5 個節點被分配到兩個 worker 之上。
圖 7 分裂計算圖
我們假設 Worker 0 有兩個 GPU,當插入Send 節點和 Recv 節點,效果如下,其中 Worker 1 傳送給 Worker 之間的代表程序間通過 GrpcRemoteRendezvous 實現資料交換,Worker 0 內部兩個 GPU 之間的虛線箭頭代表程序內部通過 IntraProcessRendezvous 實現資料交換。
圖 8 通訊角度
我們接下來就看看 Server 的總體概況。
2. Server
2.1 介面
Server 的介面位於 tensorflow/core/protobuf/tensorflow_server.proto,具體如下:
// Defines the configuration of a single TensorFlow server.
message ServerDef {
// The cluster of which this server is a member.
ClusterDef cluster = 1;
// The name of the job of which this server is a member.
//
// NOTE(mrry): The cluster field must contain a JobDef with a name field
// that matches this name.
string job_name = 2;
// The task index of this server in its job.
//
// NOTE: The cluster field must contain a JobDef with a matching name
// and a mapping in its tasks field for this index.
int32 task_index = 3;
// The default configuration for sessions that run on this server.
ConfigProto default_session_config = 4;
// The protocol to be used by this server.
//
// Acceptable values include: "grpc", "grpc+verbs".
string protocol = 5;
// The server port. If not set, then we identify the port from the job_name.
int32 port = 6;
// Device filters for remote tasks in the cluster.
// NOTE: This is an experimental feature and only effective in TensorFlow 2.x.
ClusterDeviceFilters cluster_device_filters = 7;
}
2.2 Python 定義
可以從多個角度來看Server。
- 首先,Server 是一個叢集中的一員,負責管理其本地裝置集。
- 其次,Server 是基於 gRPC 的伺服器,Server 可以和叢集中的其他 Server 進行通訊。
- 第三,Server是執行 tf.train.Server 例項的程序,tf.train.Server 內部通常包括 Master Service與一個Worker Service,這兩個對外的介面就是 Master 和 Worker 這兩種"服務"。Server 同時可以扮演這兩種角色。
- 第四,Server 的實現是 GrpcServer。
- GrpcServer 內部有一個成員變數 grpc::Server server_ ,這是 GPRC 通訊 server,server_ 會監聽訊息,並且把命令傳送到內部兩個服務 MasterService 和 WorkerService 之中對應的那個。該服務會通過回撥函式進行業務處理。
- 當其是 Master 角色時候,對外服務是 MasterService,MasterService 為每一個接入的 Client 啟動一個 MasterSession,MasterSession 被一個全域性唯一的 session_handle 表示,此 session_handle 會傳遞給 Client。Master 可以為多個 Client 服務,一個 Client 只能和一個 Master 打交道。
- 當其是 Worker 角色時候,可以為多個 Master 提供服務,其對外服務是 WorkerService,WorkerService 為每個接入的 MasterSession 生成一個 WorkerSession 例項,MasterSession 可以讓 WorkerSession 註冊計算圖,執行命令。
圖 9 GrpcServer 結構
具體Python介面定義在 tensorflow/python/training/server_lib.py 之中。
@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):
"""An in-process TensorFlow server, for use in distributed training.
A tf.distribute.Server instance encapsulates a set of devices and a
tf.compat.v1.Session target that
can participate in distributed training. A server belongs to a
cluster (specified by a tf.train.ClusterSpec), and
corresponds to a particular task in a named job. The server can
communicate with any other server in the same cluster.
"""
def __init__(self,
server_or_cluster_def,
job_name=None,
task_index=None,
protocol=None,
config=None,
start=True):
"""Creates a new server with the given definition.
The job_name, task_index, and protocol arguments are optional, and
override any information provided in server_or_cluster_def.
Args:
server_or_cluster_def: A tf.train.ServerDef or tf.train.ClusterDef
protocol buffer, or a tf.train.ClusterSpec object, describing the
server to be created and/or the cluster of which it is a member.
job_name: (Optional.) Specifies the name of the job of which the server is
a member. Defaults to the value in server_or_cluster_def, if
specified.
task_index: (Optional.) Specifies the task index of the server in its job.
Defaults to the value in server_or_cluster_def, if specified.
Otherwise defaults to 0 if the server's job has only one task.
protocol: (Optional.) Specifies the protocol to be used by the server.
Acceptable values include "grpc", "grpc+verbs". Defaults to the value
in server_or_cluster_def, if specified. Otherwise defaults to
"grpc".
config: (Options.) A tf.compat.v1.ConfigProto that specifies default
configuration options for all sessions that run on this server.
start: (Optional.) Boolean, indicating whether to start the server after
creating it. Defaults to True.
Raises:
tf.errors.OpError: Or one of its subclasses if an error occurs while
creating the TensorFlow server.
"""
self._server_def = _make_server_def(server_or_cluster_def, job_name,
task_index, protocol, config)
self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
if start:
self.start()
TF_NewServer 方法就進入到了C++世界,其呼叫 tensorflow::NewServer 建立了C++ 世界的Server。
TF_Server* TF_NewServer(const void* proto, size_t proto_len,
TF_Status* status) {
#if defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
status->status = tensorflow::errors::Unimplemented(
"Server functionality is not supported on mobile");
return nullptr;
#else
tensorflow::ServerDef server_def;
if (!server_def.ParseFromArray(proto, static_cast<int>(proto_len))) {
status->status = InvalidArgument(
"Could not parse provided bytes into a ServerDef protocol buffer");
return nullptr;
}
std::unique_ptr<tensorflow::ServerInterface> out_server;
status->status = tensorflow::NewServer(server_def, &out_server);
if (!status->status.ok()) return nullptr;
return new TF_Server(std::move(out_server));
#endif // defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
}
然後會通過如下程式碼選擇建立何種Server。
// Creates a server based on the given server_def, and stores it in
// *out_server. Returns OK on success, otherwise returns an error.
Status NewServer(const ServerDef& server_def,
std::unique_ptr<ServerInterface>* out_server) {
ServerFactory* factory;
TF_RETURN_IF_ERROR(ServerFactory::GetFactory(server_def, &factory));
return factory->NewServer(server_def, ServerFactory::Options(), out_server);
}
而 GrpcServer 則早就註冊到系統之中,GrpcServerFactory 是工廠類,如果 protocol 是"grpc",則生成 GrpcServer。
class GrpcServerFactory : public ServerFactory {
public:
bool AcceptsOptions(const ServerDef& server_def) override {
return server_def.protocol() == "grpc";
}
Status NewServer(const ServerDef& server_def, const Options& options,
std::unique_ptr<ServerInterface>* out_server) override {
return GrpcServer::Create(server_def, Env::Default(),
options.local_device_mgr, out_server);
}
};
因此,我們接下來就看看GrpcServer。
2.3 ServerInterface
ServerInterface 是基礎介面,其代表一個輸出Master和Worker服務的 TensorFlow Sever。定義在tensorflow/core/distributed_runtime/server_lib.h 之中。 這個庫會基於註冊/工廠的機制來建立 TensorFlow 伺服器物件。每個伺服器的實現都必須有一個配套的 ServerFactory,並建立一個靜態的 "registrar"物件,用工廠類的一個例項呼叫 ServerFactory::Register()。具體如下:
class ServerInterface {
public:
ServerInterface() {}
virtual ~ServerInterface() {}
// Starts the server running asynchronously. Returns OK on success, otherwise
// returns an error.
virtual Status Start() = 0;
// Stops the server asynchronously. Returns OK on success, otherwise returns
// an error.
//
// After calling Stop(), the caller may call Join() to block until the
// server has stopped.
virtual Status Stop() = 0;
// Blocks until the server has stopped. Returns OK on success, otherwise
// returns an error.
virtual Status Join() = 0;
// Returns a target string that can be used to connect to this server using
// tensorflow::NewSession().
virtual const string target() const = 0;
virtual WorkerEnv* worker_env() = 0;
virtual MasterEnv* master_env() = 0;
// Update the set of workers that can be reached by the server
virtual Status UpdateServerDef(const ServerDef& server_def) = 0;
// Functions to operate on service-specific properties.
//
// Add master eager context to local eager service in order to handle enqueue
// requests from remote workers.
virtual Status AddMasterEagerContextToEagerService(
const tensorflow::uint64 context_id, EagerContext* context) = 0;
// Set coordination service agent instance to coordination service RPC handler
virtual Status SetCoordinationServiceAgentInstance(
CoordinationServiceAgent* agent) = 0;
private:
TF_DISALLOW_COPY_AND_ASSIGN(ServerInterface);
};
工廠類定義如下:
class ServerFactory {
public:
struct Options {
// Local DeviceMgr to use.
tensorflow::DeviceMgr* local_device_mgr;
};
// Creates a new server based on the given server_def, and stores
// it in *out_server. Returns OK on success, otherwise returns an
// error.
virtual Status NewServer(const ServerDef& server_def, const Options& options,
std::unique_ptr<ServerInterface>* out_server) = 0;
// Returns true if and only if this factory can create a server
// based on the given server_def.
virtual bool AcceptsOptions(const ServerDef& server_def) = 0;
virtual ~ServerFactory() {}
// For each ServerFactory subclass, an instance of that class must
// be registered by calling this method.
//
// The server_type must be unique to the server factory.
static void Register(const string& server_type, ServerFactory* factory);
// Looks up a factory that can create a server based on the given
// server_def, and stores it in *out_factory. Returns OK on
// success, otherwise returns an error.
static Status GetFactory(const ServerDef& server_def,
ServerFactory** out_factory);
};
2.4 GrpcServer
2.4.1 定義
GrpcServer 是管理當前程序中的 Master 和 Worker 服務的結構,通過 Start()、Stop()、Join() 構成了下面註釋之中的狀態機,
- New 狀態上啟動了 grpc::Server,但是沒有對外提供服務。
- Started 狀態上啟動 MasterService 和 WorkerService 兩個對外的 RPC 服務。
- Stopped 狀態下停止 MasterService 和 WorkerService 兩個服務。
// Represents the current state of the server, which changes as follows:
//
// Join() Join()
// ___ ___
// Start() \ / Stop() \ /
// NEW ---------> STARTED --------> STOPPED
// \ /
// \________________________/
// Stop(), Join()
其主要成員變數是:
- MasterEnv master_env_ : 是 Master 工作所使用的環境,環境之中不擁有這些實際指標;
- worker_env_ : WorkerEnv 型別,是worker工作所使用的環境;
- master_impl_ :具體執行業務操作的 Master 類;
- worker_impl_ :具體執行業務操作的 GrpcWorker;
- master_service_ :GrpcMasterService 例項;
- worker_service_ : GrpcWorkerService 例項;
- master_thread_ : MasterService 用來 RPC polling 的執行緒;
- worker_thread_ : WorkerService 用來 RPC polling 的執行緒;
- std::unique_ptr<::grpc::Server> server_ :GPRC 通訊 server;
具體來說,就是啟動了若干個執行緒,分別執行了 GrpcMasterService,GrpcWorkerService,GrpcEagerServiceImpl。
class GrpcServer : public ServerInterface {
private:
Env* env_;
// The port to which this server is bound.
int bound_port_ = 0;
// The host name of this server
string host_name_;
// Guards server configuration, server, and state.
mutex mu_;
enum State { NEW, STARTED, STOPPED };
State state_ TF_GUARDED_BY(mu_);
// Implementation of a TensorFlow master, and RPC polling thread.
MasterEnv master_env_;
std::unique_ptr<Master> master_impl_;
AsyncServiceInterface* master_service_ = nullptr;
std::unique_ptr<Thread> master_thread_ TF_GUARDED_BY(mu_);
std::map<std::string, AsyncServiceInterface*> extra_services_;
std::vector<std::unique_ptr<Thread>> extra_service_threads_
TF_GUARDED_BY(mu_);
// Implementation of a TensorFlow worker, and RPC polling thread.
WorkerEnv worker_env_;
std::unique_ptr<const DeviceMgr> owned_device_manager_;
std::unique_ptr<GrpcWorker> worker_impl_;
AsyncServiceInterface* worker_service_ = nullptr;
std::unique_ptr<Thread> worker_thread_ TF_GUARDED_BY(mu_);
std::unique_ptr<GrpcWorkerEnv> grpc_worker_env_;
// TensorFlow Eager implementation, and RPC polling thread.
AsyncServiceInterface* eager_service_ = nullptr;
std::unique_ptr<Thread> eager_thread_ TF_GUARDED_BY(mu_);
std::shared_ptr<WorkerSession> worker_session_;
// TensorFlow profiler service implementation.
std::unique_ptr<grpc::ProfilerService::Service> profiler_service_ = nullptr;
// The overall server configuration.
ServerDef server_def_ TF_GUARDED_BY(mu_);
std::unique_ptr<::grpc::Server> server_ TF_GUARDED_BY(mu_);
};
2.4.2 初始化
初始化邏輯大致如下:
-
獲取各種相關配置,初始化 MasterEnv 和 WorkerEnv;
-
建立Device Manager;
-
構建device列表;
-
建立 RpcRendezvousMgr;
-
建立server必要設定;
-
建立 Master 以及對應的 GrpcMasterService,GrpcMasterService 是對外提供服務的實體,訊息到達時候會呼叫這裡的訊息處理函式。具體業務則由 Master 提供。
-
建立 GrpcWorker 以及對應的 GrpcWorkerService,GrpcWorkerService是對外提供服務的實體,訊息到達時候會呼叫這裡的訊息處理函式。具體業務則由 GrpcWorker 提供。
-
呼叫 builder.BuildAndStart 啟動GRPC 通訊伺服器 grpc::Server,當啟動之後,GrpcServer 依然是 New 狀態,沒有提供對外服務,需要狀態機轉換到 Started 狀態才會對外提供服務;
-
建立grpc 需要的environment;
-
建立 WorkerCache;
-
建立一個 SessionMgr,並隨後會在這個 SessionMgr 中建立 WorkerSession;
-
設定 MasterSession 的Factory,如果需要時候就會呼叫建立MasterSession,因為有的任務比如ps是不需要MasterSession的;
-
註冊 LocalMaster;
Status GrpcServer::Init(const GrpcServerOptions& opts) {
mutex_lock l(mu_);
master_env_.env = env_;
worker_env_.env = env_;
// Check parameters before DeviceFactory::AddDevices,
// otherwise if 'task_index=-1' the program will abort.
int requested_port;
TF_RETURN_IF_ERROR(GetHostAndPort(server_def_, &host_name_, &requested_port));
SessionOptions sess_opts;
ConfigProto config = server_def_.default_session_config();
sess_opts.config = config;
// Configure shared devices between master and worker.
string name_prefix =
strings::StrCat("/job:", server_def_.job_name(), "/replica:0",
"/task:", server_def_.task_index());
// 建立Device Manager
if (opts.local_device_mgr == nullptr) {
std::vector<std::unique_ptr<Device>> devices;
TF_RETURN_IF_ERROR(
DeviceFactory::AddDevices(sess_opts, name_prefix, &devices));
worker_env_.device_mgr = new DynamicDeviceMgr(std::move(devices));
owned_device_manager_.reset(worker_env_.device_mgr);
} else {
worker_env_.device_mgr = opts.local_device_mgr;
owned_device_manager_.reset(nullptr);
}
// 構建device列表
worker_env_.local_devices = worker_env_.device_mgr->ListDevices();
master_env_.local_devices = worker_env_.device_mgr->ListDevices();
// 建立了 RpcRendezvousMgr
worker_env_.rendezvous_mgr = opts.rendezvous_mgr_func == nullptr
? new RpcRendezvousMgr(&worker_env_)
: opts.rendezvous_mgr_func(&worker_env_);
string unused;
string default_worker_name;
if (!DeviceNameUtils::SplitDeviceName(master_env_.local_devices[0]->name(),
&default_worker_name, &unused)) {
return errors::Internal("Could not parse worker name.");
}
// 建立server必要設定
::grpc::ServerBuilder builder;
builder.AddListeningPort(strings::StrCat("0.0.0.0:", requested_port),
GetServerCredentials(server_def_), &bound_port_);
builder.SetMaxMessageSize(std::numeric_limits<int32>::max());
bool reuse_port = false;
const Status status =
ReadBoolFromEnvVar("TF_GRPC_REUSE_PORT", false, &reuse_port);
auto server_build_option =
reuse_port
? std::unique_ptr<::grpc::ServerBuilderOption>(new ReusePortOption)
: std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption);
builder.SetOption(std::move(server_build_option));
// Allow subclasses to specify more args to pass to the gRPC server.
// 建立 Master 以及對應的 GrpcMasterService
MaybeMutateBuilder(&builder, requested_port);
master_impl_ = CreateMaster(&master_env_);
master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
// 建立 GrpcWorker 以及對應的 GrpcWorkerService
worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
: NewGrpcWorker(&worker_env_, config);
worker_service_ = NewGrpcWorkerService(worker_impl_.get(), &builder,
opts.worker_service_options)
.release();
eager_service_ = new eager::GrpcEagerServiceImpl(&worker_env_, &builder);
profiler_service_ = profiler::CreateProfilerService();
builder.RegisterService(profiler_service_.get());
// Add any extra services to be started.
extra_services_ = ExtraServices(&builder);
// extra service:
if (opts.service_func != nullptr) {
opts.service_func(&worker_env_, &builder);
}
// 啟動 GRPC 通訊 server
server_ = builder.BuildAndStart();
// Create the execution environment for the GRPC workers cache.
// 建立grpc 需要的environment
grpc_worker_env_.reset(CreateGrpcWorkerEnv());
// 建立 WorkerCache
WorkerCacheInterface* worker_cache;
WorkerCacheFactoryOptions worker_cache_factory_options(server_def_);
TF_RETURN_IF_ERROR(
WorkerCacheFactory(worker_cache_factory_options, &worker_cache));
CHECK_NE(nullptr, worker_cache);
if (opts.collective_mgr_func) {
worker_env_.collective_executor_mgr.reset(
opts.collective_mgr_func(config, &worker_env_, worker_cache));
} else {
worker_env_.collective_executor_mgr = CreateProdRpcCollectiveExecutorMgr(
config, worker_env_.device_mgr, MaybeCreateNcclCommunicator(),
worker_cache, default_worker_name);
}
// Set up worker environment.
// 建立一個 SessionMgr,並隨後會在這個 SessionMgr 中建立 WorkerSession
worker_env_.session_mgr = new SessionMgr(
&worker_env_, SessionMgr::WorkerNameFromServerDef(server_def_),
std::unique_ptr<WorkerCacheInterface>(worker_cache),
[this](const ServerDef& server_def, WorkerCacheInterface** worker_cache) {
WorkerCacheFactoryOptions options(server_def);
return WorkerCacheFactory(options, worker_cache);
});
worker_env_.compute_pool = ComputePool(sess_opts);
// Finish setting up master environment.
master_env_.ops = OpRegistry::Global();
master_env_.worker_cache = worker_cache;
master_env_.collective_executor_mgr =
worker_env_.collective_executor_mgr.get();
StatsPublisherFactory stats_factory = opts.stats_factory;
// 設定 MasterSession 的Factory,如果需要時候就會呼叫建立MasterSession,因為有的任務比如ps是不需要MasterSession的
master_env_.master_session_factory =
[config, stats_factory](
SessionOptions options, const MasterEnv* env,
std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
std::unique_ptr<WorkerCacheInterface> worker_cache,
std::unique_ptr<DeviceSet> device_set,
std::vector<string> filtered_worker_list) {
options.config.MergeFrom(config);
return new MasterSession(options, env, std::move(remote_devs),
std::move(worker_cache), std::move(device_set),
std::move(filtered_worker_list),
stats_factory);
};
master_env_.worker_cache_factory =
[this](const WorkerCacheFactoryOptions& options,
WorkerCacheInterface** worker_cache) {
return WorkerCacheFactory(options, worker_cache);
};
// Provide direct access to the master from in-process clients.
// 註冊 LocalMaster
LocalMaster::Register(target(), master_impl_.get(),
config.operation_timeout_in_ms());
return Status::OK();
}
Master
Master 是具體提供業務的物件。上面程式碼之中,生成master的相關語句如下
master_impl_ = CreateMaster(&master_env_);
LocalMaster::Register(target(), master_impl_.get(),
config.operation_timeout_in_ms());
由以下程式碼可知,GrpcServer 生成的是 Master。
std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
return std::unique_ptr<Master>(new Master(master_env, 0.0));
}
由以下程式碼可知,Master在此時對應的target是"grpc://"。
const string GrpcServer::target() const {
return strings::StrCat("grpc://", host_name_, ":", bound_port_);
}
LocalMaster 會把Master註冊到自己內部。
// Provide direct access to the master from in-process clients.
LocalMaster::Register(target(), master_impl_.get(),
config.operation_timeout_in_ms());
Worker
初始化程式碼之中,如下程式碼建立了worker,預設就是呼叫了 NewGrpcWorker 建立 GrpcWorker(具體提供業務的物件)。
worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
: NewGrpcWorker(&worker_env_, config);
2.4.3 Env
WorkerEnv
WorkerEnv 把各種相關配置歸總在一起,供 Worker 使用,可以認為是 Worker 執行上下文,WorkerEnv 與 Server 具有同樣生命週期,在 Worker 執行時全程可見,其主要變數如下:
-
Env* env :跨平臺 API 介面
-
SessionMgr* session_mgr :管理 WorkerSession 集合。
-
std::vector<Device*> local_devices :本地裝置集。
-
DeviceMgr* device_mgr :管理本地裝置集和遠端裝置集。
-
RendezvousMgrInterface* rendezvous_mgr :管理 Rendezvous 例項集。
-
std::unique_ptr
collective_executor_mgr; -
thread::ThreadPool* compute_pool :執行緒池,每次有運算元執行,都從中獲取一個執行緒。
// The worker environment class, which holds a bag of pointers to
// per-worker singletons.
//
// WorkerEnv does not own its member pointers.
struct WorkerEnv {
Env* env = nullptr;
// session_mgr encapsulates state for each session.
SessionMgr* session_mgr = nullptr;
// The local devices of this worker. Devices are owned by the device_mgr.
//
// REQUIRES: !local_devices.empty().
std::vector<Device*> local_devices;
// device_mgr manages local devices (cpu and gpu). The WorkerService
// is the network interface for managed devices.
//
// Note: Please use the device_mgr associated with your session if appropriate
// instead of this one. Using this device_mgr does not support ClusterSpec
// propagated sessions.
DeviceMgr* device_mgr = nullptr;
// A set of rendezvous keyed by step ids.
RendezvousMgrInterface* rendezvous_mgr = nullptr;
// Generates per-step CollectiveExecutors and has access to utilities
// supporting collective operations.
std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr;
// A pool of threads for scheduling compute work.
thread::ThreadPool* compute_pool = nullptr;
// Coordination service.
CoordinationServiceInterface* coord_service;
};
WorkerEnv 的幾個 管理類成員變數都很重要,比如 SessionMgr 類,其為 Worker 管理會話,比如會話的產生和銷燬,同時還維護了當前 Worker 的會話控制代碼到會話的對映。
class SessionMgr {
public:
Status CreateSession(...);
Status DeleteSession(...);
private:
const WorkerEnv* const worker_env_;
const WorkerCacheFactory worker_cache_factory_;
std::map<string, std::unique_ptr<WorkerSession>> sessions_ GUARDED_BY(mu_);
};
MasterEnv
MasterEnv 把各種相關配置歸總在一起,供 master 使用,可以認為是 Master 執行時的上下文,在 Master 的整個生命週期都是可見的。其主要成員變數如下:
- Env* env :跨平臺 API 介面。
- vector<Device*> local_devices :本地裝置集;
- WorkerCacheFactory worker_cache_factory :工廠類,可以建立 WorkerCacheInterface 例項;
- MasterSessionFactory master_session_factory :工廠類,可以建立 MasterSession 例項;
- WorkerCacheInterface :建立 MasterInterface 例項, MasterInterface 用於呼叫遠端 MasterService 服務;
- OpRegistryInterface* ops :查詢特定 OP 的元資料;
- CollectiveExecutorMgrInterface* collective_executor_mgr :訪問集合操作。
// The master environment class, which holds a bag of pointers to
// per-master state.
//
// MasterEnv does not own its member pointers.
struct MasterEnv {
Env* env = nullptr;
// Object from which WorkerInterface instances can be obtained. Not owned.
WorkerCacheInterface* worker_cache = nullptr;
// The operation definitions to use. Must be filled before use.
const OpRegistryInterface* ops = nullptr;
// Local devices co-located with this master. Devices are not owned
// by the master service.
//
// REQUIRES: !local_devices.empty().
std::vector<Device*> local_devices;
// Factory for creating master sessions, given session options and a
// vector of devices.
//
// The caller of the function takes ownership of the returned
// MasterSession, which may not be null. Ownership of the
// MasterEnv* is retained by the caller.
std::function<MasterSession*(
SessionOptions, MasterEnv*,
std::unique_ptr<std::vector<std::unique_ptr<Device>>>,
std::unique_ptr<WorkerCacheInterface>,
std::unique_ptr<DeviceSet> device_set,
std::vector<string> filtered_worker_list)>
master_session_factory;
std::function<Status(const WorkerCacheFactoryOptions&,
WorkerCacheInterface**)>
worker_cache_factory;
// Generates per-step CollectiveExecutors and has access to utilities
// supporting collective operations. Not owned.
CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr;
};
2.5 啟動
Python 程式碼之中,最後是 start 方法的呼叫。
@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):
def __init__(self,
server_or_cluster_def,
job_name=None,
task_index=None,
protocol=None,
config=None,
start=True):
self._server_def = _make_server_def(server_or_cluster_def, job_name,
task_index, protocol, config)
self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
if start:
self.start()
在呼叫之前,Server 是 New 狀態,呼叫 start 之後,GrpcServer 的狀態從 New 遷移 Started 狀態。Start() 方法之中,會啟動三個獨立執行緒,分別是 MasterService,WorkerService,EagerService 的訊息處理器。至此,GrpcServer 才對外提供 MasterService 和 WorkerService 這兩種服務。
Status GrpcServer::Start() {
mutex_lock l(mu_);
switch (state_) {
case NEW: {
master_thread_.reset(
env_->StartThread(ThreadOptions(), "TF_master_service",
[this] { master_service_->HandleRPCsLoop(); }));
worker_thread_.reset(
env_->StartThread(ThreadOptions(), "TF_worker_service",
[this] { worker_service_->HandleRPCsLoop(); }));
eager_thread_.reset(
env_->StartThread(ThreadOptions(), "TF_eager_service",
[this] { eager_service_->HandleRPCsLoop(); }));
for (const auto& kv : extra_services_) {
const std::string& service_name = kv.first;
AsyncServiceInterface* service = kv.second;
std::unique_ptr<Thread> extra_service_thread;
extra_service_thread.reset(env_->StartThread(
ThreadOptions(), service_name,
[service = service] { service->HandleRPCsLoop(); }));
extra_service_threads_.push_back(std::move(extra_service_thread));
}
state_ = STARTED;
return Status::OK();
}
case STARTED:
return Status::OK();
case STOPPED:
return errors::FailedPrecondition("Server has stopped.");
default:
LOG(FATAL);
}
}
2.6 等待終止服務
啟動之後,需要讓這幾個執行緒做 Join 操作,因此主執行緒會掛起直至這兩個執行緒終止,這樣可以持久地對外提供 MasterService 服務和 WorkerService 服務。
Status GrpcServer::Join() {
mutex_lock l(mu_);
switch (state_) {
case NEW:
// Prevent the server from being started subsequently.
state_ = STOPPED;
return Status::OK();
case STARTED:
case STOPPED:
master_thread_.reset();
worker_thread_.reset();
eager_thread_.reset();
for (auto& thread : extra_service_threads_) {
thread.reset();
}
return Status::OK();
default:
LOG(FATAL);
}
}
至此,TF 分散式環境總體介紹完畢。
0xFF 參考
https://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/
什麼是in-graph replication和between-graph replication?
[騰訊機智] TensorFlow原始碼解析(1): 建立會話
TensorFlow 分散式(Distributed TensorFlow)
tensorflow原始碼解析之distributed_runtime
Distributed TensorFlow: A Gentle Introduction
TensorFlow中的Placement啟發式演算法模組——Placer
TensorFlow的圖切割模組——Graph Partitioner