1. 程式人生 > 實用技巧 >TensorFlow中的通訊機制——Rendezvous(一)本地傳輸

TensorFlow中的通訊機制——Rendezvous(一)本地傳輸

TensorFlow中的通訊機制——Rendezvous(一)本地傳輸

背景

[作者:DeepLearningStack,阿里巴巴演算法工程師,開源TensorFlow Contributor]

歡迎大家關注我的公眾號,“網際網路西門二少”,我將繼續輸出我的技術乾貨~

在TensorFlow原始碼中我們經常能看到一個奇怪的詞——Rendezvous。如果從仔細統計該單詞出現的頻率和模組,你會發現無論在單機還是分散式,無論在core目錄還是contrib目錄都存在它的身影,所涉及的模組非常多。Rendezvous是一個法語單詞,發音也比較特殊,一般直譯為“約會、相會、會和”,而在TensorFlow中,Rendezvous是用來完成訊息傳輸的通訊元件。大部分原始碼讀者在起初閱讀時通訊部分的程式碼時可能會覺得有點懵圈,為什麼不使用Communicator這樣簡單明瞭的單詞來表明通訊過程,反而使用這樣一個晦澀的法語詞作為抽象呢?其實在瞭解TensorFlow訊息通訊的原理後就會發現,使用Rendezvous作為這一過程的抽象是非常貼切的。

因為Rendezvous所涉及的模組元件較多,為了讓讀者循序漸進地理解TensorFlow中的通訊機制,決定將Rendezvous分成多個系列,由淺入深分開梳理。這樣做的目的不但能讓讀者閱讀時對整體層次結構有較好的把握,而且簡短的篇幅也便於閱讀,所以建議讀者按順序閱讀本系列。 本文是TensorFlow通訊機制系列的第一篇文章,側重整體結構和本地傳輸通訊的梳理。

訊息傳輸的唯一識別符號——ParsedKey

在TensorFlow中無論是單機還是分散式都涉及到訊息傳輸,並且訊息傳輸總是從傳送端Send,接收端Recv。那麼這裡就存在一個訊息的對應問題:在多組訊息同時傳送接收時,需要對每一對Send和Recv梳理一個對應關係,即Send端傳送的訊息與Recv端接收的訊息不能有錯位。如果Recv端本打算接收的訊息是A,但由於訊息對應錯誤導致接收到了B,那麼整個訓練過程就會出現錯誤。其實解決這個問題也非常簡單,因為每一對Send和Recv所處理訊息都是同一個,所以只要讓某個訊息在被Send前加上一個唯一識別符號,而Recv在接收訊息前也能夠按照某種規則拼出一樣的唯一識別符號,這個對應關係就完美解決了。在TensorFlow中確實定義了這樣一種識別符號,它就是結構體ParsedKey。

ParsedKey結構體

在tensorflow/core/framework/rendezvous.h的Rendezvous類內定義了結構體ParsedKey,它內容非常簡短卻又十分全面,不但包含了訊息傳輸的所有必須的內容,還具備唯一性,在我們直接分析其原始碼結構。

 1 // Parses the key constructed by CreateKey and parse src/dst device
 2 // names into structures respectively.
 3 struct ParsedKey {
 4   StringPiece src_device;
 5   DeviceNameUtils::ParsedName src;
 6   uint64 src_incarnation = 0;
 7   StringPiece dst_device;
 8   DeviceNameUtils::ParsedName dst;
 9   StringPiece edge_name;
10 
11   ParsedKey() {}
12   ParsedKey(const ParsedKey& b) { *this = b; }
13 
14   ParsedKey& operator=(const ParsedKey& b);
15   StringPiece FullKey() const { return buf_; }
16 
17  private:
18   friend class Rendezvous;
19   friend class SendOp;
20   friend class RecvOp;
21   string buf_;
22 };

可以看到其結構非常簡單,一個完備的ParsedKey要包括六個部分。

src_device:訊息傳送源的字串資訊,形如/job:localhost/replica:0/task_id:0/device:GPU:0

