1. 程式人生 > >《Linux多執行緒服務端程式設計》—muduo網路庫(1)

《Linux多執行緒服務端程式設計》—muduo網路庫(1)

TCP網路程式設計本質論

思維轉換:

把原來“主動呼叫recv(2)來接收資料,主動呼叫accept(2)來接受新連線,主動呼叫send(2)來發送資料”的思路轉換為“註冊一個收資料的回撥,網路庫收到資料會呼叫我,直接把資料提供給我,供我消費。註冊一個接受連線的回撥,網路庫接受了新連線會回撥我,直接把新連線物件傳給我,供我使用。需要傳送資料的時候,只管往連線中寫,網路庫會負責無阻塞地傳送。”

作者(陳碩)認為,TCP網路程式設計最本質的是處理三個半事件:

1.連線的建立,包括服務端接受(accept)新連線和客戶端成功發起(connect)連線。TCP連線一旦建立,客戶端和服務端是平等的,可以各自收發資料。

2.連線的斷開,包括主動斷開(close、shutdown)和被動斷開(read(2)返回0)。

3.訊息到達,檔案描述符可讀。這是最為重要的一個事件,對它的處理方式決定了網路程式設計的風格(阻塞還是非阻塞,如何處理分包,應用層的緩衝如何設計等等)。

3.5 訊息傳送完畢,這算半個。對於低流量的服務,可以不必關心這個事件;這裡的“傳送完畢”,是指將資料寫入作業系統的緩衝區,將由TCP協議棧負責資料的傳送與重傳,不代表對方已經收到資料。

Reactor模式

Reactor 是一種事件驅動機制。它和普通函式呼叫的不同之處在於:應用程式不是主動的呼叫某個API完成處理,而是恰恰相反,Reactor逆置了事件處理流程,應用程式需要提供相應的介面並註冊到Reactor上

,如果相應的時間發生,Reactor將主動呼叫應用程式註冊的介面,這些介面又稱為“回撥函式”

moduo庫Reactor模式的實現

muduo主要通過3個類來實現Reactor模式:EventLoop,Channel和Poller。

1. EventLoop

EventLoop是一個主控類,是一個事件發生器,它驅動Poller產生/發現事件,然後將事件派發到Channel處理。moduo的執行緒模型為 one loop per thread,即每個執行緒只能有一個 EventLoop 物件。EventLoop物件的生命週期通常和其所屬的執行緒一樣長。

主要資料成員:

    const
pid_t threadId_; //儲存當前EventLoop所屬執行緒id boost::scoped_ptr<Poller> poller_; //實現I/O複用 boost::scoped_ptr<TimerQueue> timerQueue_; int wakeupFd_; boost::scoped_ptr wakeupChannel_; //用於處理wakeupFd_上的可讀事件,將事件分發到handlRead() ChannelList activeChannels_; //有事件就緒的Channel Channel* currentActiveChannel_; MutexLock mutex_; //pendingFunctors_會暴露給其他執行緒,所以需要加鎖 std::vector<Functor> pendingFunctors_;

主要功能函式:

loop(),在該函式中會迴圈執行以下過程:
呼叫Poller::poll(),通過此呼叫獲得一個vector< channel* >activeChannels_的就緒事件集合,再遍歷該容器,執行每個Channel的 Channel::handleEvent() 完成相應就緒事件回撥,最後執行 pendingFunctors_ 排隊的函式。
上述一次迴圈就是一次Reactor模式完成。

