[原始碼解析]機器學習引數伺服器ps-lite(4) ----- 應用節點實現
[原始碼解析]機器學習引數伺服器ps-lite(4) ----- 應用節點實現
目錄- [原始碼解析]機器學習引數伺服器ps-lite(4) ----- 應用節點實現
0x00 摘要
本文是引數伺服器的第四篇,介紹KVWorker, KVServer。
本系列其他文章是:
[原始碼解析] 機器學習引數伺服器ps-lite 之(1) ----- PostOffice
[原始碼解析] 機器學習引數伺服器ps-lite(2) ----- 通訊模組Van
[原始碼解析] 機器學習引數伺服器ps-lite 之(3) ----- 代理人Customer
KVWorker, KVServer這兩個分別是 Server / Worker 節點的抽象,是被 Van ---> Customer ---> recv_handle_ 來作為引擎的一部分來啟動的。
本文會先介紹一些基礎支撐類,然後介紹 Server / Worker的基類 SimpleApp,最後介紹 Server / Worker 的具體實現。
總體流程圖提前劇透如下:
0x01 基礎類
我們首先需要介紹一些基礎類。
1.1 Range
Range 類作用是:根據這個Range確定要拉取的引數在哪個server上,以及一個server對應的key的range。
Range 類提供如下函式:
- begin()和end()兩個uint64的位置;
- size() 獲得 本 range 的大小,即 end_ - begin_;
class Range {
public:
Range() : Range(0, 0) {}
Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) { }
uint64_t begin() const { return begin_; }
uint64_t end() const { return end_; }
uint64_t size() const { return end_ - begin_; }
private:
uint64_t begin_;
uint64_t end_;
};
1.2 TreadsafeQueue
TreadsafeQueue 是一個可以供多個執行緒讀取的佇列,通過鎖和條件量合作來達到執行緒安全,用來做訊息佇列。
/**
* \brief thread-safe queue allowing push and waited pop
*/
class ThreadsafePQueue {
public:
ThreadsafePQueue() { }
~ThreadsafePQueue() { }
/**
* \brief push an value into the end. threadsafe.
* \param new_value the value
*/
void Push(Message new_value) {
mu_.lock();
queue_.push(std::move(new_value));
mu_.unlock();
cond_.notify_all();
}
/**
* \brief wait until pop an element from the beginning, threadsafe
* \param value the poped value
*/
void WaitAndPop(Message* value) { // 等待佇列不為空,按照優先順序pop message
std::unique_lock<std::mutex> lk(mu_);
cond_.wait(lk, [this]{return !queue_.empty();});
*value = std::move(queue_.top());
queue_.pop();
}
private:
class Compare {
public:
bool operator()(const Message &l, const Message &r) {
return l.meta.priority <= r.meta.priority;
}
};
mutable std::mutex mu_; //資料同步互斥變數
std::priority_queue<Message, std::vector<Message>, Compare> queue_; // message優先佇列
std::condition_variable cond_; //佇列不為空條件變數
};
0x02 SimpleApp
2.1 概述
SimpleApp是一個基類,把應用節點功能做了一個統一抽象。
- 提供了基本傳送功能和簡單訊息處理函式(Request, Wait, Response)。
- 訊息型別為:int型的head和string型的body。
- 它有2個派生類。KVServer和KVWorker。
2.2 定義
2.2.1 支撐類
SimpleData 定義了 Request 和 Response 的基本格式。
struct SimpleData {
/** \brief the int head */
int head;
/** \brief the string body */
std::string body;
/** \brief sender's node id */
int sender;
/** \brief the associated timestamp */
int timestamp;
/** \brief sender's customer id */
int customer_id;
};
2.2.2 成員變數
SimpleApp 主要有如下成員變數:
- Customer* obj_ :本 App 的 Customer,控制請求連線;
- Handle request_handle_ :request 處理函式;
- Handle response_handle_ :response 處理函式;
- set_request_handle,set_response_handle:設定成員
request_handle_
,response_handle_
。在客戶端呼叫SimpleApp::Process時,根據message.meta中的指示變數判斷是request還是response,呼叫相應handle處理;
class SimpleApp {
public:
/**
* \brief constructor
* @param app_id the app id, should match with the remote node app with which this app
* @param customer_id the customer_id, should be node-locally unique
* is communicated
*/
explicit SimpleApp(int app_id, int customer_id);
/** \brief deconstructor */
virtual ~SimpleApp() { delete obj_; obj_ = nullptr; }
/**
* \brief send a request to a remote node
*
* \param req_head request head
* \param req_body request body
* \param recv_id remote node id
*
* @return the timestamp of this request
*/
virtual inline int Request(int req_head, const std::string& req_body, int recv_id);
/**
* \brief wait until a request is finished
*
* \param timestamp
*/
virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }
/**
* \brief send back a response for a request
* \param recv_req the received request
* \param the response body
*/
virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");
/**
* \brief the handle to proces a received request/respoonse
*
* \param recved the received request or response
* \param app this pointer
*/
using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;
/**
* \brief set the request handle
* \param request_handle the request handle
*/
virtual inline void set_request_handle(const Handle& request_handle) {
CHECK(request_handle) << "invalid request handle";
request_handle_ = request_handle;
}
/**
* \brief set the response handle
* \param response_handle the response handle
*/
virtual inline void set_response_handle(const Handle& response_handle) {
CHECK(response_handle) << "invalid response handle";
response_handle_ = response_handle;
}
/**
* \brief returns the customer
*/
virtual inline Customer* get_customer() { return obj_; }
protected:
/** \brief empty construct */
inline SimpleApp() : obj_(nullptr) {
request_handle_ = [](const SimpleData& recved, SimpleApp* app) {
app->Response(recved);
};
response_handle_ = [](const SimpleData& recved, SimpleApp* app) { };
}
/** \brief process a received message */
virtual inline void Process(const Message& msg);
/** \brief ps internal object */
Customer* obj_;
private:
/** \brief request handle */
Handle request_handle_;
/** \brief request handle */
Handle response_handle_;
};
2.3 功能函式
三個簡單功能函式如下:
Request 就是呼叫 Van 傳送訊息。
inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) {
// setup message
Message msg;
msg.meta.head = req_head;
if (req_body.size()) msg.meta.body = req_body;
int ts = obj_->NewRequest(recv_id);
msg.meta.timestamp = ts;
msg.meta.request = true;
msg.meta.simple_app = true;
msg.meta.app_id = obj_->app_id();
msg.meta.customer_id = obj_->customer_id();
// send
for (int r : Postoffice::Get()->GetNodeIDs(recv_id)) {
msg.meta.recver = r;
Postoffice::Get()->van()->Send(msg);
}
return ts;
}
Response 是呼叫 Van 回覆訊息。
inline void SimpleApp::Response(const SimpleData& req, const std::string& res_body) {
// setup message
Message msg;
msg.meta.head = req.head;
if (res_body.size()) msg.meta.body = res_body;
msg.meta.timestamp = req.timestamp;
msg.meta.request = false;
msg.meta.simple_app = true;
msg.meta.app_id = obj_->app_id();
msg.meta.customer_id = req.customer_id;
msg.meta.recver = req.sender;
// send
Postoffice::Get()->van()->Send(msg);
}
Process 函式根據message.meta中的指示變數判斷是request還是response,呼叫相應handle處理。
inline void SimpleApp::Process(const Message& msg) {
SimpleData recv;
recv.sender = msg.meta.sender;
recv.head = msg.meta.head;
recv.body = msg.meta.body;
recv.timestamp = msg.meta.timestamp;
recv.customer_id = msg.meta.customer_id;
if (msg.meta.request) { // 判斷是request還是response,呼叫相應handle處理
CHECK(request_handle_);
request_handle_(recv, this);
} else {
CHECK(response_handle_);
response_handle_(recv, this);
}
}
0x03 KVServer
KVServer 是 Server 節點的抽象,其作用是 接收資訊、處理資訊、返回結果三個步驟,主要功能是:
- 維護 key-value pairs 資料;
- 處理 & 應答 客戶端的 push & pull 請求;
- 函式
request_handle_
處理請求:- 在呼叫KVServer::Process時 會呼叫到
request_handle_
。 request_handle_
預設為KVServerDefaultHandle
。
- 在呼叫KVServer::Process時 會呼叫到
- 函式
Response
用於返回資料;
- 函式
3.1 定義
request_handle_ 是 request 處理函式,需要自定義。
- 在該回調函式中使用者則需要實現各種優化器的的模型權重梯度更新演算法和模型權重返回操作。
- 可直接參考ps-lite已實現的預設版本KVServerDefaultHandle。
/**
* \brief A server node for maintaining key-value pairs
*/
template <typename Val>
class KVServer : public SimpleApp {
public:
/**
* \brief constructor
* \param app_id the app id, should match with \ref KVWorker's id
*/
explicit KVServer(int app_id) : SimpleApp() {
using namespace std::placeholders;
obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
}
/** \brief deconstructor */
virtual ~KVServer() { delete obj_; obj_ = nullptr; }
/**
* \brief the handle to process a push/pull request from a worker
* \param req_meta meta-info of this request
* \param req_data kv pairs of this request
* \param server this pointer
*/
using ReqHandle = std::function<void(const KVMeta& req_meta,
const KVPairs<Val>& req_data,
KVServer* server)>;
void set_request_handle(const ReqHandle& request_handle) {
CHECK(request_handle) << "invalid request handle";
request_handle_ = request_handle;
}
/**
* \brief response to the push/pull request
* \param req the meta-info of the request
* \param res the kv pairs that will send back to the worker
*/
void Response(const KVMeta& req, const KVPairs<Val>& res = KVPairs<Val>());
private:
/** \brief internal receive handle */
void Process(const Message& msg);
/** \brief request handle */
ReqHandle request_handle_; // 需要使用者自己實現
};
3.2 功能函式
3.2.1 Response
Response()
就是向呼叫的worker傳送 response 資訊。與SimpleApp 比較下,發現 KVServer 這裡對於 head 和 body 都有了新的處理。
需要注意的是:Response 函式應該是被使用者自定義的 request_handle_
呼叫,即 request_handle_
處理收到的訊息,然後呼叫 Response 對 worker 進行回覆應答。
template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
Message msg;
msg.meta.app_id = obj_->app_id();
msg.meta.customer_id = req.customer_id;
msg.meta.request = false;
msg.meta.push = req.push;
msg.meta.pull = req.pull;
msg.meta.head = req.cmd;
msg.meta.timestamp = req.timestamp;
msg.meta.recver = req.sender;
if (res.keys.size()) {
msg.AddData(res.keys);
msg.AddData(res.vals);
if (res.lens.size()) {
msg.AddData(res.lens);
}
}
Postoffice::Get()->van()->Send(msg);
}
3.2.2 Process
Process()
被註冊到Customer物件中,當Customer物件的receiving thread接受到訊息時,就呼叫Process()
對資料進行處理。
Process()
內部的邏輯是:
- 提取訊息的元資訊,構建一個 KVMeta。
- 可以看到,在 Process 中沒有對 KV 資料的維護。
- Process 呼叫 使用者自行實現的一個request_handle_ (std::function函式物件)對資料進行處理。
- 在回撥函式 request_handle_ 中使用者則需要實現各種優化器的的模型權重梯度更新演算法和模型權重返回操作。
template <typename Val>
void KVServer<Val>::Process(const Message& msg) {
if (msg.meta.simple_app) {
SimpleApp::Process(msg); return;
}
KVMeta meta;
meta.cmd = msg.meta.head;
meta.push = msg.meta.push;
meta.pull = msg.meta.pull;
meta.sender = msg.meta.sender;
meta.timestamp = msg.meta.timestamp;
meta.customer_id = msg.meta.customer_id;
KVPairs<Val> data;
int n = msg.data.size();
if (n) {
CHECK_GE(n, 2);
data.keys = msg.data[0];
data.vals = msg.data[1];
if (n > 2) {
CHECK_EQ(n, 3);
data.lens = msg.data[2];
CHECK_EQ(data.lens.size(), data.keys.size());
}
}
CHECK(request_handle_);
request_handle_(meta, data, this);
}
3.2.3 例子函式
KVServerDefaultHandle 是 ps-lite 提供的例子,用於演示如何維護 KV,處理訊息,返回請求。
這裡維護一個雜湊表 unordered_map,記錄key和value,並對push和pull請求進行響應。
使用std::unordered_map store儲存server的引數,當請求為push時,對store引數做更新,請求為pull時對引數進行拉取;
/**
* \brief an example handle adding pushed kv into store
*/
template <typename Val>
struct KVServerDefaultHandle {
void operator()(
const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) {
size_t n = req_data.keys.size();
KVPairs<Val> res;
if (!req_meta.pull) {
CHECK_EQ(n, req_data.vals.size());
} else {
res.keys = req_data.keys; res.vals.resize(n);
}
for (size_t i = 0; i < n; ++i) {
Key key = req_data.keys[i];
if (req_meta.push) {
store[key] += req_data.vals[i];
}
if (req_meta.pull) {
res.vals[i] = store[key];
}
}
server->Response(req_meta, res);
}
std::unordered_map<Key, Val> store;
};
3.2.4 流程
我們接著上文繼續梳理細化流程。
-
worker節點 或者 server節點 在程式的最開始會執行
Postoffice::start()
。 -
Postoffice::start()
會初始化節點資訊,並且呼叫Van::start()
。 -
每個節點都監聽了本地一個埠;該連線的節點在啟動時已經連線。
-
Van::start()
啟動一個本地執行緒專門接收socket的資訊,使用Van::Receiving()
來持續監聽收到的message。-
receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
-
-
Van::Receiving()
接收後訊息之後,根據不同命令執行不同動作。針對資料訊息,如果需要下一步處理,會呼叫 ProcessDataMsg:- 依據訊息中的app id找到 Customer(每個app 任務會繫結一個custom類),即會根據customer id的不同將message發給不同的customer的recv thread。
- 將訊息傳遞給
Customer::Accept
函式。
-
Customer::Accept() 函式將訊息新增到一個佇列
recv_queue_
; -
Customer 物件本身也會啟動一個接受執行緒
recv_thread_
,使用 Customer::Receiving() :- 不斷的從
recv_queue_
佇列取訊息。 - 如果 (!recv.meta.request) ,就說明是 response,則
tracker_[req.timestamp].second++
- 呼叫註冊的使用者自定義的
recv_handle_
函式對訊息進行處理。
- 不斷的從
-
對於worker來說,其註冊的
recv_handle_
是KVWorker::Process()
函式。因為worker的recv thread接受到的訊息主要是從server處pull下來的KV對,因此該Process()
主要是接收message中的KV對; -
而對於Server來說,其註冊的
recv_handle_
是KVServer::Process()
函式。 -
因為我們這裡是 KVServer,而且server接受的是worker們push上來的KV對,需要對其進行處理,因此該
Process()
函式中呼叫的使用者通過KVServer::set_request_handle()
傳入的函式物件。 -
在 使用者自定義的 request_handle_ 函式中,如果需要傳送 response 給 worker,則呼叫 KVServer
::Response。
目前邏輯如下圖,在 第 8 步,recv_handle_ 指向 KVServer
+--------------------------+
| Van |
| |
Request +-----------> Receiving |
| 1 + | +---------------------------+
| | | | Postoffice |
| | 2 | | |
| v | GetCustomer | |
| ProcessDataMsg <------------------> unordered_map customers_|
| + | 3 | |
| | | +---------------------------+
+--------------------------+
|
| 4
|
+------------------------------------+
| Customer | |
| | |
| v |
| Accept |
| + |
| | |
| | 5 |
| v |
| recv_queue_ | +------------------+
| + | |KVWorker |
| | 6 | +--------> | |
| | | | 8 | Process |
| v | | +------------------+
| recv_thread_ +---> Receiving | |
| + | |
| | 7 | |
| | | | +------------------+
| v | | |KVServer |
| recv_handle_+---------+--------> | |
| | 8 | Process |
+------------------------------------+ | + |
+------------------+
|
| 9
v
+-----------+-------+
| request_handle_ |
10 | |
Response <----------------------------------------------------+ Response |
| |
+-------------------+
0x04 KVWorker
4.1 概述
KVWorker用於向server節點push,pull key-value對,就是在演算法過程中,需要並行處理的各種引數。
- Worker中的push和pull操作都是非同步返回一個ID,然後使用ID進行wait阻塞等待,即同步操作。
- 或者非同步呼叫時傳入一個Callback進行後續操作。
4.2 定義
KVWorker 主要變數為:
- std::unordered_map<int, std::vector<KVPairs
>> recv_kvs :收到的pull 結果: kv value ; - std::unordered_map<int, Callback> callbacks :收到 request 的所有 response 之後執行的回撥函式;
- Slicer slicer_ :預設 slice 函式變數,該函式在呼叫Send函式時,將KVPairs按照每個server的Range切片;
主要函式為:
-
ZPush 零拷貝push函式
-
ZPull 零拷貝pull函式
-
AddPullCB key重組函式
-
Process 訊息處理函式
-
DefaultSlicer 預設的slice 處理函式
-
set_slicer:設定slicer_成員,該函式在呼叫Send函式時,將KVPairs按照每個server的Range切片;
/**
* \brief A worker node that can \ref Push (\ref Pull) key-value pairs to (from) server
* nodes
*
* \tparam Val the type of value, which should be primitive types such as
* int32_t and float
*/
template<typename Val>
class KVWorker : public SimpleApp {
public:
/** avoid too many this-> */
using SimpleApp::obj_; // Customer 物件
/**
* \brief callback function for \ref Push and \ref Pull
*
* It is called by the data receiving thread of this instance when the push or
* pull is actually finished. Namely the kv pairs have already written into
* servers' data structure or the kv pairs have already pulled back.
*/
using Callback = std::function<void()>;
/**
* \brief constructor
*
* \param app_id the app id, should match with \ref KVServer's id
* \param customer_id the customer id which is unique locally
*/
explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
using namespace std::placeholders;
slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
}
/** \brief deconstructor */
virtual ~KVWorker() { delete obj_; obj_ = nullptr; }
using SlicedKVs = std::vector<std::pair<bool, KVPairs<Val>>>;
/**
* \brief a slicer partitions a key-value list according to the key ranges
* \param send the kv list for partitioning
* \param ranges the key ranges, ranges[i] is the key range of server i
* \param sliced the sliced lists. slices[i] should only contains keys in
* ranges[i] and the according values
*/
using Slicer = std::function<void(
const KVPairs<Val>& send, const std::vector<Range>& ranges,
SlicedKVs* sliced)>;
/**
* \brief set a user-defined slicer
*/
void set_slicer(const Slicer& slicer) {
CHECK(slicer); slicer_ = slicer;
}
private:
/**
* \brief add a callback for a request. threadsafe.
* @param cb callback
* @param timestamp the timestamp of the request
*/
void AddCallback(int timestamp, const Callback& cb) {
if (!cb) return;
std::lock_guard<std::mutex> lk(mu_);
callbacks_[timestamp] = cb;
}
/** \brief data buffer for received kvs for each timestamp */
std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_; // 收到的 kv value
/** \brief callbacks for each timestamp */
std::unordered_map<int, Callback> callbacks_; // 收到 request 的所有 response 之後執行的回撥函式
/** \brief lock */
std::mutex mu_;
/** \brief kv list slicer */
Slicer slicer_; // 預設 slice 函式變數
};
4.3 功能函式
4.3.1 Push & ZPush
因為 Push 呼叫了 ZPush,所以我們放在一起介紹。
Push方法主要就是:
- 把資料(KV列表)傳送到對應的伺服器節點;
- KV列表是依據每個伺服器維護的 Key range 來進行分割槽傳送;
- Push 是非同步直接返回,如果想知道返回結果如何,則可以:
- 使用 Wait 來等待,即利用tracker_來記錄傳送的請求量和對應的響應請求量,當傳送量等於接收量的時候,表示每個請求都成功傳送了,以此來達到同步的目的;
- 使用 callback,這樣當結束時候就可以回撥到。
ZPush 方法是:
- 使用obj_(Customer型別)的 NewRequest 方法來記錄記錄傳送的請求量和對應的響應請求量,並且返回一個時間戳;
- 設定好對應 timestamp 的 callback;
- 使用傳入的引數構造KVPair物件,呼叫Send送出該物件;
int Push(const std::vector<Key>& keys,
const std::vector<Val>& vals,
const std::vector<int>& lens = {},
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
return ZPush(
SArray<Key>(keys), SArray<Val>(vals), SArray<int>(lens), cmd, cb,
priority);
}
int ZPush(const SArray<Key>& keys,
const SArray<Val>& vals,
const SArray<int>& lens = {},
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
int ts = obj_->NewRequest(kServerGroup);
AddCallback(ts, cb);
KVPairs<Val> kvs;
kvs.keys = keys;
kvs.vals = vals;
kvs.lens = lens;
kvs.priority = priority;
Send(ts, true, false, cmd, kvs);
return ts;
}
如何呼叫可以參考其註釋:
* Sample usage: the following codes push two KV pairs `{1, (1.1, 1.2)}` and `{3,
* (3.1,3.2)}` to server nodes, where the value is a length-2 float vector
* \code
* KVWorker<float> w;
* std::vector<Key> keys = {1, 3};
* std::vector<float> vals = {1.1, 1.2, 3.1, 3.2};
* w.Push(keys, vals);
* \endcode
4.3.2 Pull
pull方法跟push的邏輯大體類似:
- 繫結一個回撥函式,用於拷貝資料,並且得到一個時間戳。
- 根據key_vector從Server上拉取val_vector,
- 最終返回timestamp,
- 該函式不阻塞,可用worker.Wait(timestamp)等待;
int Pull(const std::vector<Key>& keys,
std::vector<Val>* vals,
std::vector<int>* lens = nullptr,
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
SArray<Key> skeys(keys);
int ts = AddPullCB(skeys, vals, lens, cmd, cb);
KVPairs<Val> kvs;
kvs.keys = skeys;
kvs.priority = priority;
Send(ts, false, true, cmd, kvs);
return ts;
}
4.3.3 ZPull
邏輯與 Pull 一致,只是省略了拷貝到系統這個過程。因此需要保證在ZPull完成前,呼叫者沒有改變key_vector;
int ZPull(const SArray<Key>& keys,
SArray<Val>* vals,
SArray<int>* lens = nullptr,
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
int ts = AddPullCB(keys, vals, lens, cmd, cb);
KVPairs<Val> kvs;
kvs.keys = keys;
kvs.priority = priority;
Send(ts, false, true, cmd, kvs);
return ts;
}
4.3.4 Send
Push()
和Pull()
最後都會呼叫Send()
函式,Send()
對KVPairs進行切分,因為每個Server只保留一部分引數,因此切分後的SlicedKVpairs就會被髮送給不同的Server。
如果是 skipped,則會直接呼叫 callback。
否則遍歷傳送。
template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
// slice the message
SlicedKVs sliced;
slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);
// need to add response first, since it will not always trigger the callback
int skipped = 0;
for (size_t i = 0; i < sliced.size(); ++i) {
if (!sliced[i].first) ++skipped;
}
obj_->AddResponse(timestamp, skipped);
if ((size_t)skipped == sliced.size()) {
RunCallback(timestamp);
}
for (size_t i = 0; i < sliced.size(); ++i) {
const auto& s = sliced[i];
if (!s.first) continue;
Message msg;
msg.meta.app_id = obj_->app_id();
msg.meta.customer_id = obj_->customer_id();
msg.meta.request = true;
msg.meta.push = push;
msg.meta.pull = pull;
msg.meta.head = cmd;
msg.meta.timestamp = timestamp;
msg.meta.recver = Postoffice::Get()->ServerRankToID(i);
msg.meta.priority = kvs.priority;
const auto& kvs = s.second;
if (kvs.keys.size()) {
msg.AddData(kvs.keys);
msg.AddData(kvs.vals);
if (kvs.lens.size()) {
msg.AddData(kvs.lens);
}
}
Postoffice::Get()->van()->Send(msg);
}
}
4.3.5 DefaultSlicer
切分函式可以由使用者自行重寫,預設為DefaultSlicer
,每個SlicedKVPairs被包裝成Message物件,然後用van::send()
傳送。
根據std::vector& ranges
分片範圍資訊,將要傳送的資料進行分片。目前預設的使用 Postoffice::GetServerKeyRanges
來劃分分片範圍。
template <typename Val>
void KVWorker<Val>::DefaultSlicer(
const KVPairs<Val>& send, const std::vector<Range>& ranges,
typename KVWorker<Val>::SlicedKVs* sliced) {
sliced->resize(ranges.size());
// find the positions in msg.key
size_t n = ranges.size();
std::vector<size_t> pos(n+1);
const Key* begin = send.keys.begin();
const Key* end = send.keys.end();
for (size_t i = 0; i < n; ++i) {
if (i == 0) {
pos[0] = std::lower_bound(begin, end, ranges[0].begin()) - begin;
begin += pos[0];
} else {
CHECK_EQ(ranges[i-1].end(), ranges[i].begin());
}
size_t len = std::lower_bound(begin, end, ranges[i].end()) - begin;
begin += len;
pos[i+1] = pos[i] + len;
// don't send it to servers for empty kv
sliced->at(i).first = (len != 0);
}
CHECK_EQ(pos[n], send.keys.size());
if (send.keys.empty()) return;
// the length of value
size_t k = 0, val_begin = 0, val_end = 0;
if (send.lens.empty()) {
k = send.vals.size() / send.keys.size();
CHECK_EQ(k * send.keys.size(), send.vals.size());
} else {
CHECK_EQ(send.keys.size(), send.lens.size());
}
// slice
for (size_t i = 0; i < n; ++i) {
if (pos[i+1] == pos[i]) {
sliced->at(i).first = false;
continue;
}
sliced->at(i).first = true;
auto& kv = sliced->at(i).second;
kv.keys = send.keys.segment(pos[i], pos[i+1]);
if (send.lens.size()) {
kv.lens = send.lens.segment(pos[i], pos[i+1]);
for (int l : kv.lens) val_end += l;
kv.vals = send.vals.segment(val_begin, val_end);
val_begin = val_end;
} else {
kv.vals = send.vals.segment(pos[i]*k, pos[i+1]*k);
}
}
}
4.3.6 PushPull & ZPushPull
就是把 push,pull 聚合在一起。
int PushPull(const std::vector<Key>& keys,
const std::vector<Val>& vals,
std::vector<Val>* outs,
std::vector<int>* lens = nullptr,
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
CHECK_NOTNULL(outs);
if (outs->empty())
outs->resize(vals.size());
else
CHECK_EQ(vals.size(), outs->size());
SArray<Key> skeys(keys);
SArray<Val> svals(vals);
auto souts = new SArray<Val>(outs->data(), outs->size());
SArray<int>* slens = lens ?
new SArray<int>(lens->data(), lens->size()) : nullptr;
int ts = ZPushPull(skeys, svals, souts, slens, cmd,
[this, cb, souts, slens]() {
delete souts;
delete slens;
if (cb) cb();
}, priority);
return ts;
}
int ZPushPull(const SArray<Key>& keys,
const SArray<Val>& vals,
SArray<Val>* outs,
SArray<int>* lens = nullptr,
int cmd = 0,
const Callback& cb = nullptr,
int priority = 0) {
int ts = AddPullCB(keys, outs, lens, cmd, cb);
KVPairs<Val> kvs;
kvs.keys = keys;
kvs.vals = vals;
kvs.priority = priority;
if (lens)
kvs.lens = *lens;
Send(ts, true, true, cmd, kvs);
re
4.3.7 Callback 相關
前面提到了一些回撥函式的設定,下面我們看看如何使用。
4.3.7.1 設定
我們可以看到,針對每個時間戳,設定了一個回撥函式,進而構成了一個回撥函式列表。
每次傳送請求之後,都會往這個列表中註冊回撥函式。
using Callback = std::function<void()>;
/** \brief callbacks for each timestamp */
std::unordered_map<int, Callback> callbacks_; // 回撥函式列表
void AddCallback(int timestamp, const Callback& cb) {
if (!cb) return;
std::lock_guard<std::mutex> lk(mu_);
callbacks_[timestamp] = cb; // 添加回調函式
}
4.3.7.2 AddPullCB
這是 pull 之後,得到應答的回撥函式,用於拷貝返回的資料。
但是,如果是多個 Server 都應該有返回,應該如何處理?無論是 push 還是 pull,只有在收到了所有的Response之後,才會將從各個server上拉取的value填入本地的vals
裡。
template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::AddPullCB(
const SArray<Key>& keys, C* vals, D* lens, int cmd,
const Callback& cb) {
int ts = obj_->NewRequest(kServerGroup);
AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
mu_.lock();
auto& kvs = recv_kvs_[ts];
mu_.unlock();
// do check
size_t total_key = 0, total_val = 0;
for (const auto& s : kvs) { // 進行有效性驗證
Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
CHECK_EQ(range.size(), s.keys.size())
<< "unmatched keys size from one server";
if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
total_key += s.keys.size();
total_val += s.vals.size();
}
CHECK_EQ(total_key, keys.size()) << "lost some servers?";
// fill vals and lens
std::sort(kvs.begin(), kvs.end(), [](
const KVPairs<Val>& a, const KVPairs<Val>& b) {
return a.keys.front() < b.keys.front();
});
CHECK_NOTNULL(vals);
if (vals->empty()) {
vals->resize(total_val);
} else {
CHECK_EQ(vals->size(), total_val);
}
Val* p_vals = vals->data();
int *p_lens = nullptr;
if (lens) {
if (lens->empty()) {
lens->resize(keys.size());
} else {
CHECK_EQ(lens->size(), keys.size());
}
p_lens = lens->data();
}
for (const auto& s : kvs) { // 拷貝返回的資料
memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
p_vals += s.vals.size();
if (p_lens) {
memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
p_lens += s.lens.size();
}
}
mu_.lock();
recv_kvs_.erase(ts);
mu_.unlock();
if (cb) cb();
});
return ts;
}
4.3.7.3 執行
就是依據時間戳找到回撥函式,執行,然後刪除。
何時呼叫,就是在 Process 之中會呼叫,我們馬上介紹。
template <typename Val>
void KVWorker<Val>::RunCallback(int timestamp) {
mu_.lock();
auto it = callbacks_.find(timestamp);
if (it != callbacks_.end()) {
mu_.unlock();
CHECK(it->second);
it->second();
mu_.lock();
callbacks_.erase(it);
}
mu_.unlock();
}
4.3.8 Process
如果是 Pull 的 response, 在每次收到的Response返回的values,會先儲存recv_kvs_
裡,recv_kvs_[ts].push_back(kvs);
無論是 push 還是 pull,只有在收到了所有的Response之後,才會將從各個server上拉取的value填入本地的vals
裡。
template <typename Val>
void KVWorker<Val>::Process(const Message& msg) {
if (msg.meta.simple_app) {
SimpleApp::Process(msg); return;
}
// store the data for pulling
int ts = msg.meta.timestamp;
if (msg.meta.pull) {
CHECK_GE(msg.data.size(), (size_t)2);
KVPairs<Val> kvs;
kvs.keys = msg.data[0];
kvs.vals = msg.data[1];
if (msg.data.size() > (size_t)2) {
kvs.lens = msg.data[2];
}
mu_.lock();
recv_kvs_[ts].push_back(kvs);
mu_.unlock();
}
// finished, run callbacks,只有在收到了所有的Response之後
if (obj_->NumResponse(ts) == Postoffice::Get()->num_servers() - 1) {
RunCallback(ts); // 在這裡呼叫了 RunCallback。
}
}
0x05 總結
最後我們用一個訊息傳遞流程做一下總結,看看各個部分在其中如何使用。總體流程圖如下:
- worker節點 要傳送訊息,所以呼叫了 Send 方法。
- Send 方法會呼叫到了 Customer的 NewRequest,來建立一個新請求。
Postoffice::start()
會初始化節點資訊,並且呼叫Van::start()
。- Send方法會呼叫 Van 的 send 方法來進行網路互動。
- 經過網路傳遞之後,流程來到了 Server 處,對於 Server 來說,這是一個 Request,呼叫到了 Van 的 Receiving。
Van::Receiving()
接收後訊息之後,根據不同命令執行不同動作。針對資料訊息,如果需要下一步處理,會呼叫 ProcessDataMsg。 - 繼續呼叫到 Van 的 ProcessDataMsg,然後呼叫 GetCustomer。
- GetCustomer 會呼叫到Postoffice,對於 customers_ 進行相應處理。
- Customer 會使用 Accept 來處理訊息。
- Customer::Accept() 函式將訊息新增到一個佇列
recv_queue_
。 - Customer 物件本身也會啟動一個接受執行緒
recv_thread_
,使用 Customer::Receiving() :- 不斷的從
recv_queue_
佇列取訊息。 - 如果 (!recv.meta.request) ,就說明是 response,則
tracker_[req.timestamp].second++
- 呼叫註冊的使用者自定義的
recv_handle_
函式對訊息進行處理。
- 不斷的從
Van::Receiving()
呼叫註冊的使用者自定義的recv_handle_
函式對訊息進行處理。- 對於Server來說,其註冊的
recv_handle_
是KVServer::Process()
函式。 - Process 函式呼叫 request_handle_ 繼續處理,生成 Response,返回給 Worker。
- Response 經過網路傳遞給 Worker。
- 執行回到了 Worker,來到了 Worker 的 Van。對於 worker 來說,這是一個 Request,呼叫到了 Van 的 Receiving。(以下操作序列和 Server 類似)
Van::Receiving()
接收後訊息之後,根據不同命令執行不同動作。針對資料訊息,如果需要下一步處理,會呼叫 ProcessDataMsg。- Customer 會使用 Accept 來處理訊息。
- Customer::Accept() 函式將訊息新增到一個佇列
recv_queue_
。 - 這裡有個解耦合,由一個新執行緒
recv_thread_
處理。 - Customer 物件本身已經啟動一個新執行緒
recv_thread_
,使用 Customer::Receiving() 。 - 對於Worker來說,其註冊的
recv_handle_
是KVWorker::Process()
函式。 - 呼叫到
KVWorker::Process()
函式處理響應訊息Response。
+---------------------+ +------------------------+ Worker + Server +--------------------------+
| KVWorker | 1 | Van | 3 | | Van |
| Send +--------+---------------> send +-----------------+-----> Request +-----------> Receiving |
| | | | | | + |
| | | | Receiving <---------+ | 4 | | | +---------------------------+
| | | | + | | | | | | | Postoffice |
| Process | | | | 16 | | | | | 5 | | |
| ^ | | | v | | 15 | | v | GetCustomer | |
| | | | | ProcessDataMsg | | | | ProcessDataMsg <------------------> unordered_map customers_|
| | | | | + | | | | + | 6 | |
| | | | | | | | | | | | +---------------------------+
+---------------------+ | +------------------------+ | | +--------------------------+
| | | | | |
| |2 | 17 | | | 7
| | | | | |
| +---------------------------------------+ | | +------------------------------------+
| | Customer | | | | | | Customer | |
| | | v | | | | | |
| | v | | | | v |
| | NewRequest Accept | | | | Accept |
| | + | | | | + |
| | | 18 | | | | | |
| | | | | | | | 8 |
| | v | | | | v |
| | revc_queue_ | | | | recv_queue_ |
| | + | | | | + |
22 | | | 19 | | | | | 9 |
| | | | | | | | |
| | 20 v | | | | 10 v |
| | recv_thread_ +-------> Receving | | | | recv_thread_ +---> Receiving |
| | | | | | | + |
| | | 21 | | | | | 11 |
| | | | | | | | | +------------------+
| | v | | | | v | |KVServer |
+---------------------------+ recv_handle | | | | recv_handle_+------------------> | |
| | | | | | 12 | Process |
+---------------------------------------+ | | +------------------------------------+ | + |
| | +------------------+
| | |
| | | 13
| | v
| | +-----------+-------+
| | | request_handle_ |
| | 14 | |
+<-----------+ Response <----------------------------------------------------+ Response |
| | |
| +-------------------+
+
手機如下:
0xEE 個人資訊
★★★★★★關於生活和技術的思考★★★★★★
微信公眾賬號:羅西的思考
如果您想及時得到個人撰寫文章的訊息推送,或者想看看個人推薦的技術資料,敬請關注。