src:和src_device的資訊量相同,只不過是結構體的表示方法

src_incarnation:一般來說這個欄位沒有什麼作用,但是當某個worker重啟後,該值會發生變化,用來和之前掛掉的worker做區分,這便於debug

dst_device:訊息傳送的接收方字串資訊,格式和src_device相同

dst:和dst_device的資訊量相同,只不過是結構體的表示方法

edge_name:這個欄位是該Key最特殊的地方,它可以靈活指定為任何字串,實現不同Key的區分。比如它可以是Tensor的名字,也可以是具有某種特殊意義的固定字串

CreateKey過程與ParseKey過程

一般情況下,在TensorFlow中應該優先使用CreateKey函式來構造可以解析的Key字串,然後經過ParseKey過程將該字串的每個資訊解析到ParsedKey結構體中,之所以使用CreateKey函式構造Key字串是因為這是最安全保險的方式,下面是CreateKey函式構造Key字串的過程展現。

CreateKey只要接受五個引數即可安全構造字串形式的Key,這裡面特殊之處有兩個,a. 引數中frame_and_iter一般直接取自OpKernelContext中的FrameAndIter物件;b. src_incarnation要做一個十六進位制的字串轉換。CreateKey函式的輸出是以分號(";")為分隔符的字串,該字元串同樣包含五個域。CreateKey是一個static函式,程式碼比較簡單,就不在這裡列出。隨後我們這個字串傳入ParseKey函式即可完成結構體ParsedKey的解析,解析過程如下。

ParseKey對輸入字串的前四個域做了對映,拋棄了第五個域,但是在提供Key字串時需要提供完整的五個域,否則會檢查報錯。和CreateKey相同,ParseKey過程也是一個static函式,程式碼如下所示。

 1 /* static */
 2 Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
 3   if (key.data() == out->buf_.data()) {
 4     // Caller used our buf_ string directly, so we don't need to copy.  (The
 5     // SendOp and RecvOp implementations do this, for example).
 6     DCHECK_EQ(key.size(), out->buf_.size());
 7   } else {
 8     // Make a copy that our StringPieces can point at a copy that will persist
 9     // for the lifetime of the ParsedKey object.
10     out->buf_.assign(key.data(), key.size());
11   }
12   StringPiece s(out->buf_);
13   StringPiece parts[5];
14   for (int i = 0; i < 5; i++) {
15     parts[i] = ConsumeNextPart(&s, ';');
16   }
17   if (s.empty() &&          // Consumed the whole string
18       !parts[4].empty() &&  // Exactly five parts
19       DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
20       strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
21       DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
22       !parts[3].empty()) {
23     out->src_device = StringPiece(parts[0].data(), parts[0].size());
24     out->dst_device = StringPiece(parts[2].data(), parts[2].size());
25     out->edge_name = StringPiece(parts[3].data(), parts[3].size());
26     return Status::OK();
27   }
28   return errors::InvalidArgument("Invalid  rendezvous key: ", key);
29 }

Rendezvous

在瞭解ParsedKey之後,我們就可以窺探Rendezvous這個類的內部結構和實現了。最基本的Rendezvous類被定義在了tensorflow/core/framework/rendezvous.h檔案中,它對外提供了最基本的Send、Recv和RecvAsync介面和實現。總體來說這個類還是比較抽象的,在不同的通訊場景下需要提供不同的實現。比如對於本地傳輸來說,TensorFlow提供了LocalRendezvousIntraProcessRendezvous實現類,對於使用跨程序通訊場景來說,TensorFlow提供了RemouteRendezvous實現系列。不同通訊場景的實現細節差別相當大,所以本系列將對這些做逐個梳理,本文只關注本地傳輸部分。如果對跨程序傳輸感興趣,那麼請關注該系列的下一篇文章。Rendezvous類中最重要的函式是Send和Recv系列,它們的簽名和註釋如下程式碼所示。

 1 // The caller is a tensor producer and it sends a message (a tensor
 2 // "val" and a bool "is_dead") under the given "key".
 3 //
 4 // {val, is_dead} is bundled as a message sent and received.
 5 // Typically, is_dead is set by some control flow nodes
 6 // (e.g., a not-taken branch).  args is passed by Send to the
 7 // Recv function to communicate any information that the Recv
 8 // function might need.  This is typically only necessary for
 9 // Send/Recv on the same worker.
