Glusterfs之rpc模組原始碼分析(中)之Glusterfs的rpc模組實現(3)
歡迎大家相互交流,共同提高技術。
第三節、rpc通訊過程分析
前面兩個小節分別對rpc服務端和客戶端的建立流程做了詳細的分析,也就是說rpc客戶端和伺服器端已經能夠進行正常的通訊了(rpc客戶端已經通過connect連結上rpc伺服器了),那麼這一小節主要根據一個實際的例子來分析一個完整的rpc通訊過程。
下面以客戶端建立邏輯卷(volume)為例來分析rpc的通訊過程,就以下面這個客戶端的命令開始:
gluster volume create test-volume server3:/exp3 server4:/exp4
先簡單看看glusterfs的客戶端是怎樣開始提交rpc請求的,提交準備過程流程圖如下:
從上面的流程圖可以看出真正開始提交rpc請求呼叫還是從具體命令的回撥函式開始發起的,上面的流程圖主要展示的是準備工作,下面從具體命令的回撥函式開始分析,這裡分析的例項是建立邏輯卷的命令,執行的函式是cli_cmd_volume_create_cbk,主要實現程式碼如下:
proc = &cli_rpc_prog->proctable[GLUSTER_CLI_CREATE_VOLUME];//從rpc程式表中選擇對應函式
frame = create_frame (THIS, THIS->ctx->pool);//建立幀
ret = cli_cmd_volume_create_parse (words, wordcount, &options);//建立邏輯卷的命令解析
if (proc->fn) {
ret = proc->fn (frame, THIS, options);//執行命令的回撥函式
}
if (ret) {
cli_cmd_sent_status_get (&sent);//得到命令傳送狀態
if ((sent == 0) && (parse_error == 0))
cli_out ("Volume create failed");//如果失敗,錯誤提示
}
首先選擇對應命令的rpc客戶端建立邏輯卷的命令函式,然後解析命令以後執行相應的建立邏輯卷的rpc函式,下面是對應的函式存放表項:
[GLUSTER_CLI_CREATE_VOLUME] = {"CREATE_VOLUME", gf_cli3_1_create_volume}
所以真正的提交函式是gf_cli3_1_create_volume函式,繼續分析這個函式,主要實現程式碼如下:
ret = cli_cmd_submit (&req, frame, cli_rpc_prog, GLUSTER_CLI_CREATE_VOLUME, NULL,
gf_xdr_from_cli_create_vol_req, this, gf_cli3_1_create_volume_cbk);
主要程式碼也只有一行,其餘程式碼就是為了這一行的函式呼叫做相應引數準備的,這一行的這個函式就是所有客戶端命令提交rpc請求服務的實現函式,只是提交的資料不同而已!下面重點分析這個函式,還是先看看主要程式碼:
cli_cmd_lock ();//命令物件加鎖
cmd_sent = 0;//初始化命令傳送狀態標示為0
ret = cli_submit_request (req, frame, prog, procnum, NULL, sfunc, this, cbkfn);//提交請求
if (!ret) {
cmd_sent = 1;//標示已經發送
ret = cli_cmd_await_response ();//等待響應
} else
cli_cmd_unlock ();//不成功解鎖
在傳送具體的rpc請求以前先鎖住命令物件,然後呼叫函式cli_submit_request 把rpc請求傳送出去(應該是非同步的),然後設定命令以傳送標誌,並呼叫函式cli_cmd_await_response等待響應。繼續看提交rpc請求的函式:
iobuf = iobuf_get (this->ctx->iobuf_pool);//從緩衝池取一個io快取
if (!iobref) {
iobref = iobref_new ();//新建一個iobuf引用池
new_iobref = 1;//標誌
}
iobref_add (iobref, iobuf);//把io快取加入io快取引用池
iov.iov_base = iobuf->ptr;//io向量基地址(供使用者使用的記憶體)
iov.iov_len = 128 * GF_UNIT_KB;//大小
if (req && sfunc) {
ret = sfunc (iov, req);//序列化為xdr格式資料(表示層資料格式)
iov.iov_len = ret;//序列化以後的長度
count = 1;//計數初始化為1
}
ret = rpc_clnt_submit (global_rpc, prog, procnum, cbkfn, &iov, count,//提交客戶端rpc請求
NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL);
Xdr資料格式的轉換是呼叫函式庫實現的,不具體分析,需要明白的是經過sfunc 函式呼叫以後就是xdr格式的資料了,最後根據轉化後的資料呼叫rpc_clnt_submit提交客戶端的rpc請求。繼續深入函式:
rpcreq = mem_get (rpc->reqpool);//重rpc物件的請求物件池得到一個請求物件
if (!iobref) {
iobref = iobref_new ();//如果io快取引用池為null就新建一個
new_iobref = 1;//新建標誌
}
callid = rpc_clnt_new_callid (rpc);//新建一個rpc呼叫的id號
conn = &rpc->conn;//從rpc物件中取得連結物件
rpcreq->prog = prog;//賦值rpc請求物件的程式
rpcreq->procnum = procnum;//程式號
rpcreq->conn = conn;//連結物件
rpcreq->xid = callid;//呼叫id號
rpcreq->cbkfn = cbkfn;//回撥函式
if (proghdr) {//程式頭不為空
proglen += iov_length (proghdr, proghdrcount);//計算頭部長度加入程式訊息總長度
}
if (progpayload) {
proglen += iov_length (progpayload, progpayloadcount);//計算io向量的長度加入總長度
}
request_iob = rpc_clnt_record (rpc, frame, prog, procnum, proglen, &rpchdr, callid);//建立rpc記錄
iobref_add (iobref, request_iob);//新增rpc記錄的io快取區到io快取引用池
req.msg.rpchdr = &rpchdr;//rpc請求訊息頭部
req.msg.rpchdrcount = 1;//頭部數量
req.msg.proghdr = proghdr;//程式頭部
req.msg.proghdrcount = proghdrcount;//程式頭部數量
req.msg.progpayload = progpayload;//xdr格式資料
req.msg.progpayloadcount = progpayloadcount;//數量
req.msg.iobref = iobref;//io快取引用池
req.rsp.rsphdr = rsphdr;//響應頭部
req.rsp.rsphdr_count = rsphdr_count;//數量
req.rsp.rsp_payload = rsp_payload;//負載
req.rsp.rsp_payload_count = rsp_payload_count;//數量
req.rsp.rsp_iobref = rsp_iobref;//響應快取引用池
req.rpc_req = rpcreq;//rpc請求
pthread_mutex_lock (&conn->lock);//加鎖
{
if (conn->connected == 0) {//還沒有建立連線
rpc_transport_connect (conn->trans, conn->config.remote_port);//建立連線
}
ret = rpc_transport_submit_request (rpc->conn.trans, &req);//提交傳輸層rpc請求
if ((ret >= 0) && frame) {
gettimeofday (&conn->last_sent, NULL);//設定最後傳送時間
__save_frame (rpc, frame, rpcreq);//儲存幀到佇列
}
}
pthread_mutex_unlock (&conn->lock);//解鎖
經過上面的程式碼,現在資料已經到達傳輸層,所以現在就開始呼叫傳輸層的rpc請求傳送函式rpc_transport_submit_request,程式碼如下:
ret = this->ops->submit_request (this, req);
這裡採用函式指標的方式進行呼叫的,具體的傳輸協議呼叫具體的傳輸函式,這些函式都是在裝載協議庫的時候已經賦值具體函式的實現了,分析的是tcp,所以看看tcp的傳送函式:
struct rpc_transport_ops tops = {
......
.submit_request = socket_submit_request,
......
};
從上面可以看出tcp的傳送函式是socket_submit_request,主要實現程式碼如下:
pthread_mutex_lock (&priv->lock);//加鎖
{
priv->submit_log = 0;//提交標誌初始化為0
entry = __socket_ioq_new (this, &req->msg);//根據請求物件的訊息新建一個io請求佇列
if (list_empty (&priv->ioq)) {//判斷提交io請求佇列是否為空
ret = __socket_ioq_churn_entry (this, entry);//開始依次提交傳輸層的io請求
if (ret == 0)
need_append = 0;//需要新增到entry連結串列
if (ret > 0)
need_poll_out = 1;//需要註冊可寫事件
}
if (need_append) {
list_add_tail (&entry->list, &priv->ioq);//新增到entry的連結串列
}
if (need_poll_out) {//註冊可寫事件
priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, -1, 1);
}
}
pthread_mutex_unlock (&priv->lock);//解鎖
這段加鎖的程式碼就是完成整個rpc請求資訊的傳送,如果沒有傳送完畢就在註冊一個可寫事件啟動下一次請求,到此客戶端的rpc請求已經發送完畢,就開始等待伺服器的響應。
下面就看看rpc伺服器端怎麼響應客戶端的請求,並根據相應的請求命令做怎樣的處理。在分析rpc服務啟動的時候知道註冊了監聽事件,監聽事件的處理函式是socket_server_event_handler,它的主要實現程式碼如下:
pthread_mutex_lock (&priv->lock);
{
if (poll_in) {//連線到來是可讀事件
new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen);//接收客戶端連線
if (!priv->bio) {//設定非阻塞
ret = __socket_nonblock (new_sock);
}
if (priv->nodelay) {//設定無延遲傳送
ret = __socket_nodelay (new_sock);
}
if (priv->keepalive) {//設定保持連線
ret = __socket_keepalive (new_sock, priv->keepaliveintvl, priv->keepaliveidle);
}
//為連線物件
new_trans = GF_CALLOC (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t);
new_trans->name = gf_strdup (this->name);//賦值名稱
memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen);//賦值地址資訊
new_trans->peerinfo.sockaddr_len = addrlen;//長度
new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr);
ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr),
&new_trans->myinfo.sockaddr_len);//得到新socket的地址資訊
get_transport_identifiers (new_trans);
socket_init (new_trans);//初始化新的傳輸層物件(新的socket)
pthread_mutex_lock (&new_priv->lock);
{
new_priv->connected = 1;//連線已經建立
rpc_transport_ref (new_trans);//傳輸物件引用計數加1
new_priv->idx = event_register (ctx->event_pool, new_sock,//註冊可讀事件
socket_event_handler, new_trans, 1, 0);
}
pthread_mutex_unlock (&new_priv->lock);
//執行傳輸物件註冊的通知函式,通知已經接受客戶端連線請求
ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans);
}
}
pthread_mutex_unlock (&priv->lock);
上面的程式碼主要就是處理客戶端的連線請求,然後在新的socket上註冊可讀事件(準備讀取客戶端傳送來的rpc請求資訊),並且執行通知函式做相應的處理。註冊的可讀事件的處理函式是socket_event_handler,主要是實現程式碼如下:
if (!priv->connected) {//如果連線還沒有完成就繼續完成連線,因為連線是非同步的可能沒有立即完成
ret = socket_connect_finish (this);
}
if (!ret && poll_out) {//處理可寫事件
ret = socket_event_poll_out (this);
}
if (!ret && poll_in) {//處理可讀事件
ret = socket_event_poll_in (this);
}
客戶端的連線請求對於伺服器來說是可讀事件,所以執行的socket_event_poll_in函式,當伺服器需要傳送響應資訊到rpc客戶端的時候就會執行可寫事件處理函式。繼續分析接收客戶端請求資訊的處理函式socket_event_poll_in主要程式碼如下:
ret = socket_proto_state_machine (this, &pollin);//根據rpc服務記錄的狀態做相應處理
if (pollin != NULL) {
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin);//執行通知函式
rpc_transport_pollin_destroy (pollin);//完成處理就銷燬資源
}
上面的程式碼主要還是呼叫其它函式繼續處理rpc客戶端的請求資訊,然後執行通知函式通知傳輸物件訊息已經被接收,最後銷燬傳輸層相關不在需要的資源。處理具體請求資訊的實現是在函式socket_proto_state_machine,而這個函式又呼叫__socket_proto_state_machine來處理,所以看看這個函式實現功能的主要程式碼:
while (priv->incoming.record_state != SP_STATE_COMPLETE) {//直到rpc服務記錄狀態完成為止
switch (priv->incoming.record_state) {//根據現在rpc服務記錄的狀態做相應處理
case SP_STATE_NADA://開始狀態
iobuf = iobuf_get (this->ctx->iobuf_pool);//取得一個io快取
priv->incoming.record_state = SP_STATE_READING_FRAGHDR;//改變狀態為讀取頭部
case SP_STATE_READING_FRAGHDR://讀取頭部資訊
ret = __socket_readv (this, priv->incoming.pending_vector, 1,//讀取資訊
&priv->incoming.pending_vector,
&priv->incoming.pending_count, NULL);
if (ret > 0) {//讀取了部分頭部資訊
}
if (ret == 0) {//讀取了所有頭部資訊,繼續下一步的處理
priv->incoming.record_state = SP_STATE_READ_FRAGHDR;//改變為下一步
}
case SP_STATE_READ_FRAGHDR://處理已經讀取的頭部資訊
priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);//轉換頭部資訊為主機位元組
priv->incoming.record_state = SP_STATE_READING_FRAG;//轉化為讀取幀資料狀態
priv->incoming.total_bytes_read += RPC_FRAGSIZE(priv->incoming.fraghdr);//位元組數
case SP_STATE_READING_FRAG://讀取所有的資料
ret = __socket_read_frag (this);//讀取所有幀資料
priv->incoming.frag.bytes_read = 0;
if (!RPC_LASTFRAG (priv->incoming.fraghdr)) {//是否為最後一幀資料
priv->incoming.record_state = SP_STATE_READING_FRAGHDR;//不是
break;//退出迴圈,從新讀取頭部資訊
}
if (pollin != NULL) {
int count = 0;//計數
priv->incoming.iobuf_size = priv->incoming.total_bytes_read
- priv->incoming.payload_vector.iov_len;//計算io快取大小
memset (vector, 0, sizeof (vector));//io向量清零
if (priv->incoming.iobref == NULL) {//io快取引用池為null就新建一個
priv->incoming.iobref = iobref_new ();
}
vector[count].iov_base = iobuf_ptr (priv->incoming.iobuf);//取io快取基地址
vector[count].iov_len = priv->incoming.iobuf_size;//io快取長度
iobref = priv->incoming.iobref;//io快取引用池
count++;//計數加1
if (priv->incoming.payload_vector.iov_base != NULL) {//負載向量不為null
vector[count] = priv->incoming.payload_vector;//儲存負載io向量
count++;//計數加1
}
//新建一個傳輸層可取物件
*pollin = rpc_transport_pollin_alloc (this, vector, count, priv->incoming.iobuf,
iobref, priv->incoming.request_info);
iobuf_unref (priv->incoming.iobuf);//io快取引用計算減1
priv->incoming.iobuf = NULL;//清零
if (priv->incoming.msg_type == REPLY)//訊息型別是回覆
(*pollin)->is_reply = 1;//設定回覆標誌
priv->incoming.request_info = NULL;//請求資訊清零
}
priv->incoming.record_state = SP_STATE_COMPLETE;//設定為完成狀態
break;
}
}
if (priv->incoming.record_state == SP_STATE_COMPLETE) {//如果rpc請求記錄為完成狀態
priv->incoming.record_state = SP_STATE_NADA;//重新初始化為開始狀態
__socket_reset_priv (priv);//復位私有資料物件
}
整個處理過程分為了幾個階段,而且每一個階段只處理相應的事情,然後就進入下一個階段,因為前幾個階段case語言都是不帶break的,所以直接進入下一個階段,最終達到完成狀態就退出迴圈,一個完成的處理過程其實就只需要一次迴圈就解決了。當所有rpc請求訊息都已經接收以後就呼叫通知函式(在傳輸物件上註冊的通知函式)通知傳輸物件訊息已經接收,由rpc伺服器的初始化過程我們知道註冊的傳輸物件通知函式是rpcsvc_notify ,這個函式主要實現程式碼如下:
switch (event) {
case RPC_TRANSPORT_ACCEPT://rpc請求已經被接收處理
new_trans = data;
ret = rpcsvc_accept (svc, trans, new_trans);//處理函式
break;
case RPC_TRANSPORT_DISCONNECT://斷開連線訊息
ret = rpcsvc_handle_disconnect (svc, trans);//處理函式
break;
case RPC_TRANSPORT_MSG_RECEIVED://訊息已經接收
msg = data;
ret = rpcsvc_handle_rpc_call (svc, trans, msg);//rpc呼叫處理函式
break;
case RPC_TRANSPORT_MSG_SENT://訊息已經發生,不需要處理
break;
case RPC_TRANSPORT_CONNECT://已經連線
break;
case RPC_TRANSPORT_CLEANUP://清零訊息
listener = rpcsvc_get_listener (svc, -1, trans->listener);//得到對應的監聽器物件
rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY, trans);//通知上層
break;
case RPC_TRANSPORT_MAP_XID_REQUEST:
break;
}
傳輸物件註冊的通知函式會根據傳遞過來的資訊型別做相應的處理,這裡傳遞過來的訊息是訊息已經接收,它的處理就是開始執行rpc呼叫了,執行的函式是rpcsvc_handle_rpc_call,它的主要實現程式碼如下:
req = rpcsvc_request_create (svc, trans, msg);//建立一個rpc服務請求物件
if (!rpcsvc_request_accepted (req))//判斷rpc請求是否被接受
;
actor = rpcsvc_program_actor (req);//根據請求物件取得rpc過程呼叫物件
if (actor && (req->rpc_err == SUCCESS)) {//rpc過程呼叫物件不為null並且請求資訊是成功的
THIS = svc->mydata;//取得xlator物件
if (req->count == 2) {//請求的數量等於2
if (actor->vector_actor) {//向量過程不為null,就執行向量處理函式
ret = actor->vector_actor (req, &req->msg[1], 1, req->iobref);
} else {
rpcsvc_request_seterr (req, PROC_UNAVAIL);//出錯,不可用的函式
ret = RPCSVC_ACTOR_ERROR;//呼叫過程出錯
}
} else if (actor->actor) {
ret = actor->actor (req);//呼叫rpc請求函式
}
}
if (ret == RPCSVC_ACTOR_ERROR) {//出錯
ret = rpcsvc_error_reply (req);//回覆客戶端rpc請求處理出錯
}
上面程式碼首先根據接收到的資訊建立一個請求物件,然後根據建立的請求物件判斷是都已經成功接納此次rpc請求呼叫,如果是就繼續執行函式rpcsvc_program_actor,這個函式會根據程式號、函式號等資訊查詢對應的rpc請求的遠端過程呼叫,如果找到就執行相應的函式呼叫。我們分析的是客戶端傳送一條建立邏輯卷的命令道伺服器端,根據伺服器端在啟動初始化的過程中註冊的程式集中我們能夠找到如下一條對應的函式資訊:
[GLUSTER_CLI_CREATE_VOLUME] = { "CLI_CREATE_VOLUME", GLUSTER_CLI_CREATE_VOLUME, glusterd_handle_create_volume, NULL,NULL},
所以伺服器端就會呼叫函式glusterd_handle_create_volume,如果在處理rpc請求的過程中遇到錯誤就會向客戶端傳送一個錯誤資訊的相應訊息。當然如果呼叫成功的話也同樣會返回給客戶端一個相應的結果資訊。客戶端會接收伺服器端的回覆,然後根據訊息內容做相應的處理,如:建立成功等提示資訊。這樣一次完整的rpc通訊就完成了。