Ceph網路模組使用案例:OSD心跳檢測機制
ceph是一個分散式的檔案系統。對於一個分散式系統,需要一個穩定的底層網路通訊模組(訊息模組),用於元件之間的互聯互通。
ceph的元件主要包括,OSD,monitor,mgr,osdclient,client等,這些模組內部的通訊,以及模組間的通訊都使用了,後面我們直接把這些元件稱為網路模組的使用者。使用者在使用網路模組的時候,主要涉及到2個角色,一個是messenger,一個是dispatcher。messenger相當於一個訊息管理器(網路模組的核心,訊息的傳送和接收都是messenger通過底層的類實現),dispatcher相當於一個訊息的處理器。
-messenger
- ① 將dispatcher移交給它的資訊傳送給其它節點(節點,這裡節點是一個邏輯單元,比如osd.0就算是一個節點,osd.1算一個新的節點)處理
- ② 從其它節點的messenger獲取訊息移交給本節點的dispatcher。
-dispatcher
- ① 對接收到的訊息(有messenger移交給它)進行處理
- ② 把需要傳送的訊息移交給本節點的messenger
每個ceph元件都會註冊多個messenger和多個dispatcher用於處理不同型別的訊息。以OSD元件為例,OSD的守護程序啟動的時候會註冊7個messenger來管理訊息,每個messenger的用途不一樣。這部分程式碼在ceph osd守護程序啟動中 /src/ceph_osd.cc。 每一個OSD都有一個守護程序(OSD deamon)。這個deamon負責完成OSD的所有邏輯功能,包括與monitor和其他OSD(事實上是其他OSD的deamon)通訊以維護更新系 統狀態,與其他OSD共同完成資料的儲存和維護,與client通訊完成各種資料物件操作等等。
例子:一個OSD模組會註冊7個messenger和2個dispatcher。
編號 |
Messenger例項名稱 |
作用 |
1 |
*ms_public |
用來處理OSD和Client之間的訊息 |
2 |
*ms_cluster |
用來處理OSD和叢集之間的訊息 |
3 |
*ms_hb_front_client |
用來向其它OSD傳送心跳的訊息 |
4 |
*ms_hb_back_client |
用來向其它OSD傳送心跳的訊息 |
5 |
*ms_hb_back_server |
用來接收其他OSD的心跳訊息 |
6 |
*ms_hb_front_server |
用來接收其他OSD的心跳訊息 |
7 |
*ms_objecter |
用來處理OSD和Objecter之間的訊息 |
編號 |
Dispatcher例項名稱 |
作用 |
1 |
*OSD |
可以處理部分osd節點的訊息 |
2 |
*heartbeat_dispatcher |
處理心跳連線 |
1 訊息模組的使用框架
1.1 傳送訊息
本節描述ceph中的應用(osd、mgr、monitor等)是如何使用網路模組傳送訊息的。在實際的應用中,有兩種常見的傳送訊息的方式,當然這兩種方式只是看起來有些不同,的底層實現都是相同,都是呼叫AsyncConnection::send_message(Message *m)把訊息傳送出去。
1)方式一:使用AsyncMessenger::send_message(Message *m, const entity_inst_t& dest))
其中Message *m是要傳送的訊息,dest是目的地址。
send_message會首先去判斷自己和目標地址之前是不是已經存在連結Connection,如果沒有就建立一個,conn->send_message(m)傳送訊息
-----------------------------------------------------------
conn->send_message(m)
-----------------------------------------------------------
2) 方式二:
① 首先要通過Messenger類,獲取對應的Connection:
------------------------------------------------------------
conn = messenger->get_connection(dest_server);
------------------------------------------------------------
get_connection過程是這樣的,如果dest.addr是my_inst.addr,就直接返回local_connection。
如果連結不存在就新建一個。
② 當獲得一個Connection之後,就可以呼叫Connection的傳送函式來發送訊息。
-----------------------------------------------------------
conn->send_message(m)
-----------------------------------------------------------
具體傳送的實現過程依賴於選擇的訊息模式,simple、async等實現方式都不同。在另一個文章裡我會講到具體實現過程,這裡不多做解釋。
1.2 訊息的接收
訊息的接收過程簡言之就是通過監聽socket判斷是否有訊息到來,如果有就接收。這個過程是個很複雜的過程,涉及到了連線建立、錯誤處理等等。具體的實現依賴於選擇的訊息模式,比如,SimpleMessenger是使用一個read執行緒來實現;AsyncMessenger是使用基於事件的機制實現。接收的過程對應用層都是透明的,本章不做解釋。
1.3 訊息的處理
訊息接收完成後,就進入訊息的處理。首先判斷訊息m是否可以fast_dispatch,如果可以,呼叫註冊fast_dispatcher函式處理訊息。如果不能fast_dispatch,呼叫函式in_q->enqueue,將接收到的訊息加入到DispatchQueue的mqueue佇列中,排隊等待處理。
2 ceph OSD心跳檢測與網路模組
下面我們具體舉一個OSD心跳檢測的例子來講解,通過心跳檢測機制來了解網路模組的使用。在ceph中需要通過心跳檢測來判斷OSD是不是線上,因為這部分的功能比較簡單獨立。
2.1 Messenger & Dispatcher的註冊
在OSD模組註冊的7個Messenger和2個Dispatcher中,4個Messenger都和心跳檢測相關,一個heartbeat_dispatcher用來處理心跳連線。
編號 |
Messenger例項名稱 |
作用 |
1 |
*ms_public |
用來處理OSD和Client之間的訊息 |
2 |
*ms_cluster |
用來處理OSD和叢集之間的訊息 |
3 |
*ms_hb_front_client |
用來向其它OSD傳送心跳的訊息 |
4 |
*ms_hb_back_client |
用來向其它OSD傳送心跳的訊息 |
5 |
*ms_hb_back_server |
用來接收其他OSD的心跳訊息 |
6 |
*ms_hb_front_server |
用來接收其他OSD的心跳訊息 |
7 |
*ms_objecter |
用來處理OSD和Objecter之間的訊息 |
對應程式碼在ceph_osd.cc中
-------------------------------------------------------------------------------------------------------
-HeartBeatMessenger
在ceph守護程序啟動過程中(ceph-osd.cc),建立了4個messenger用於心跳檢測。
在ceph守護程序建立osd的時候將這些messenger傳給了osd, 注意這些messenger在osd中被重新命名了。
重新命名為了:
hb_front_client_messenger
hb_back_client_messenger
hb_front_server_messenger
hb_back_server_messenger
---------------------------------------------------------------------------------------------------
編號 |
Dispatcher例項名稱 |
作用 |
1 |
*OSD |
可以處理部分osd節點的訊息 |
2 |
*heartbeat_dispatcher |
處理心跳連線 |
對應程式碼在osd.cc中
-------------------------------------------------------------------------------------------------------
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
-------------------------------------------------------------------------------------------------------
2.2 心跳機制介紹
OSD 中有一個heartbeat_thread,這個heartbeat_thread的作用就是不斷的傳送ping請求給其他節點。在Ceph中,OSD的地位都是對等的,每一個OSD在向其他OSD傳送ping訊息的同時,也會收到其他OSD發來ping訊息。OSD收到ping訊息後會傳送一個回覆訊息reply message。在部署ceph的時候,通常會使用兩張網絡卡front和back,將流量分開。所以OSD使用兩對messenger和來分別傳送和監聽front額back的ping心跳。圖中展示了一個3個OSD的ceph叢集的心跳檢測過程,只畫出了front網絡卡的心跳檢測,back雷同。
2.3 心跳檢測中網路模組的使用
我們著重看兩個osd節點間如何通訊。上圖展示了OSD A向OSD B傳送Ping訊息(hb_front_client_messenger(A) ->ms_hb_front_server(B)),然後OSD B收到訊息後交給dispatcher進行處理,然後傳送回覆訊息給OSD A (hb_front_client_messenger(A) <-ms_hb_front_server(B))的過程。具體的可以拆分成5個步驟:
- ① 連線(connection)建立
- ② Ping訊息傳送
- ③ Ping訊息接受及處理
- ④ 回覆訊息
- ⑤ 處理PING_REPLY
下面對這五個步驟進行一一介紹。
① 連線(connection)建立
獲取目標 messenger B的連結connection
conn =hb_front_client_messenger(A)->get_connection(dest_server_B),如果沒有連結就建立一個。
monitor中的osdmap記錄了每一個osd的front地址和back地址,這個是在osd啟動的時候就告訴monitor的。
OSDService::get_con_osd_hb(),首先先獲取osdmap,獲取目標osdB的front地址,然後在A的messenger hb_front_client_messenger中建立一個connection。
pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
OSDMapRef next_map = get_nextmap_reserved();
// service map is always newer/newest
assert(from_epoch <= next_map->get_epoch());
pair<ConnectionRef,ConnectionRef> ret;
if (next_map->is_down(peer) ||
next_map->get_info(peer).up_from > from_epoch) {
release_map(next_map);
return ret;
}
ret.first = osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
if (next_map->get_hb_front_addr(peer) != entity_addr_t())
ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
release_map(next_map);
return ret;
}
osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
if (next_map->get_hb_front_addr(peer) != entity_addr_t())
ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
release_map(next_map);
return ret;
}
② Ping訊息傳送
在OSD::heartbeat()中,對記錄了osd連線資訊map進行遍歷,每一個heartbeatinfo中記錄了一個目標osd的連結資訊(connection),通過這些conns把訊息傳送出去。重點的就是紅色部分。
void OSD::heartbeat()
{
......
// send heartbeats
for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
i != heartbeat_peers.end();
++i) {
int peer = i->first;
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));
if (i->second.con_front)
i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));
......
}
}
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));
if (i->second.con_front)
i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap()->get_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));
......
}
}
③ Ping訊息接受及處理
OSD B收到訊息,Messenger B內部的dispatch執行緒會呼叫事先鍵入的dispatcher對訊息進行處理。HeartbeatDispatcher會將message交給osd->heartbeat_dispatch()處理。
struct HeartbeatDispatcher : public Dispatcher {
OSD *osd;
explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
bool ms_dispatch(Message *m) override {
return osd->heartbeat_dispatch(m);
}
......
}heartbeat_dispatcher
bool OSD::heartbeat_dispatch(Message *m)
{
dout(30) << "heartbeat_dispatch " << m << dendl;
switch (m->get_type()) {
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source_inst() << dendl;
m->put();
break;
case MSG_OSD_PING:
handle_osd_ping(static_cast<MOSDPing*>(m));
break;
default:
dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
m->put();
}
return true;
}
osd->heartbeat_dispatch(m);
}
......
}heartbeat_dispatcher
bool OSD::heartbeat_dispatch(Message *m)
{
dout(30) << "heartbeat_dispatch " << m << dendl;
switch (m->get_type()) {
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source_inst() << dendl;
m->put();
break;
case MSG_OSD_PING:
handle_osd_ping(static_cast<MOSDPing*>(m));
break;
default:
dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
m->put();
}
return true;
}
heartbeat_dispatch()根據訊息的type進行處理,因為訊息的type是MSG_OSD_PING,調到OSD::handle_osd_ping(MOSDPing *m)進行處理, 進入case MOSDPing::PING:
void OSD::handle_osd_ping(MOSDPing *m)
{
switch (m->op) {
case MOSDPing::PING:
{
//做了一系列處理
...
//傳送回覆包
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::PING_REPLY, m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
...
}
case MOSDPing::PING_REPLY:
{
// 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor
}
}
case MOSDPing::PING:
{
//做了一系列處理
...
//傳送回覆包
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::PING_REPLY, m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
...
}
case MOSDPing::PING_REPLY:
{
// 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor
}
}
④ 回覆訊息
第③步中,在OSD B的dispatcher中對訊息做一系列處理後,會封裝一個回覆訊息PING_REPLY,然後傳送OSD A。
------------------------------------------------------------------------
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::PING_REPLY, m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
------------------------------------------------------------------------
這個過程和前面類似,不同的是這次是OSD B的Messenger去獲取connection連結,(這個連結就是之前OSD A建立的連結,通過看Asyncmessenger.cc 可以看到在執行第②步中AsyncConnection::send_message(Message *m)時,通過 m->set_connection(this);將connection賦給了message,所以說A向B傳送訊息和B向A傳送回覆訊息都是用的同一條連結connection),傳送回覆訊息給OSD A。
⑤ 處理PING_REPLY
OSD A 的Messenger(hb_front_client_messenger)監聽到了回覆訊息,交給自己的Dispatcher處理。
還是先進入osd->heartbeat_dispatch(m)的MSG_OSD_PING,然後使用OSD::handle_osd_ping(MOSDPing *m)根據 訊息的type做相應處理,這次是進入ping reply
void OSD::handle_osd_ping(MOSDPing *m)
{
switch (m->op) {
case MOSDPing::PING:
{
...
//傳送回覆包
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::PING_REPLY, m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
...
}
case MOSDPing::PING_REPLY:
{
// 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor
}
}
case MOSDPing::PING_REPLY:
{
// 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor
}
}
可以看出,心跳的傳送流程是很簡單的,也是很獨立的。在設計分散式系統的時候,為了保證叢集的內部狀態正確,應儘量不要引入過多複雜的因素影響心跳的流程。 畢竟心跳快速正確的處理是確保叢集運轉正常的最基本條件。
參考文章【reference】:
整理中。。。