10 //
11 // Send() never blocks.
12 virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val, const bool is_dead) = 0;
13 
14 virtual void RecvAsync(const ParsedKey& key, const Args& args, DoneCallback done) = 0;
15 
16 // Synchronous wrapper for RecvAsync.
17 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, bool* is_dead, int64 timeout_ms);
18 Status Recv(const ParsedKey& key, const Args& args, Tensor* val, bool* is_dead);

TensorFlow中的Recv有兩種,一種是同步版本,換一種是非同步版本。通常情況下為了計算和通訊的overlap,TensorFlow廣泛使用了RecvAsync函式。並且在後面一節中我們可以知道,Send過程並不是真的參與資料通訊,所有的通訊過程均由RecvAsync完成。

Rendezvous相關類結構

在瞭解通訊過程之前,應該先熟悉下Rendezvous相關的類結構。下面的類圖展示了當期TensorFlow系統中所有的Rendezvous相關類圖結構。

所有的Rendezvous相關類都以Rendezvous基類為核心,LocalRendezvous和IntraProcessRendezvous是我們本文分析的重點,SimpleRendezvous實現非常簡單,讀者可以在熟悉前兩個實現之後自行分析該類。而BaseRemoteRendezvous類以及相關類是跨程序通訊相關的元件,這部分內容將在下一篇文章中分析。

Rendezvous基類中的Recv函式

因為Recv函式只是RecvAsync函式的同步版本封裝,因此在每個實現類繼承重新函式時,只需要提供Send函式的實現和RecvAsync函式實現即可,下面的程式碼是Rendezvous基類中同步版本實現。

 1 Status Rendezvous::Recv(const ParsedKey& key, const Args& recv_args,
 2                         Tensor* val, bool* is_dead, int64 timeout_ms) {
 3   Status ret;
 4   Notification n;
 5   RecvAsync(key, recv_args,
 6             [&ret, &n, val, is_dead](const Status& s, const Args& send_args,
 7                                      const Args& recv_args, const Tensor& v,
 8                                      const bool dead) {
 9               ret = s;
10               *val = v;
11               *is_dead = dead;
12               n.Notify();
13             });
14   if (timeout_ms > 0) {
15     int64 timeout_us = timeout_ms * 1000;
16     bool notified = WaitForNotificationWithTimeout(&n, timeout_us);
17     if (!notified) {
18       return Status(error::DEADLINE_EXCEEDED,
19                     "Timed out waiting for notification");
20     }
21   } else {
22     n.WaitForNotification();
23   }
24   return ret;
25 }

可以看出,無論RecvAsync的實現內容是什麼,Recv函式都可以將RecvAsync視為黑盒,在其上層封裝成為與RecvAsync相同實現的同步函式版本

本地傳輸過程

使用本地傳輸過程包括LocalRendezous和IntraProcessRendezvous兩個實現類,但是後者是前者的封裝,因此本文分析的重點在於LocalRendezvous實現類。

訊息佇列的快取——Table

在TensorFlow中,幾乎每個Rendezvous實現類都有自己的訊息佇列快取,而幾乎每種訊息佇列快取都是依靠Table實現的。Rendezvous的傳送(Send)和接收(Recv)都將通過Table完成,這完美地闡釋了“約會、相會、會和”的釋義,這也是為什麼TensorFlow使用這樣一個法語詞來抽象通訊過程。下圖形象化的表示了Table以及Table中的每個Item。

