thrift之TTransport層的堵塞的套接字I/O傳輸類TSocket
阿新 • • 發佈:2019-01-04
本節將介紹第一個實現具體傳輸功能的類TSocket,這個類是基於TCP socket實現TTransport的介面。下面具體介紹這個類的相關函式功能實現。
1.建構函式
分析一個類的功能首先看它的定義和建構函式實現,先看看它的定義:
TSocket類的建構函式有4個,當然還有一個解構函式。四個建構函式就是根據不同的引數來構造,它們的宣告如下:
socket API原本是為網路通訊設計的,但後來在socket的框架上發展出一種IPC機制,就是UNIX Domain Socket。雖然網路socket也可用於同一臺主機的程序間通訊(通過loopback地址127.0.0.1),但是UNIX Domain Socket用於IPC更有效率:不需要經過網路協議棧,不需要打包拆包、計算校驗和、維護序號和應答等,只是將應用層資料從一個程序拷貝到另一個程序。這是因為,IPC機制本質上是可靠的通訊,而網路協議是為不可靠的通訊設計的。UNIX Domain Socket也提供面向流和麵向資料包兩種API介面,類似於TCP和UDP,但是面向訊息的UNIX Domain Socket也是可靠的,訊息既不會丟失也不會順序錯亂。
UNIX Domain Socket是全雙工的,API介面語義豐富,相比其它IPC機制有明顯的優越性,目前已成為使用最廣泛的IPC機制,比如X Window伺服器和GUI程式之間就是通過UNIX Domain Socket通訊的。
使用UNIX Domain Socket的過程和網路socket十分相似,也要先呼叫socket()建立一個socket檔案描述符,address family指定為AF_UNIX,type可以選擇SOCK_DGRAM或SOCK_STREAM,protocol引數仍然指定為0即可。
UNIX Domain Socket與網路socket程式設計最明顯的不同在於地址格式不同,用結構體sockaddr_un表示,網路程式設計的socket地址是IP地址加埠號,而UNIX Domain Socket的地址是一個socket型別的檔案在檔案系統中的路徑,這個socket檔案由bind()呼叫建立,如果呼叫bind()時該檔案已存在,則bind()錯誤返回。
開啟連線函式open
首先看這個函式的程式碼實現,如下:
讀函式read
在實現讀函式的時候需要注意區分返回錯誤為EAGAIN的情況,因為當超時和系統資源耗盡都會產生這個錯誤(沒有明顯的特徵可以區分它們),所以Thrift在實現的時候設定一個最大的嘗試次數,如果超過這個了這個次數就認為是系統資源耗盡了。下面具體看看read函式的實現,程式碼如下(省略一些引數檢查和錯誤處理的程式碼):
寫函式write
寫函式和讀函式實現差不多,主要的程式碼還是在處理錯誤上面,還有一點不同的是寫函式寫的內容可能一次沒有傳送完畢,所以是在一個while迴圈中一直髮送直到指定的內容全部發送完畢。程式碼實現如下:
其他函式
TSocket類還有一些其他函式,不過功能都比較簡單,比如設定一些超時和得到一些成員變數值的函式,哪些函式一般都是幾句程式碼完成了。
1.建構函式
分析一個類的功能首先看它的定義和建構函式實現,先看看它的定義:
class TSocket : public TVirtualTransport<TSocket> { ......}
由定義可以看書TSocket繼承至虛擬傳輸類,並且把自己當做模板引數傳遞過去,所以從虛擬傳輸類繼承下來的虛擬函式(如read_virt)呼叫非虛擬函式(如read)就是TSocket自己實現的。TSocket類的建構函式有4個,當然還有一個解構函式。四個建構函式就是根據不同的引數來構造,它們的宣告如下:
四個建構函式分別用於不同的情況下來產生不同的TSocket物件,不過這些建構函式都只是簡單的初始化一些最基本的成員變數,而沒有真正的連線socket。它們初始化的變數基本如下:TSocket();//所有引數都預設 TSocket(std::string host, int port);//根據主機名和埠構造一個socket TSocket(std::string path);//構造unix域的一個socket TSocket(int socket);//構造一個原始的unix控制代碼socket
大部分簡單的引數都採用初始化列表初始化了,需要簡單計算的就放在函式體內初始化,其他幾個都是這種情況。下面需要單獨介紹一下的是unix domain socket。TSocket::TSocket() : host_(""), port_(0), path_(""), socket_(-1), connTimeout_(0), sendTimeout_(0), recvTimeout_(0), lingerOn_(1), lingerVal_(0), noDelay_(1), maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; }
socket API原本是為網路通訊設計的,但後來在socket的框架上發展出一種IPC機制,就是UNIX Domain Socket。雖然網路socket也可用於同一臺主機的程序間通訊(通過loopback地址127.0.0.1),但是UNIX Domain Socket用於IPC更有效率:不需要經過網路協議棧,不需要打包拆包、計算校驗和、維護序號和應答等,只是將應用層資料從一個程序拷貝到另一個程序。這是因為,IPC機制本質上是可靠的通訊,而網路協議是為不可靠的通訊設計的。UNIX Domain Socket也提供面向流和麵向資料包兩種API介面,類似於TCP和UDP,但是面向訊息的UNIX Domain Socket也是可靠的,訊息既不會丟失也不會順序錯亂。
UNIX Domain Socket是全雙工的,API介面語義豐富,相比其它IPC機制有明顯的優越性,目前已成為使用最廣泛的IPC機制,比如X Window伺服器和GUI程式之間就是通過UNIX Domain Socket通訊的。
使用UNIX Domain Socket的過程和網路socket十分相似,也要先呼叫socket()建立一個socket檔案描述符,address family指定為AF_UNIX,type可以選擇SOCK_DGRAM或SOCK_STREAM,protocol引數仍然指定為0即可。
UNIX Domain Socket與網路socket程式設計最明顯的不同在於地址格式不同,用結構體sockaddr_un表示,網路程式設計的socket地址是IP地址加埠號,而UNIX Domain Socket的地址是一個socket型別的檔案在檔案系統中的路徑,這個socket檔案由bind()呼叫建立,如果呼叫bind()時該檔案已存在,則bind()錯誤返回。
開啟連線函式open
首先看這個函式的程式碼實現,如下:
Open函式又根據路徑為不為空(不為空就是unix domain socket)呼叫相應的函式來繼續開啟連線,首先看看開啟unix domain socket,程式碼如下:void TSocket::open() { if (isOpen()) {//如果已經開啟就直接返回 return; } if (! path_.empty()) {//如果unix路徑不為空就開啟unix domian socket unix_open(); } else { local_open();//開啟通用socket } }
void TSocket::unix_open(){
if (! path_.empty()) {//保證path_不為空
// Unix Domain SOcket does not need addrinfo struct, so we pass NULL
openConnection(NULL);//呼叫真正的開啟連線函式
}
}
由程式碼可以看出,真正實現開啟連線的函式是openConnection,這個函式根據傳遞的引數來決定是否是開啟unix domain socket,實現程式碼如下(這個函式程式碼比較多,其中除了錯誤部分程式碼省略): void TSocket::openConnection(struct addrinfo *res) {
if (isOpen()) {
return;//如果已經打開了直接返回
}
if (! path_.empty()) {//根據路徑是否為空建立不同的socket
socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);//建立unix domain socket
} else {
socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);//建立通用的網路通訊socket
}
if (sendTimeout_ > 0) {//如果發生超時設定大於0就呼叫設定傳送超時函式設定傳送超時
setSendTimeout(sendTimeout_);
}
if (recvTimeout_ > 0) {//如果接收超時設定大於0就呼叫設定接收超時函式設定接收超時
setRecvTimeout(recvTimeout_);
}
setLinger(lingerOn_, lingerVal_);//設定優雅斷開連線或關閉連線引數
setNoDelay(noDelay_);//設定無延時
#ifdef TCP_LOW_MIN_RTO
if (getUseLowMinRto()) {//設定是否使用較低的最低TCP重傳超時
int one = 1;
setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
}
#endif
//如果超時已經存在設定連線為非阻塞
int flags = fcntl(socket_, F_GETFL, 0);//得到socket_的標識
if (connTimeout_ > 0) {//超時已經存在
if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {//設定為非阻塞
}
} else {
if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {//設定為阻塞
}
}
// 連線socket
int ret;
if (! path_.empty()) {//unix domain socket
#ifndef _WIN32 //window不支援
struct sockaddr_un address;
socklen_t len;
if (path_.length() > sizeof(address.sun_path)) {//path_長度不能超過最長限制
}
address.sun_family = AF_UNIX;
snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
len = sizeof(address);
ret = connect(socket_, (struct sockaddr *) &address, len);//連線unix domain socket
#else
//window不支援unix domain socket
#endif
} else {
ret = connect(socket_, res->ai_addr, res->ai_addrlen);//連線通用的非unix domain socket
}
if (ret == 0) {//失敗了就會執行後面的程式碼,用poll來監聽寫事件
goto done;//成功了就直接跳轉到完成處
}
struct pollfd fds[1];//定於用於poll的描述符
std::memset(fds, 0 , sizeof(fds));//初始化為0
fds[0].fd = socket_;//描述符為socket
fds[0].events = POLLOUT;//接收寫事件
ret = poll(fds, 1, connTimeout_);//呼叫poll,有一個超時值
if (ret > 0) {
// 確保socket已經被連線並且沒有錯誤被設定
int val;
socklen_t lon;
lon = sizeof(int);
int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);//得到錯誤選項引數
if (val == 0) {// socket沒有錯誤也直接到完成處了
goto done;
}
} else if (ret == 0) {// socket 超時
//相應處理程式碼省略
} else {
// poll()出錯了,相應處理程式碼省略
}
done:
fcntl(socket_, F_SETFL, flags);//設定socket到原來的模式了(阻塞)
if (path_.empty()) {//如果是unix domain socket就設定快取地址
setCachedAddress(res->ai_addr, res->ai_addrlen);
}
}
上面這個函式程式碼確實比較長,不過還好都是比較簡單的程式碼實現,沒有什麼很繞的程式碼,整個流程也很清晰,在程式碼中也有比較詳細的註釋了。下面繼續看通用socket開啟函式local_open(它也真正的執行開啟功能也是呼叫上面剛才介紹的那個函式,只是傳遞了具體的地址資訊): void TSocket::local_open(){
#ifdef _WIN32
TWinsockSingleton::create();//相容window平臺
#endif // _WIN32
if (isOpen()) {//打開了就直接返回
return;
}
if (port_ < 0 || port_ > 0xFFFF) {//驗證埠是否為有效值
throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");
}
struct addrinfo hints, *res, *res0;
res = NULL;
res0 = NULL;
int error;
char port[sizeof("65535")];
std::memset(&hints, 0, sizeof(hints));//記憶體設定為0
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
sprintf(port, "%d", port_);
error = getaddrinfo(host_.c_str(), port, &hints, &res0);//根據主機名得到所有網絡卡地址資訊
// 迴圈遍歷所有的網絡卡地址資訊,直到有一個成功開啟
for (res = res0; res; res = res->ai_next) {
try {
openConnection(res);//呼叫開啟函式
break;//成功就退出迴圈
} catch (TTransportException& ttx) {
if (res->ai_next) {//異常處理,是否還有下一個地址,有就繼續
close();
} else {
close();
freeaddrinfo(res0); // 清除地址資訊記憶體和資源
throw;//丟擲異常
}
}
}
freeaddrinfo(res0);//釋放地址結構記憶體
}
整個local_open函式就是根據主機名得到所有的網絡卡資訊,然後依次嘗試開啟,直到開啟一個為止就退出迴圈,如果所有都不成功就丟擲一個異常資訊。讀函式read
在實現讀函式的時候需要注意區分返回錯誤為EAGAIN的情況,因為當超時和系統資源耗盡都會產生這個錯誤(沒有明顯的特徵可以區分它們),所以Thrift在實現的時候設定一個最大的嘗試次數,如果超過這個了這個次數就認為是系統資源耗盡了。下面具體看看read函式的實現,程式碼如下(省略一些引數檢查和錯誤處理的程式碼):
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
int32_t retries = 0;//重試的次數
uint32_t eagainThresholdMicros = 0;
if (recvTimeout_) {//如果設定了接收超時時間,那麼計算最大時間間隔來判斷是否系統資源耗盡
eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
}
try_again:
struct timeval begin;
if (recvTimeout_ > 0) {
gettimeofday(&begin, NULL);//得到開始時間
} else {
begin.tv_sec = begin.tv_usec = 0;//預設為0,不需要時間來判斷是超時了
}
int got = recv(socket_, cast_sockopt(buf), len, 0);//從socket接收資料
int errno_copy = errno; //儲存錯誤程式碼
++g_socket_syscalls;//系統呼叫次數統計加1
if (got < 0) {//如果讀取錯誤
if (errno_copy == EAGAIN) {//是否為EAGAIN
if (recvTimeout_ == 0) {//如果沒有設定超時時間,那麼就是資源耗盡錯誤了!丟擲異常
throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");
}
struct timeval end;
gettimeofday(&end, NULL);//得到結束時間,會改變errno,所以前面需要儲存就是這個原因
uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000)//計算消耗的時間
+ (((uint64_t)(end.tv_usec - begin.tv_usec))));
if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
if (retries++ < maxRecvRetries_) {//重試次數還小於最大重試次數
usleep(50);//睡眠50毫秒
goto try_again;//再次嘗試從socket讀取資料
} else {//否則就認為是資源不足了
throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");
}
} else {//推測為超時了
throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (timed out)");
}
}
if (errno_copy == EINTR && retries++ < maxRecvRetries_) {//如果是中斷並且重試次數沒有超過
goto try_again;//那麼重試
}
#if defined __FreeBSD__ || defined __MACH__
if (errno_copy == ECONNRESET) {//FreeBSD和MACH特殊處理錯誤程式碼
return 0;
}
#endif
#ifdef _WIN32
if(errno_copy == WSAECONNRESET) {//win32平臺處理錯誤程式碼
return 0; // EOF
}
#endif
return got;
}
整個讀函式其實沒有什麼特別的,主要的任務就是錯誤情況的處理,從這裡可以看出其實實現一個功能是很容易的,但是要做到穩定和容錯性確實需要發很大功夫。寫函式write
寫函式和讀函式實現差不多,主要的程式碼還是在處理錯誤上面,還有一點不同的是寫函式寫的內容可能一次沒有傳送完畢,所以是在一個while迴圈中一直髮送直到指定的內容全部發送完畢。程式碼實現如下:
void TSocket::write(const uint8_t* buf, uint32_t len) {
uint32_t sent = 0;//記錄已經發送了的位元組數
while (sent < len) {//是否已經發送了指定的位元組長度
uint32_t b = write_partial(buf + sent, len - sent);//調部分寫入函式
if (b == 0) {//傳送超時過期了
throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
}
sent += b;//已經發送的位元組數
}
}
上面的函式還沒有這種的呼叫send函式傳送寫入的內容,而是呼叫部分寫入函式write_partial寫入,這個函式實現如下:
uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
uint32_t sent = 0;
int flags = 0;
#ifdef MSG_NOSIGNAL
//使用這個代替SIGPIPE 錯誤,代替我們檢查返回EPIPE錯誤條件和關閉socket的情況
flags |= MSG_NOSIGNAL;//設定這個標誌位
#endif
int b = send(socket_, const_cast_sockopt(buf + sent), len - sent, flags);//傳送資料
++g_socket_syscalls;//系統呼叫計數加1
if (b < 0) { //錯誤處理
if (errno == EWOULDBLOCK || errno == EAGAIN) {
return 0;//應該阻塞錯誤直接返回
}
int errno_copy = errno;//儲存錯誤程式碼
if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {
close();//連線錯誤關閉掉socket
}
}
return b;//返回寫入的位元組數
}
這個寫入的實現邏輯和過程也是非常簡單的,只是需要考慮到各種錯誤的情況並且相應的處理之。其他函式
TSocket類還有一些其他函式,不過功能都比較簡單,比如設定一些超時和得到一些成員變數值的函式,哪些函式一般都是幾句程式碼完成了。