[原始碼解析] 深度學習分散式訓練框架 horovod (5) --- 融合框架
[原始碼解析] 深度學習分散式訓練框架 horovod (5) --- 融合框架
目錄- [原始碼解析] 深度學習分散式訓練框架 horovod (5) --- 融合框架
0x00 摘要
Horovod 是Uber於2017年釋出的一個易於使用的高效能的分散式訓練框架,在業界得到了廣泛應用。
本系列將通過原始碼分析來帶領大家瞭解 Horovod。本文是系列第五篇,看看 Horovod 如何融合各個機器學習框架。
前面幾篇連結如下:
[原始碼解析] 深度學習分散式訓練框架 Horovod (1) --- 基礎知識
[原始碼解析] 深度學習分散式訓練框架 horovod (2) --- 從使用者角度切入
[原始碼解析] 深度學習分散式訓練框架 horovod (3) --- Horovodrun背後做了什麼
[原始碼解析] 深度學習分散式訓練框架 horovod (4) --- 網路基礎 & Driver
我們需要一些問題來引導分析:
- Horovod 不依託於某個框架,自己通過MPI建立了一套分散式系統,完成了allreduce, allgather等collective operations通訊工作,但是如何實現一個大統一的分散式通訊框架?
- Horovod是一個庫,怎麼嵌入到各種深度學習框架之中?比如怎麼嵌入到Tensorflow,PyTorch,MXNet,Keras?
- Horovod 因為需要相容這麼多學習框架,所以應該有自己的 OP 操作,在此基礎上新增適配層,這樣就可以達到相容目的;
- 如何將梯度的同步通訊完全抽象為框架無關的架構?
- 如何將通訊和計算框架分離,這樣,計算框架只需要直接呼叫hvd介面,如HorovodAllreduceOp來進行梯度求平均即可。
我們接下來看看 Horovod 如何融合。
0x01 架構圖
我們通過架構圖來看看。
以下是網上一位同學的架構圖帶你瞭解當紅炸子雞Horovod分散式訓練框架,為了盡力保持風格統一,我重新繪製如下:
他分層思路如下:
- 統一層:用來整合各個框架層,hvd將通訊和計算框架分離之後,計算框架只需要直接呼叫hvd介面,如HorovodAllreduceOp 來進行梯度求平均即可。
- 框架層:支援Tensorflow,PyTorch,MXNet,Keras四個熱門的深度學習框架,對眾多熱門框架的訓練支援是Horovod的優勢之一。
- 多卡通訊層(集合通訊層):主要是整合一些通訊框架,包括:NCCL, MPI, GLOO, CCL,主要就是完成前面說到的AllReduce的過程。
- 網路通訊層:主要是優化網路通訊,提高叢集間的通訊效率。
MPI在Hovorod的角色比較特殊:
- 一方面Horovod內集成了基於MPI的AllReduce,類似於NCCL,都是用作梯度規約;
- 另一方面,MPI可以用來啟動多個程序(Hovorod裡用Rank表示),實現平行計算;
0x02 統一層
我們現在知道,Horovod 內部實現(封裝)了 allreduce 功能,藉以實現梯度規約。
但是,hvd.allreduce又是如何實現對不同通訊library的呼叫的呢?Horovod 使用一個統一層來完成。
首先,我們看看每個 rank 節點的執行機制,這樣知道統一層的實現需要考慮哪些因素:
- 每個rank有兩個thread:Execution thread 和 Background thread 。
- Execution thread 是用來做機器學習計算的。
- Background thread 是通訊和做allreduce的。
- 後臺執行緒中 有一個訊息佇列接收AllReduce,AllGather以及Broadcast等op的請求;
- 後臺執行緒會每隔一段時間輪詢訊息佇列,拿到一批op之後,會對op中的tensor進行融合,再進行相應的操作。
- 如果tensor在視訊記憶體中,那麼它會使用NCCL庫執行。而如果是在記憶體中,則會使用MPI或者Gloo執行。
其次,統一層的實現是:
- 構建一個Operation 類體系,首先定義基類HVD OP,然後在此基礎上定義子類AllReduceOP,並以此延伸出多個基於不同通訊library的collective OP(就是適配層),比如說 GlooAllreduce 和 MPIAllReduce。
- 構建一個訊息佇列。所有的適配層 最後都是發出一些 Op + Tensor 的 Message 到佇列中,後臺初始化的時候會構建一個專門的執行緒(Background thread)專門消費這個佇列。因此有一個同步訊息的過程,相當於"某個 tensor"在所有節點上都就緒以後就可以開始計算了。
- Horovod 定義的這套HVD OP是跟具體深度學習框架無關的,Horovod 針對各個框架定義了不同的HVD OP實現。比如使用 TensorFlow時候,是無法直接插到TF Graph中執行的,所以還需要註冊TF的HVD OP。
我們下面就逐一分析下這幾個方面。
0x03 Horovod OP 類體系
Horovod OP 類體系如下:
- 首先定義基類HVD OP;
- 然後在次基礎上定義子類AllReduceOP;
- 並以此延伸出多個基於不同通訊library的collective OP,比如說 GlooAllreduce 和 MPIAllReduce;
邏輯如下:
+---------------+
| HorovodOp |
+----+-----+---++
^ ^ ^ ^ ^
| | | | |
+----------------------------+ | | | |
| +---------------------+ | | +-----------------+
| | +-------+ | |
| | | | |
+------+-----+ +---+----+ +---------+---+ +----+--------+ +----------+--+
| AlltoallOp | | JoinOp | | AllreduceOp | | AllgatherOp | | BroadcastOp |
+------------+ +--------+ ++---+----+--++ +-------------+ +-------------+
^ ^ ^ ^
| | | |
+------------------+ | | +-----------------------------------+
| +-------+ +-------------+ |
| | | |
+-----+--------+ +---+----------+ +------------+--------+ +------------+---+
| MPIAllreduce | | GPUAllreduce | | AdasumMPIAllreduceOp| | GlooAllreduce |
+--------------+ +--------------+ +---------------------+ +----------------+
手機上如圖:
3.1 基類 HorovodOp
HorovodOp 是所有類的基類,其主要作用是:
- 擁有 HorovodGlobalState,這樣可以隨時呼叫到總體state;
- NumElements 函式負責獲取本 OP 擁有多少 tensor;
- 一個虛擬函式 Execute 用以被派生類實現,就是具體派生類需要實現的演算法操作;
class HorovodOp {
public:
HorovodOp::HorovodOp(HorovodGlobalState* global_state)
: global_state_(global_state) {}
int64_t HorovodOp::NumElements(std::vector<TensorTableEntry>& entries) {
int64_t num_elements = 0;
for (auto& e : entries) {
num_elements += e.tensor->shape().num_elements();
}
return num_elements;
}
virtual Status Execute(std::vector<TensorTableEntry>& entries,
const Response& response) = 0;
protected:
HorovodGlobalState* global_state_;
};
3.2 派生類 AllreduceOp
HorovodOp 的派生類有幾個,其功能望文生義,比如:AllreduceOp ,AllgatherOp,BroadcastOp,AlltoallOp,JoinOp(彈性訓練使用)。
我們以 AllreduceOp 為例,其定義如下,主要函式是:
- Execute 需要其派生類實現,就是具體進行演算法操作;
- Enabled 需要其派生類實現;
- MemcpyInFusionBuffer :用來拷貝 input Fusion tensor 多個entries;
- MemcpyOutFusionBuffer :用來拷貝 output Fusion tensor 多個entries;
- MemcpyEntryInFusionBuffer :用來拷貝 input Fusion tensor;
- MemcpyEntryOutFusionBuffer :用來拷貝 output Fusion tensor;
class AllreduceOp : public HorovodOp {
public:
virtual Status Execute(std::vector<TensorTableEntry>& entries,
const Response& response) = 0;
virtual bool Enabled(const ParameterManager& param_manager,
const std::vector<TensorTableEntry>& entries,
const Response& response) const = 0;
protected:
virtual void
MemcpyInFusionBuffer(const std::vector<TensorTableEntry>& entries,
const void*& fused_input_data, void*& buffer_data,
size_t& buffer_len);
......
};
3.3 適配類 MPIAllreduce
接下來是具體的實現類,和具體通訊框架有關,比如:MPIAllreduce,GPUAllreduce,AdasumMPIAllreduceOp,GlooAllreduce。在 common/ops 中可以看到具體種類有 NCCL/Gloo/MPI 等等。
這些 op 由 op_manager 管理,op_manager 會根據優先順序找到可以用來計算的 op 進行計算,比如:
- MPI 用的就是 MPI_Allreduce,具體 scatter-gather 和 all-gather openMPI 有現成的實現;
- NCCL 就直接呼叫
ncclAllReduce
,比較新的 nccl 也支援跨節點的 allreduce 了,不用自己再套一層;
我們以 MPIAllreduce 為例進行說明,其定義如下:
class MPIAllreduce : public AllreduceOp {
public:
MPIAllreduce(MPIContext* mpi_context, HorovodGlobalState* global_state);
Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) override;
bool Enabled(const ParameterManager& param_manager,
const std::vector<TensorTableEntry>& entries,
const Response& response) const override;
protected:
MPIContext* mpi_context_;
};
具體 Execute 就是呼叫 MPI_Allreduce 來完成操作,比如:
- 從記憶體中拷貝到 fusion buffer;
- 呼叫 MPI_Allreduce 實現歸併;
- 從 fusion buffer 拷貝出去;
Status MPIAllreduce::Execute(std::vector<TensorTableEntry>& entries, const Response& response) {
// Copy memory into the fusion buffer.
...
MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);
...
// Do allreduce.
timeline.ActivityStartAll(entries, MPI_ALLREDUCE);
const void* sendbuf = entries.size() > 1 || fused_input_data == buffer_data
? MPI_IN_PLACE : fused_input_data;
int op = MPI_Allreduce(sendbuf, buffer_data,
(int) num_elements,
mpi_context_->GetMPIDataType(first_entry.tensor),
mpi_context_->GetMPISumOp(first_entry.tensor->dtype()),
mpi_context_->GetMPICommunicator(Communicator::GLOBAL));
// Copy memory out of the fusion buffer.
...
MemcpyOutFusionBuffer(buffer_data, entries);
...
}
3.4 後臺執行緒如何使用
因為 Horovod 主要是由一個後臺執行緒完成梯度操作,所以讓我們看看這個後臺執行緒之中如何呼叫到 Hovorod OP。
Horovod的工作流程比較簡單:
- HorovodGlobalState 之中有一個訊息佇列接收AllReduce,AllGather以及Broadcast等op的請求。
- 有一個後臺執行緒會每隔一段時間輪詢訊息佇列,拿到一批op之後,會對op中的tensor進行融合,再進行相應的操作。
- 如果tensor在視訊記憶體中,那麼它會使用NCCL庫執行。而如果是在記憶體中,則會使用MPI或者Gloo執行。
3.4.1 具體collective 操作
Horovod 的後臺執行緒拿到需要融合的tensor 之後,會呼叫 PerformOperation 進行具體的collective 操作。在 PerformOperation 之中有呼叫
void PerformOperation(Response response, HorovodGlobalState& state) {
......
Status status;
try {
// 進行collective的操作
status = op_manager->ExecuteOperation(entries, response);
} catch (const std::exception& ex) {
status = Status::UnknownError(ex.what());
}
......
}
邏輯如下:
+---------------------------------+
| | +-----------------------------+
| BackgroundThreadLoop | | |
| | | OperationManager |
| +--------------------------+ | | |
| | RunLoopOnce | | | |
| | | | | |
| | | | | |
| | ComputeResponseList | | +----------> ExecuteOperation |
| | + | | | | |
| | | | | | | |
| | | | | | | |
| | | | | | 1 | |
| | v | | | | |
| | | | | | |
| | PerformOperation +----------+ | |
| | | | | |
| +--------------------------+ | | |
| | | |
+---------------------------------+ +-----------------------------+
3.4.2 呼叫不同型別的OP
然後 status = op_manager->ExecuteOperation(entries, response) 會呼叫不同的 op->Execute(entries, response) 執行reduce 運算。
比如 ALLREDUCE 就呼叫了 ExecuteAllreduce(entries, response)。
Status OperationManager::ExecuteOperation(std::vector<TensorTableEntry>& entries,
const Response& response) const {
if (response.response_type() == Response::ALLREDUCE) {
return ExecuteAllreduce(entries, response); // 這裡
} else if (response.response_type() == Response::ALLGATHER) {
return ExecuteAllgather(entries, response);
} else if (response.response_type() == Response::BROADCAST) {
return ExecuteBroadcast(entries, response);
} else if (response.response_type() == Response::ALLTOALL) {
return ExecuteAlltoall(entries, response);
} else if (response.response_type() == Response::JOIN) {
return ExecuteJoin(entries, response);
}
.....
}
邏輯如下:
+---------------------------------+
| | +-----------------------+
| BackgroundThreadLoop | | |
| | | OperationManager |
| +--------------------------+ | | |
| | RunLoopOnce | | | |
| | | | | |
| | | | | |
| | ComputeResponseList | | +----------> ExecuteOperation |
| | + | | | | + |
| | | | | | | | |
| | | | | | | | |
| | | | | | 1 | | 2 |
| | v | | | | | |
| | | | | | | |
| | PerformOperation +----------+ | v |
| | | | | ExecuteAllreduce |
| +--------------------------+ | | |
| | | |
+---------------------------------+ +-----------------------+
3.4.3 取一個適配層
具體就是從 allreduce_ops_ 之中選取一個合適的 op,呼叫其Execute。
Status OperationManager::ExecuteAllreduce(std::vector<TensorTableEntry>& entries,
const Response& response) const {
for (auto& op : allreduce_ops_) {
if (op->Enabled(*param_manager_, entries, response)) {
return op->Execute(entries, response);
}
}
}
allreduce_ops_ 是從哪裡來的?在 OperationManager 構建函式中有。
allreduce_ops_(std::move(allreduce_ops)),
所以我們看看allreduce_ops 如何構建。
3.4.4 適配層構建
在 CreateOperationManager 之中對 allreduce_ops 進行新增。
可以看到,新增的型別大致如下:
- MPI_GPUAllreduce
- NCCLHierarchicalAllreduce
- NCCLAllreduce
- DDLAllreduce
- GlooAllreduce
- GPUAllreduce
- MPIAllreduce
- ......
OperationManager* CreateOperationManager(HorovodGlobalState& state) {
// Order of these operations is very important. Operations will be checked
// sequentially from the first to the last. The first 'Enabled' operation will
// be executed.
std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops;
std::vector<std::shared_ptr<AllgatherOp>> allgather_ops;
std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops;
std::vector<std::shared_ptr<AllreduceOp>> adasum_ops;
std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops;
#if HAVE_MPI && HAVE_GPU // 如果配置了MPI
if (mpi_context.IsEnabled()) {
#if HOROVOD_GPU_ALLREDUCE == 'M'
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new MPI_GPUAllreduce(&mpi_context, &gpu_context, &state)));
allreduce_ops.push_back(
std::shared_ptr<AllreduceOp>(new NCCLHierarchicalAllreduce(
&nccl_context, &mpi_context, &gpu_context, &state)));
#elif HAVE_DDL && HOROVOD_GPU_ALLREDUCE == 'D' //如果配置了DDL
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new DDLAllreduce(&ddl_context, &gpu_context, &state)));
#endif
#if HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N'//如果配置了NCCL
allreduce_ops.push_back(std::shared_ptr<AllreduceOp>(
new NCCLAllreduce(&nccl_context, &gpu_context, &state)));
#endif
......
因此我們知道,如何使用這些 Operation。
流程如下:
+---------------------------------+
| | +-----------------------+
| BackgroundThreadLoop | | |
| | | OperationManager |
| +--------------------------+ | | |
| | RunLoopOnce | | | |
| | | | | |
| | | | | | +--> GPUAllreduce
| | ComputeResponseList | | +----------> ExecuteOperation | |
| | + | | | | + | |
| | | | | | | | | +--> NCCLHierarchicalAllreduce
| | | | | | | | | |
| | | | | | 1 | | 2 | |
| | v | | | | | | +--> NCCLAllreduce
| | | | | | | | |
| | PerformOperation +----------+ | v | |
| | | | | ExecuteAllreduce | +--> DDLAllreduce
| +--------------------------+ | | + | |
| | | | | |
+---------------------------------+ | | | +--> GlooAllreduce
| | allreduce_ops----------+
| | | | +----------------+
| | | +--> | MPIAllreduce |
+-----------------------+ | |
| | |
+----------------------------------> Execute |
3 | |
+----------------+
手機如下:
回顧下每個 rank 節點的執行機制,每個rank有兩個thread:
- Execution thread 是用來做機器學習計算的。
- background thread 是負責通訊和allreduce。
到目前為止,我們其實分析的是第二部分:background thread 是負責通訊和allreduce。
下面我們要看看第一部分的某些環節,即 Tensorflow 這樣的框架是如何把 tensor & op 傳送給 後臺執行緒。
0x04 與通訊框架融合
Horovod 定義的這套HVD OP是跟具體深度學習框架無關的,比如使用 TensorFlow時候,是無法直接insert到TF Graph中執行的,所以還需要註冊TF的OP。
Horovod 針對各個框架定義了不同的實現。
針對 TensorFlow 模型分散式訓練,Horovod 開發了 TensorFlow ops 來實現 Tensorflow tensor 的 AllReduce。而且這些 op 可以融入 TensorFlow 的計算圖中,利用 TensorFlow graph 的 runtime 實現計算與通訊的 overlapping,從而提高通訊效率。
以 TensorFlow 模型的 AllReduce 分散式訓練為例,Horovod 開發了 allreduce ops 嵌入 TensorFlow 的反向計算圖中,從而獲取 TensorFlow 反向計算的梯度並進行梯度匯合。allreduce ops 可以通過呼叫 gloo 提供的 allreduce API 來實現梯度匯合的。
比如 在 horovod/tensorflow/mpi_ops.cc 之中,就針對 tensorflow 定義了 HorovodAllreduceOp。
4.1 TensorFlow 定義Op
對於 TensorFlow,可以自定義 Operation,即如果現有的庫沒有涵蓋你想要的操作, 你可以自己定製一個。
為了使定製的 Op 能夠相容原有的庫,你必須做以下工作:
- 在一個 C++ 檔案中註冊新 Op. Op 的註冊與實現是相互獨立的. 在其註冊時描述了 Op 該如何執行. 例如, 註冊 Op 時定義了 Op 的名字, 並指定了它的輸入和輸出.
- 使用 C++ 實現 Op. 每一個實現稱之為一個 "kernel", 可以存在多個 kernel, 以適配不同的架構 (CPU, GPU 等)或不同的輸入/輸出型別.
- 建立一個 Python 包裝器(wrapper). 這個包裝器是建立 Op 的公開 API. 當註冊 Op 時, 會自動生成一個預設 預設的包裝器. 既可以直接使用預設包裝器, 也可以新增一個新的包裝器.
- (可選) 寫一個函式計算 Op 的梯度.
- (可選) 寫一個函式, 描述 Op 的輸入和輸出 shape. 該函式能夠允許從 Op 推斷 shape.
- 測試 Op, 通常使用 Pyhton。如果你定義了梯度,你可以使用Python的GradientChecker來測試它。
4.2 Horovod 實現 --- HorovodAllreduceOp
HorovodAllreduceOp 就是一種TF Async OP,然後其內部實現中呼叫了 HVD OP,這是比較巧妙的組合模式。顯然繼承了TP Aysnc OP的HorovodAllReduce 是可以插入到TF Graph裡面,然後被正常執行的。
新增新的OP需要3步,我們具體看看。
4.2.1 定義 Op 的介面
第一步是定義Op 的介面,使用REGISTER_OP()向 TensorFlow 系統註冊來定義 Op 的介面,該OP就是HorovodAllreduceOp。
// 1. 定義 Op 的介面
// REGISTER_OP()向 TensorFlow 系統註冊來定義 Op 的介面,該OP就是HorovodAllreduceOp.
// 在註冊時, 指定 Op 的名稱: REGISTER_OP("HorovodAllreduce")
// 輸入(型別和名稱): Input("tensor: T")
// 輸出(型別和名稱): Output("sum: T")
// 和所需要任何 屬性的文件說明Doc(R"doc(...)doc");
//
// 該 Op 接受一個 T 型別 tensor 作為輸入, T 型別可以是{int32, int64, float32, float64}
// 輸出一個 T 型別 tensor sum,sum是在所有的MPI程序中求和
REGISTER_OP("HorovodAllreduce")
.Attr("T: {int32, int64, float16, float32, float64}")
.Attr("reduce_op: int")
.Attr("prescale_factor: float")
.Attr("postscale_factor: float")
.Attr("ignore_name_scope: bool = False")
.Input("tensor: T")
.Output("sum: T")
.SetShapeFn([](shape_inference::InferenceContext* c) {
c->set_output(0, c->input(0));
return Status::OK();
});
4.2.2 為 Op 實現 kernel
第二步是為 Op 實現 kernel。在定義介面之後, 每一個實現稱之為一個 "kernel",提供一個或多個 Op 的實現,即可以存在多個 kernel。
HorovodAllreduceOp 類繼承 AsyncOpKernel,覆蓋 其ComputeAsync() 方法。ComputeAsync()方法提供一個型別為 OpKernelContext* 的引數 context, 用於訪問一些有用的資訊, 例如輸入和輸出的 tensor。
在 ComputeAsync 裡,會把這一 AllReduce 的請求入隊。可以看到,在 TensorFlow 支援的實現上,Horovod 與百度大同小異。都是自定義了 AllReduce Op,在 Op 中把請求入隊。
// 2. 為 Op 實現 kernel。
// 在定義介面之後, 每一個實現稱之為一個 "kernel",提供一個或多個 Op 的實現,即可以存在多個 kernel。
// 為這些 kernel 的每一個建立一個對應的類, 繼承 AsyncOpKernel, 覆蓋 ComputeAsync 方法。
// ComputeAsync 方法提供一個型別為 OpKernelContext* 的引數 context, 用於訪問一些有用的資訊, 例如輸入和輸出的 tensor
class HorovodAllreduceOp : public AsyncOpKernel {
public:
// 防止類建構函式的隱式自動轉換,只能顯示呼叫該建構函式
explicit HorovodAllreduceOp(OpKernelConstruction* context)
: AsyncOpKernel(context) {
OP_REQUIRES_OK(context, context->GetAttr("reduce_op", &reduce_op_));
OP_REQUIRES_OK(context, context->GetAttr("prescale_factor", &prescale_factor_));
OP_REQUIRES_OK(context, context->GetAttr("postscale_factor", &postscale_factor_));
OP_REQUIRES_OK(context, context->GetAttr("ignore_name_scope", &ignore_name_scope_));
}
// 重寫ComputeAsync()方法
void ComputeAsync(OpKernelContext* context, DoneCallback done) override {
OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()),
done);
auto node_name = name();
if (ignore_name_scope_) {
auto pos = node_name.find_last_of('/');
if (pos != std::string::npos) {
node_name = node_name.substr(pos + 1);
}
}
auto device = GetDeviceID(context);
auto tensor = context->input(0);
horovod::common::ReduceOp reduce_op = static_cast<horovod::common::ReduceOp>(reduce_op_);
Tensor* output;
OP_REQUIRES_OK_ASYNC(
context, context->allocate_output(0, tensor.shape(), &output), done);
// ReadyEvent makes sure input tensor is ready, and output is allocated.
// shared_ptr 是一個標準的共享所有權的智慧指標, 允許多個指標指向同一個物件
auto ready_event = std::shared_ptr<common::ReadyEvent>(RecordReadyEvent(context));
// 模板函式 std::make_shared 可以返回一個指定型別的 std::shared_ptr
auto hvd_context = std::make_shared<TFOpContext>(context);
auto hvd_tensor = std::make_shared<TFTensor>(tensor);
auto hvd_output = std::make_shared<TFTensor>(*output);
// 將張量的Allreduce操作OP加入佇列
auto enqueue_result = EnqueueTensorAllreduce(
hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device,
[context, done](const common::Status& status) {
context->SetStatus(ConvertStatus(status));
done();
}, reduce_op, (double) prescale_factor_, (double) postscale_factor_);
OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done);
}
private:
int reduce_op_;
// Using float since TF does not support double OP attributes
float prescale_factor_;
float postscale_factor_;
bool ignore_name_scope_;
};
4.2.3 註冊OP到 TensorFlow 系統
第三步是註冊OP到 TensorFlow 系統。
// 3. 註冊OP到 TensorFlow 系統
// 註冊時可以指定該 kernel 執行時的多個約束條件. 例如可以指定一個 kernel 在 CPU 上執行, 另一個在 GPU 上執行
REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_CPU),
HorovodAllreduceOp);
// 如果執行了GPU
#if HOROVOD_GPU_ALLREDUCE
REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_GPU),
HorovodAllreduceOp);
#endif
4.2.4 注意點
具體可以參考 add new op,裡面規範了 Tensorflow 自定義運算元的實現。
請注意,生成的函式將獲得一個蛇形名稱(以符合 PEP8)。因此,如果您的操作在 C++ 檔案中命名為 ZeroOut,則 Python 函式將稱為 zero_out。
C++ 的定義是駝峰的,生成出來的 python 函式是下劃線小寫的,所以最後對應的是,適配Op的程式碼在 horovod/tensorflow 目錄下面。
C++ | Python |
---|---|
HorovodAllgather | horovod_allgather |
HorovodAllreduce | horovod_allreduce |
HorovodBroadcast | horovod_broadcast |
所以,在 python 世界中,當 _DistributedOptimizer 呼叫 compute_gradients 來優化的時候,會通過 _allreduce 來呼叫到 MPI_LIB.horovod_allreduce,也就是呼叫到 HorovodAllreduceOp 這裡。
具體 _DistributedOptimizer 如何呼叫到 _allreduce,我們在後續文章中會講解。
def _allreduce(tensor, name=None, op=Sum):
if name is None and not _executing_eagerly():
name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op)
4.3 如何使用
4.3.1 EnqueueTensorAllreduce
HorovodAllreduceOp 類會呼叫 EnqueueTensorAllreduce() 方法,將張量的Allreduce操作OP加入HorovodGlobalState的佇列中。
EnqueueTensorAllreduce 位於:/horovod/common/operations.cc。
具體方法就是構建contexts,callbacks等各種支撐資料,然後呼叫 EnqueueTensorAllreduces 進行處理。
// Contexts and controller must be initialized and the background thread
// must be running before this function is called.
Status EnqueueTensorAllreduce(std::shared_ptr<OpContext> context,
std::shared_ptr<Tensor> tensor,
std::shared_ptr<Tensor> output,
std::shared_ptr<ReadyEvent> ready_event,
std::string name, const int device,
StatusCallback callback,
ReduceOp reduce_op,
double prescale_factor,
double postscale_factor) {
// Wrap inputs in std::vector and pass onto multi tensor implementation
std::vector<std::shared_ptr<OpContext>> contexts;
std::vector<std::shared_ptr<Tensor>> tensors;
std::vector<std::shared_ptr<Tensor>> outputs;
std::vector<std::shared_ptr<ReadyEvent>> ready_events;
std::vector<std::string> names;
std::vector<StatusCallback> callbacks;
contexts.emplace_back(std::move(context));
tensors.emplace_back(std::move(tensor));
outputs.emplace_back(std::move(output));
ready_events.emplace_back(std::move(ready_event));
names.emplace_back(std::move(name));
callbacks.emplace_back(std::move(callback));
return EnqueueTensorAllreduces(contexts, tensors, outputs, ready_events,
names, device, callbacks, reduce_op,
prescale_factor, postscale_factor);
}
4.3.2 提交命令
EnqueueTensorAllreduces 主要就是呼叫 AddToTensorQueueMulti 向 tensor queue 提交操作,方法邏輯為:
- 把需要 reduce 的 tensor 組裝成一個Request。
- 針對每個 tensor,會建立對應 TensorTableEntry,用於儲存tensor 的權重,message 主要是一些 元資訊 metadata。
- 把 request 和 TensorTableEntry往 GlobalState 的 tensor_queue 裡面塞,這是一個程序內共享的全域性物件維護的一個佇列。
- 等待後臺執行緒去讀取這些allreduce 的請求。後臺程序,會一直在執行一個迴圈
RunLoopOnce
。在其中,後臺執行緒會利用 MPIController 來處理入隊的請求。 MPIController 可以理解為是協調不同的 Rank 程序,處理請求的物件。這個抽象是百度所不具備的,主要是為了支援 Facebook gloo 等其他的集合計算庫。因此 Horovod 也有 GlooController 等等實現。
具體程式碼如下:
Status EnqueueTensorAllreduces(std::vector<std::shared_ptr<OpContext>>& contexts,
std::vector<std::shared_ptr<Tensor>>& tensors,
std::vector<std::shared_ptr<Tensor>>& outputs,
std::vector<std::shared_ptr<ReadyEvent>>& ready_events,
std::vector<std::string>& names,
const int device,
std::vector<StatusCallback>& callbacks,
ReduceOp reduce_op,
double prescale_factor,
double postscale_factor) {
Status status;
......
std::vector<Request> messages;
std::vector<TensorTableEntry> entries;
messages.reserve(tensors.size());
entries.reserve(tensors.size());
for (int n = 0; n < tensors.size(); ++n) { // 遍歷需要 reduce 的 tensor
// 把tensor組裝成一個Request
Request message;
message.set_request_rank(horovod_global.controller->GetRank());
message.set_tensor_name(names[n]);
message.set_tensor_type(tensors[n]->dtype());
message.set_device(device);
message.set_prescale_factor(prescale_factor);
message.set_postscale_factor(postscale_factor);
if (reduce_op == ReduceOp::ADASUM) {
message.set_request_type(Request::ADASUM);
} else {
message.set_request_type(Request::ALLREDUCE);
}
message.set_tensor_shape(tensors[n]->shape().to_vector());
messages.push_back(std::move(message));
TensorTableEntry e;
e.tensor_name = names[n];
e.context = std::move(contexts[n]);
// input and output can be the same, only move when safe
if (tensors[n] != outputs[n]) {
e.tensor = std::move(tensors[n]);
e.output = std::move(outputs[n]);
} else {
e.tensor = tensors[n];
e.output = outputs[n];
}
e.ready_event = std::move(ready_events[n]);
e.device = device;
e.callback = std::move(callbacks[n]);
// 針對每個 tensor,會建立對應 TensorTableEntry,用於儲存tensor 的權重,message 主要是一些 元資訊 metadata
entries.push_back(std::move(e));
}
std::string tensors_enqueued;
for (const auto& n : names) {
tensors_enqueued += n + "; ";
}
// Only create groups larger than 1 tensor, unless disable_group_fusion is requested.
// In that case, even single tensor groups are created to enforce disabling fusion.
if (tensors.size() > 1 || horovod_global.disable_group_fusion) {
auto group_id = horovod_global.group_table.RegisterGroup(std::move(names));
for (auto& message : messages) {
message.set_group_id(group_id);
}
}
// 往 GlobalState 的 tensor_queue 裡面新增
status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages);
return status;
}
4.3.3 TensorQueue
Tensor 和 op 主要是新增到 TensorQueue,具體就是呼叫 如下:
status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages);
AddToTensorQueue 和 AddToTensorQueueMulti 函式基本邏輯類似,只不過後者是處理多個message,具體如下:
- 將MPI Request message請求加入 horovod_global.message_queue;
- 將TensorTableEntry e 加入horovod_global.tensor_table ;
// Add a TensorTableEntry as well as its message to the queue.
Status TensorQueue::AddToTensorQueue(TensorTableEntry& e, Request& message) {
std::lock_guard<std::mutex> guard(mutex_);
if (tensor_table_.find(e.tensor_name) != tensor_table_.end()) {
return DUPLICATE_NAME_ERROR;
}
tensor_table_.emplace(e.tensor_name, std::move(e));
message_queue_.push(std::move(message));
return Status::OK();
}
Status TensorQueue::AddToTensorQueueMulti(std::vector<TensorTableEntry>& entries,
std::vector<Request>& messages) {
std::lock_guard<std::mutex> guard(mutex_);
for (int i = 0; i < entries.size(); ++i) {
if (tensor_table_.find(entries[i].tensor_name) != tensor_table_.end()) {
return DUPLICATE_NAME_ERROR;
}
tensor_table_.emplace(entries[i].tensor_name, std::move(entries[i]));
message_queue_.push(std::move(messages[i]));
}
return Status::OK();
}
這樣就新增到了 message queue,我們的邏輯也完成了。
0x05 總結
總結Horovod的梯度同步更新以及AllReduce操作的全過程如下:
- 首先HVD定義TF非同步的AllReduce OP,通過wrap optimizer將AllReduce OP插入到TF execution Graph中;
- OP內部主要就是把All Reduce需要的資訊打包成Request,傳送給coordinator(Rank0);
- 由Rank0協調所有Rank的request,並在所有Rank都Ready後,傳送Response讓各個Rank執行AllReduce操作。
具體如下圖:
+----------------------+ +
| Computation Graph | Execution Thread | Background Communication Thread
+---------+------------+ |
| |
| |
v |
|
+----------------+ |
| | |
| TF Aysnc Op | |
| | |
+------+---------+ |
| |
| |
| |
v | +-----------------------+
+ | HorovodGlobalState |
+---------------------+ EnqueueTensorAllreduce(tensor, op) | |
| | +---------------+ | |
| HorovodAllreduceOp | +--------------------------------------> | HorovodOp | +-------------------------> message_queue |
| | +----+-----+---++ | |
+---------------------+ ^ ^ ^ ^ ^ | tensor_table |
| | | | | | |
+----------------------------+ | | | | +-----------------------+
| +---------------------+ | | +-----------------+
| | +-------+ | |
| | | | |
+------+-----+ +---+----+ +---------+---+ +----+--------+ +----------+--+
| AlltoallOp | | JoinOp | | AllreduceOp | | AllgatherOp | | BroadcastOp |
+------------+ +--------+ ++---+----+--++ +-------------+ +-------------+
^ ^ ^ ^
| | | |
+------------------+ | | +-----------------------------------+
| +-------+ +-------------+ |
| | | |
+-----+--------+ +---+----------+ +------------+--------+ +------------+---+
| MPIAllreduce | | GPUAllreduce | | AdasumMPIAllreduceOp| | GlooAllreduce |
+--------------+ +--------------+ +---------------------+ +----------------+
手機如下:
0xEE 個人資訊
★★★★★★關於生活和技術的思考★★★★★★
微信公眾賬號:羅西的思考
如果您想及時得到個人撰寫文章的訊息推送,或者想看看個人推薦的技術資料,敬請關注。
0xFF 參考
Scaling model training in PyTorch using distributed data parallel
A developer-friendly guide to mixed precision training with PyTorch
It’s 2020, why isn’t deep learning 100% on the cloud yet?
到了2020年,為什麼還不可以在雲上進行100%的深度學習?
在 Amazon SageMaker 管道模式下使用 Horovod 實現多 GPU 分散式訓練