在LocalRendezvous實現類中,Send端和Recv端使用的是同一個Rendezvous物件,所以他們共享同一個Table,所以Table屬於臨界資源,應該加鎖形成互斥訪問。Item這個結構中其實有很多內容,在上圖中只解釋兩個比較重要的部分。

Value:這就是參與通訊Tensor本體

Waitor:這是在確認Tensor被接收端完成接收後的處理函式,也就是consumer處理該Tensor的函式過程

傳輸過程分析

無論是Send過程還是Recv過程,它們都將藉助Table完成Tensor的轉發。Send過程作為Tensor的生產者,它負責將待發送的Tensor送入Table中,並將ParsedKey作為該Item的鍵。而Recv過程作為消費者,它也會根據自己所需拼出相同的ParsedKey,然後從Table中檢視是否已經存在該項。

應該注意的是,Tensor雖然由Send端生產,但是Table中的Item卻不一定是由Send端插入。因為在TensorFlow中,Send和RecvAsync二者的相對順序是不能保證先後的,經常出現需求比供給在時間片上先到的情況,那麼這時就會出現RecvAsync先拼出了ParsedKey然後立即查表的情況。應對這種情況的一種方案是,RecvAsync放棄此次查詢,開啟另一個執行緒輪詢該表直到Send端產生為止然後執行consumer的waiter函式,但這是一個非常消耗資源的實現方式。TensorFlow為了保證非同步性,使用另一種無需CPU輪詢消耗資源的實現方式。

我們知道,在Send和RecvAsync順序相對非同步的情況下,waitor函式的執行時機只有兩種情況,它取決於Send的供給和RecvAsync的需求哪一個先到達。若生產者先到達,那麼waiter函式的呼叫由RecvAsync執行。若消費者的需求先到達,那麼waiter函式的呼叫由Send執行。簡而言之,總是遲到的一方執行waiter函式。那麼可以這樣設計:和Send端相同,允許RecvAsync將所需的Item插入到Table中,並連同waiter函式一起傳送到該表裡。如果Send端後到達,那麼Send函式將從表中取出該Item,並執行waiter函式,反之,則由RecvAsync函式取出自己所需要的Item,然後執行waiter函式,下面的圖展示了這個過程。

Send過程原始碼

瞭解上述的過程後,我們可以直接看Send函式的原始碼了。下面是LocalRendezvous的Send函式原始碼展示。

 1 Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
 2               const bool is_dead) override {
 3     uint64 key_hash = KeyHash(key.FullKey());
 4     VLOG(2) << "Send " << this << " " << key_hash << " " << key.FullKey();
 5 
 6     mu_.lock();
 7     if (!status_.ok()) {
 8       // Rendezvous has been aborted.
 9       Status s = status_;
10       mu_.unlock();
11       return s;
12     }
13 
14     ItemQueue* queue = &table_[key_hash];
15     if (queue->empty() || queue->front()->IsSendValue()) {
16       // There is no waiter for this message. Append the message
17       // into the queue. The waiter will pick it up when arrives.
18       // Only send-related fields need to be filled.
19       Item* item = new Item;
20       item->value = val;
21       item->is_dead = is_dead;
22       item->send_args = send_args;
23       if (item->send_args.device_context) {
24         item->send_args.device_context->Ref();
25       }
26       queue->push_back(item);
27       mu_.unlock();
28       return Status::OK();
29     }
30 
31     // There is an earliest waiter to consume this message.
32     Item* item = queue->front();
33     queue->pop_front();
34     mu_.unlock();
35 
36     // Notify the waiter by invoking its done closure, outside the
37     // lock.
38     DCHECK(!item->IsSendValue());
39     item->waiter(Status::OK(), send_args, item->recv_args, val, is_dead);
40     delete item;
41     return Status::OK();
42   }

RecvAsync過程原始碼

