muduo庫的原始碼分析1--整體架構
最近,學習了陳碩大俠的《Linux多執行緒服務端程式設計:使用muduo C++網路庫》很受啟發。但是在學習muduo原始碼的過程中,還是感覺程式碼架構比較複雜,一個是和boost相關的內容比較多,對現代C++程式設計理念不瞭解的人,可能感覺莫名其妙。(關於什麼是現代C++,可參考此連結:https://msdn.microsoft.com/zh-cn/library/hh279654.aspx)。
另外,muduo的原始碼中,雖然不考慮可移植性,但還是劃分了很多小的類(Channel、Socket、TcpConnection、Acceptor,不知道是不是參考了java中的概念),類之間大量通過boost::bind()註冊回撥函式,感覺比繼承還要難理解。
但是無論如何,muduo所強調的關於現代C++程式設計技術和多執行緒服務端程式設計理念都是非常值得學習的。本文的主要目的有兩個:一是從整體架構上分析muduo的原始碼,讓希望瞭解它的人能夠快速入門。另外,在此基礎上實現一個簡化版的sim_muduo,讓更多的人可以此為基礎,更快捷的實踐現代C++程式設計和Linux併發網路程式設計相關的技術。
一、經典的伺服器設計模式Reactor模式
大多數人學習Linux網路程式設計的起點可能都是從《UNP》開始的,書中描述的服務端程式架構基本上是一個大的while迴圈,程式阻塞在accept或poll函式上,等待被監控的socket描述符上出現預期的事件。事件到達後,accept或poll函式的阻塞解除,程式向下執行,根據socket描述符上出現的事件,執行read、write或錯誤處理。
整體架構如下圖所示:
muduo的軟體架構採用的也是Reactor模式,只是整個模式被分成多個類,並且支援以執行緒池的方式實現多執行緒併發處理,所以顯得有些複雜。整體架構如下圖所示:
二、分析Muduo中幾個主要的類
muduo是一個支援多執行緒程式設計的網路庫,它封裝了和Linux執行緒、網路socket相關的十幾個API,支援客戶端和服務端程式設計。這裡先介紹和服務端程式設計程式設計相關的幾個類物件。
1、TcpServer、Acceptor和EventLoop
TcpServer物件一般執行在使用者程式碼的主執行緒,它的生命週期應該和使用者伺服器程式的生命週期一致。TcpServer物件基本上是使用者程式碼和Muduo庫之間的總介面。它對內管理多個成員物件、建立執行緒池、將新建連線分發不同執行緒處理,對外為使用者程式碼提供客戶端連線建立、訊息接收和傳送的介面。
TcpServer中有三個主要的成員類,分別是:Acceptor,EventLoopThreadPool,EventLoop*。其中:
Acceptor負責管理伺服器的監聽socket;
EventLoopThreadPool用於建立和管理執行緒池;
EventLoop*是一個指標,它指向一個使用者程式碼中建立的EventLoop物件,為TcpServer專用,相當於是為主執行緒提供的Loop迴圈。
(這裡我一直不明白既然是給TcpServer專用,這個EventLoop物件為啥要在客戶程式碼中建立,然後將物件指標傳遞給TcpServer,而不是像Acceptor那樣放在TcpServer中自動建立)。
在TcpServer的建構函式中,會自動建立並初始化Acceptor物件。其中,Acceptor物件的建構函式首先會建立一個用於伺服器程式的監聽socket描述符,併為其bind()伺服器側的IP地址和監聽埠。另外,Acceptor物件還提供一個封裝了listen() API的函式Acceptor::listen()。
void Acceptor::listen()
{
acceptSocket_.listen(); // 此函式最終呼叫listen() API函式,啟動監聽
// 此函式將監聽socket對應的Channel物件放入輪詢管理器poller的監控描述符集合中
// 當監聽socket收到客戶端接入請求後,監聽socket對應的Channel物件(acceptChannel_)
// 的handleEvent()函式,最終會呼叫到Acceptor::handleRead() (此函式將在稍後介紹)
acceptChannel_.enableReading();
}
Acceptor::listen()函式在TcpServer::start()函式的最後一行被呼叫。
當然,這裡說呼叫可能並不準確,因為loop_->runInLoop()函式會判斷:如果當前正在執行的執行緒就是loop_物件所屬的執行緒,則直接執行 Acceptor::listen()函式,否則 Acceptor::listen()函式被封裝成函式物件放入pendingFunctors_容器中,等待Loop_所屬的執行緒執行時再被執行。(這裡其實我也有點不理解,因為TcpServer中的loop_指向的就是使用者程式碼中建立的EventLoop物件,難道有可能使用者呼叫TcpServer::start()函式的執行緒與使用者建立EventLoop物件的執行緒不是同一個執行緒???)
講到這裡,我們可以看到傳統的伺服器初始化部分(建立socket、bind埠和IP、啟動監聽listen)基本上就完成了。另外,這裡還有兩個很重要的細節。首先Acceptor內部管理了一個Socket物件(acceptSocket_)和一個Channel(acceptChannel_)物件。Socket物件封裝了監聽socket描述符,它向下封裝了和socket相關的API介面。而Channel物件總是和Socket物件成對出現,它向上提供了一些回撥函式的註冊介面,這些回撥函式用於處理socket描述符上出現的各種狀態事件,例如:POLLIN、POLLOUT、POLLERR等等。(這裡,其實我也有些疑問,既然不考慮移植,為啥要區分Socket物件和Channel物件,把它們合併了不是更簡單)。瞭解了Socket和Channel的關係以後,後面在分析TcpConnection這個類的時候,我們會發現,每個TcpConnection用於表示一個客戶端的連線,所以TcpConnection類中也會有一對Socket和Channel成員。(我的想法是,Socket、Channel、TcpConnection完全可以合併成一個類,像現在這樣分成3個類,一堆回撥函式註冊來註冊去,沒想明白這樣做的好處是什麼???)
再說第二個需要注意的細節:Acceptor還提供了一個函式void Acceptor::handleRead(),這個函式的主要工作有兩個
// 為了便於說明問題,這是簡化後的程式碼
// 此函式中還有一個防止系統描述符耗盡的小技巧,這裡沒有展示,感興趣的同學可參考陳大俠的書自行了解
void Acceptor::handleRead()
{
int connfd = acceptSocket_.accept(&peerAddr);
newConnectionCallback_(connfd, peerAddr);
}
程式碼的第一行程式碼int connfd = acceptSocket_.accept(&peerAddr)
最終會呼叫到系統API函式accept(),用於接收一個客戶端的連線請求。
第二行的newConnectionCallback_是一個回撥函式指標,它指向TcpServer::newConnection()函式,用於處理接收到的連線請求。
Acceptor::handleRead()函式本身在Acceptor的建構函式中會被註冊給acceptChannel_物件的readCallback_指標。當acceptSocket_(監聽socket)描述符發現客戶端的連線請求時,acceptChannel_物件的readCallback_就會被呼叫,即Acceptor::handleRead()函式被呼叫。
void Channel::handleEvent(Timestamp receiveTime)
{
handleEventWithGuard(receiveTime);
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
// acceptChannel物件的readCallback_指向Acceptor::handleRead()函式
if (readCallback_) readCallback_(receiveTime);
}
}
如果我前面的描述夠清晰的話,看到這裡,通過muduo庫,從建立服務端監聽socket到呼叫accept() API獲取客戶端連線請求的過程應該就比較清晰了。接下來需要分析的是newConnectionCallback_,也就是TcpServer::newConnection()函式如何建立並管理一個客戶端連線。新建一條Tcp連線的整體函式呼叫如下所示:
// 為了便於說明問題,這是簡化後的程式碼
void Acceptor::handleRead()
{
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); // 這裡呼叫TcpServer::newConnection()函式
}
}
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
// 從執行緒池中獲取一個EventLoop(因為EventLoop對應一個執行緒),這裡相當於獲取了一個執行緒
EventLoop* ioLoop = threadPool_->getNextLoop();
// 為當前接收到的連線請求建立一個TcpConnection物件,將TcpConnection物件與分配的(ioLoop)執行緒繫結
TcpConnectionPtr conn(new TcpConnection(ioLoop,connName, sockfd, localAddr, peerAddr));
// 將(使用者定義的)連線建立回撥函式、訊息接收回調函式註冊到TcpConnection物件中
// 由此可知TcpConnection才是muduo庫對與網路連線的核心處理
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
// 對於新建連線的socket描述符,還需要設定期望監控的事件(POLLIN | POLLPRI),
// 並且將此socket描述符放入poll函式的監控描述符集合中,用於等待接收客戶端從此連線上傳送來的訊息
// 這些工作,都是由TcpConnection::connectEstablished函式完成。
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
// 為了便於說明問題,這是簡化後的程式碼
void TcpConnection::connectEstablished()
{
channel_->enableReading(); // 將新建連線的Channel物件加入到POLLER輪詢管理器
// 此函式對應TcpServer::connectionCallback_,最終指向一個使用者伺服器定義的回撥函式
connectionCallback_(shared_from_this());
}
void Channel::enableReading() { events_ |= (POLLIN | POLLPRI); update(); }
void Channel::update(){ loop_->updateChannel(this);}
void EventLoop::updateChannel(Channel* channel){ poller_->updateChannel(channel); }
以上就是建立一條Tcp連線的大致流程,如果要刪除Tcp連線,函式的呼叫流程如下:
void TcpConnection::handleClose()
{
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis); // 回撥使用者定義的連線處理函式
// must be the last line
closeCallback_(guardThis); // closeCallback_對應的函式是TcpServer::removeConnection()
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
size_t n = connections_.erase(conn->name());
ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
void TcpConnection::connectDestroyed()
{
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this()); // 回撥使用者定義的連線處理函式
}
channel_->remove(); // 本連線對應的描述符不在需要監控,從poll中刪除。
}
前面詳細分析的TcpServer和Acceptor物件的結構和行為,並且大致瞭解了Socket、Channel和TcpConnection三者之間的關係,整個Muduo庫中與Reactor模式相關的軟體架構也就大致清楚了,最後再看看EventLoop這個類。muduo的對於併發處理採用的是one thread one loop方式,所以,每個執行緒都唯一對應一個EventLoop物件。
EventLoop中包含了兩個比較重要的函式,首先是void EventLoop::loop(),它為每個工作執行緒提供了一個大的while迴圈,如下:
void EventLoop::loop()
{
quit_ = false;
while (!quit_)
{
// 輪詢,得到發生狀態變化的Channel物件集合
poller_->poll(kPollTimeMs, &activeChannels_);
// 遍歷每個發生狀態變化的Channel物件,執行物件的狀態處理函式
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
// 執行pendingFunctors_容器中由外部執行緒注入的函式物件
doPendingFunctors();
}
}
第二個重要的函式是void EventLoop::runInLoop(const Functor& cb),此函式的作用是為外部執行緒提供介面,將函式注入到EventLoop所屬的執行緒中執行。
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
到此為止,伺服器端建立並初始化監聽socket描述符、輪詢(poll())並獲取(accept())監聽socket描述符上的新建連線請求,為新建連線分配EventLoop工作執行緒的相關處理我們都已經分析了一遍。
接下來看看muduo庫中和執行緒池管理器相關的三個類:EventLoopThread、Thread和EventLoopThreadPool。除主執行緒外,muduo庫通過執行緒池管理器EventLoopThreadPool為每個執行緒建立一個EventLoopThread物件,每個EventLoopThread類中包含一個EventLoop物件指標(該物件建立線上程入口函式void EventLoopThread::threadFunc()
的堆疊上,所以EventLoopThread中包含的EventLoop物件的生命週期應該和該執行緒相同)。另外,EventLoopThread類中還包含一個Thread類,它的作用主要是封裝了和執行緒相關的系統API(例如:pthread_create()、pthread_detach()),總的看,這幾個類的邏輯相對簡單,就不深入分析了。