runInLoop(boost::function< void() >),實現使用者指定任務回撥,若是EventLoop隸屬的執行緒呼叫 EventLoop::runInLoop() 則 EventLoop 馬上執行;若是其它執行緒呼叫則執行 EventLoop::queueInLoop(boost::function< void() > 將任務新增到佇列中(執行緒轉移)。
EventLoop如何知道有任務這件事呢——通過eventfd可以實現執行緒間通訊,具體做法是:
其它執行緒向 EventLoop::vector< boost::function< void() > >新增任務T,然後通過 EventLoop::wakeup() 向 eventfd 寫一個int,eventfd的回撥函式 EventLoop::handleRead() 讀取這個int,從而相當於 EventLoop 被喚醒,此時在loop中遍歷佇列,執行堆積的任務。這裡採用Channel管理eventfd,Poller偵聽eventfd,體現了eventfd可以統一事件源的優勢。

queueInLoop(Functor& cb),將cb放入佇列,並在必要時喚醒IO執行緒。有兩種情況需要喚醒IO執行緒:
1 呼叫 queueInLoop() 的執行緒不是IO執行緒;
2 呼叫 queueInLoop() 的執行緒是IO執行緒,而此時正在呼叫pengding functor。

2. Channel

Channel是事件分發器,是裝置fd的包裝(主要包裝socket)。每個Channel 只屬於一個 EventLoop (也就是隻屬於一個IO執行緒),每個Channel只負責一個檔案描述符fd的IO事件分發,但其不擁有fd。使用者一般不直接使用Channel,而會使用更上層的封裝,如TcpConnection。Channel的生命期由其owner class負責管理,它一般是其他class的直接或間接成員,fd由其owner擁有和關閉。

資料成員:

     int fd_; //檔案描述符

     int events_; //檔案描述符註冊事件

     int revents_; //檔案描述符的就緒事件,由Poller::poll設定

     // readCallback_,writeCallback...各種事件回撥,
     // 會在擁有該Channel類的建構函式中被註冊,
     // 例如TcpConnction會在建構函式中將TcpConnection::handlRead()註冊
     // 給Channel::readCallback

主要功能函式:

setCallback() 系列函式:接受Channel所屬的類註冊相應的事件回撥函式;

enableReading(),update():當一個fd想要註冊可讀事件時,首先通過

Channel::enableReading()->
Channel::update(this)->
EventLoop::updateChannel(Channel)->
Poller::updateChannel(Channel*)

呼叫鏈向poll系統呼叫的偵聽事件表註冊或者修改註冊事件。

handleEvent():事件分發器Channel的核心,由EventLoop::loop()呼叫,該函式呼叫 Channel::handleEventWithGuard(),在其內根據 Channel::revents 的值分發呼叫相應的事件回撥。

3. Poller

Poller是IO multiplexing的封裝,封裝了poll和epoll。Poller是EventLoop的間接成員,只供擁有該Poller的EventLoop在IO執行緒呼叫。生命期與EventLoop相等。

主要資料成員:

    vector pollfds_; //事件結構體陣列,用於poll的第一個引數;

    map<int,channel*> channels_; //用於檔案描述符fd到Channel的對映便於快速查詢到相應的Channel

主要功能函式:

updateChannel(Channel*) :用於將傳入的Channel關心的事件註冊給Poller。

poll(int timeoutMs,vector< channel* > activeChannels):其呼叫poll獲得當前活動的事件集合,將就緒事件所屬的Channel呼叫fillActiveChannels()加入到呼叫方傳入的 activeChannels_ 中。

Reactor模式的核心內容時序圖

這裡寫圖片描述

其他類簡介

EventLoopThread:啟動一個自己的執行緒並在其中執行一個EventLoop,其語義和”one loop per thread“相吻合。其關鍵的函式startLoop()會返回新執行緒中 EventLoop 物件的地址,因此需要用條件變數來等待執行緒的建立與執行(因為EventLoopThread物件已建立,所以startLoop()可以呼叫了,但是執行緒的建立和執行可能還沒完成,此時EventLoop 物件還沒構造完成,如果不等待則可能出錯)。執行緒主函式會在stack上定義EventLoop物件,然後將其地址賦值給loop_成員變數,最後notify()條件變數,喚醒startLoop()。
由於EventLoop的生命期與執行緒主函式的作用域相同,因此在threadFunc()退出之後,這個指標就失效了(好在服務程式一般不要求能安全地退出)。

TcpConnection:抽象一個TCP連線,無論是客戶端還是伺服器只要建立了網路連線就會使用TcpConnection;

TcpClient/TcpServer:分別抽象TCP客戶端和伺服器;

Connector/Acceptor:分別包裝TCP客戶端和伺服器的建立連線/接受連線;

Acceptor 接受新連線

Acceptor用於accept(2)新TCP連線,並通過回撥通知使用者,它是內部類,供TcpServer使用,生命期由後者控制。

Acceptor的資料成員包括Socket、Channel等,其中Socket是一個RAII handle,它是listening socket。Channel用於觀察此socket上的readable事件,並回調Acceptor::handleRead(),後者會呼叫accept(2)來接受新連線,並回呼叫戶callback。

TcpServer 新建TcpConnection

TcpServer新建連線的相關函式呼叫順序如下:

這裡寫圖片描述

其中Channel::handleEvent()的觸發條件是listening socket可讀,表示有新連線到達。TcpServer會為新連線建立相應的TcpConnection物件。

TcpServer class

TcpServer class的功能時管理 accept(2) 獲得的TcpConnection。TcpServer供使用者直接使用,生命期由使用者控制。

TcpServer內部使用Acceptor來獲得新連線的fd,它儲存使用者提供的ConnectionCallback 和 MessageCallback,在新建TcpConnection的時候會原樣傳給後者。

每個TcpConnection物件都有一個名字,是所屬TcpServer在建立TcpConnection物件時生成的,名字作為ConnectionMap的key。

在新連線到達時,Acceptor會回撥newConnection(),後者會建立TcpConnection物件conn,把它加入到ConnectionMap中,設定好callback,再呼叫 conn->connectEstablished(),其中會回撥使用者提供的ConnectionCalback。

TcpConnection class

TcpConnection 的生命週期由shared_ptr來管理,以防止訪問失效的物件或者發生網路串話(即舊的TCP連線斷開,新的TCP連線使用了同一個檔案描述符,原本發給前世的資訊誤發給今生了)。通過weak_ptr,我們就能知道socket連線在處理request期間是否已經關閉了。

TcpConnection 使用Channel來獲得socket上的IO事件,它會自己處理writable事件,而把readable事件通過MessageCallback傳達給客戶。

TcpConnection 擁有TCP socket,它的解構函式會 close(fd)。

TcpConnection 表示的是“一次TCP連線”,是不可再生的,一旦連線斷開,該物件就沒用了。另外,TcpConnection 沒用發起連線的功能,其建構函式的引數是已經建立好連線的socket fd。

Buffer

為什麼需要有output buffer:在write()呼叫中,作業系統可能不會接受所有應用程式的資料並一次性發送,而是可能分多次傳送,為了儘快返回event loop,而不是在那等待,TcpConnection需要有output buffer,將資料存放起來,在資料發完之前,持續關注POLLOUT事件。

有一點需要注意,TcpConnection關閉連線使用shutdown()而不是close(),因為如果對方已經發送了資料,而這些資料還在路上,此時如果呼叫close(),socket的讀寫都關閉,資料將會丟失。而shutdown()可以只關閉讀或者寫的其中一個(比如關閉寫方向的連線,而保留讀方向),稱為TCP的半關閉。這樣子我們就可以先關閉寫的一端,等接受完所有資料之後,再關閉讀的一端。

為什麼需要有input buffer:TCP是一個無邊界的位元組流協議,接收方必須要處理“收到的資料尚不構成一條完整的訊息”和“一次收到兩條訊息的資料”等情況。網路庫在處理socket可讀事件時,必須一次性把socket裡的資料讀完(從作業系統buffer搬到應用層buffer),不然會反覆觸發POLLIN事件(level trigger)。為應對“資料不完整”的情況,收到的資料先放到input buffer裡,等構成一條完整的訊息再通知程式的業務邏輯。

多執行緒TcpServer

EventLoopThreadPool

多執行緒TcpServer自己的EventLoop只用來接受新連線,而新連線會從event loop pool裡面挑選一個loop來執行IO。目前muduo使用最簡單的輪詢排程(Round-Robin Scheduling)演算法來選取pool中的EventLoop。

Connector

主動發起連線需要處理各種錯誤,以及考慮重試。我們把它封裝為Connector class,它只負責建立socket連線,不負責建立TcpConnection,它的NewConnectionCallback回撥的引數是socket檔案描述符。

實現的幾個難點:

  1. socket是一次性的,一旦出錯,無法恢復,只能關閉重來,但Connector是可以重用的,因此每次嘗試連線都要使用新的socket檔案描述符和新的Channel物件。要留意Channel物件的生命期管理,並防止socket檔案描述符洩露。
  2. EAGAIN是真錯誤,需要關閉socket再延期重試,EINPROGRESS是正在連線。即便出現socket可寫,也不一定意味著連線已成功建立,需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR,…)再次確認下。
  3. 重試的間隔應該逐漸延長,例如0.5s,1s,2s,4s,直至30s,即back-off。需要在Connector的解構函式中登出定時器。
  4. 要處理自連線。處理方法是斷開連線再重試。