下面是LocalRendezvous的RecvAsync函式原始碼展示。

 1   void RecvAsync(const ParsedKey& key, const Args& recv_args,
 2                  DoneCallback done) override {
 3     uint64 key_hash = KeyHash(key.FullKey());
 4     VLOG(2) << "Recv " << this << " " << key_hash << " " << key.FullKey();
 5 
 6     mu_.lock();
 7     if (!status_.ok()) {
 8       // Rendezvous has been aborted.
 9       Status s = status_;
10       mu_.unlock();
11       done(s, Args(), recv_args, Tensor(), false);
12       return;
13     }
14 
15     ItemQueue* queue = &table_[key_hash];
16     if (queue->empty() || !queue->front()->IsSendValue()) {
17       // There is no message to pick up.
18       // Only recv-related fields need to be filled.
19       Item* item = new Item;
20       item->waiter = std::move(done);
21       item->recv_args = recv_args;
22       if (item->recv_args.device_context) {
23         item->recv_args.device_context->Ref();
24       }
25       queue->push_back(item);
26       mu_.unlock();
27       return;
28     }
29 
30     // A message has already arrived and is queued in the table under
31     // this key.  Consumes the message and invokes the done closure.
32     Item* item = queue->front();
33     queue->pop_front();
34     mu_.unlock();
35 
36     // Invokes the done() by invoking its done closure, outside scope
37     // of the table lock.
38     DCHECK(item->IsSendValue());
39     done(Status::OK(), item->send_args, recv_args, item->value, item->is_dead);
40     delete item;
41   }

關於IntraProcessRendezvous的Send和RecvAsync函式

其實本質上IntraProcessRendezvous和LocalRendezvous是同一個函式實現,只是前者對後者做了一層封裝。我們從原始碼中看到,LocalRendezvous是IntraProcessRendezvous的成員之一,只是在回撥函式中多了一些簡單的處理而已,比如它會仔細考量Tensor的生產方和消費方是存在於CPU還是GPU,是否可以通過P2P直接拷貝,還是需要通過Host做中轉,關於拷貝過程使用的是下面的函式,其他地方大同小異,因此不再贅述。有興趣的讀者可以到tensorflow/core/common_runtime/目錄下參考rendezvous_mgr.h、rendezvous_mgr.cc和copy_tensor.h與copy_tensor.cc這幾個檔案。

 1 // Copies "input" to "output" between devices accessible to the
 2   // local process via some DMA-like method.  "edge_name" is the name
 3   // of the tensor being copied, for debugging purposes. Depending on
 4   // the type of devices and memory in use, the copy may be performed
 5   // synchronously or asynchronously.  'done' will be invoked only
 6   // after the copy is actually complete.
 7   static void ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
 8                      DeviceContext* recv_dev_context, Device* src, Device* dst,
 9                      const AllocatorAttributes src_alloc_attr,
10                      const AllocatorAttributes dst_alloc_attr,
11                      const Tensor* input, Tensor* output,
12                      int dev_to_dev_stream_index, StatusCallback done);

總結

本文是TensorFlow通訊機制系列的第一篇文章,先通過丟擲高併發情況下訊息通訊兩端的對應問題引出TensorFlow中的ParsedKey結構設計的必要性,然後給出了Rendezvous全域性類圖,最後詳細的分析了LocalRendezvous的訊息傳輸實現過程。TensorFlow的通訊機制的完美的闡釋了Rendezvous一詞的含義——無論是Send端還是Recv端都需要在臨界資源Table中“約會”,進行訊息的傳輸。隨後還著重分析了非同步情況下,本屬於consumer的waiter函式呼叫時機設計問題——為了保證waiter函式的執行不被阻塞,從設計上採取Late invoke的方案。IntraProcessRendezous本質是LocalRendezvous的一層封裝,它在資料拷貝上面做了更多的工作,藉助LocalRendezvous實現了Send和Recv處於不同或相同種類Device情況下,對上層完全透明的拷貝過程。由於篇幅原因,特意將TensorFlow通訊機制分為多個系列分析,作為第一篇文章,本篇介紹了Rendezvous的基本框架。在該系列之後的文章中,還會對跨程序的通訊進行詳細地分析。