1. 程式人生 > >muduo庫的原始碼分析1--整體架構

muduo庫的原始碼分析1--整體架構

最近,學習了陳碩大俠的《Linux多執行緒服務端程式設計:使用muduo C++網路庫》很受啟發。但是在學習muduo原始碼的過程中,還是感覺程式碼架構比較複雜,一個是和boost相關的內容比較多,對現代C++程式設計理念不瞭解的人,可能感覺莫名其妙。(關於什麼是現代C++,可參考此連結:https://msdn.microsoft.com/zh-cn/library/hh279654.aspx)。

另外,muduo的原始碼中,雖然不考慮可移植性,但還是劃分了很多小的類(Channel、Socket、TcpConnection、Acceptor,不知道是不是參考了java中的概念),類之間大量通過boost::bind()註冊回撥函式,感覺比繼承還要難理解。

但是無論如何,muduo所強調的關於現代C++程式設計技術和多執行緒服務端程式設計理念都是非常值得學習的。本文的主要目的有兩個:一是從整體架構上分析muduo的原始碼,讓希望瞭解它的人能夠快速入門。另外,在此基礎上實現一個簡化版的sim_muduo,讓更多的人可以此為基礎,更快捷的實踐現代C++程式設計和Linux併發網路程式設計相關的技術。

一、經典的伺服器設計模式Reactor模式

大多數人學習Linux網路程式設計的起點可能都是從《UNP》開始的,書中描述的服務端程式架構基本上是一個大的while迴圈,程式阻塞在accept或poll函式上,等待被監控的socket描述符上出現預期的事件。事件到達後,accept或poll函式的阻塞解除,程式向下執行,根據socket描述符上出現的事件,執行read、write或錯誤處理。
整體架構如下圖所示:
這裡寫圖片描述

muduo的軟體架構採用的也是Reactor模式,只是整個模式被分成多個類,並且支援以執行緒池的方式實現多執行緒併發處理,所以顯得有些複雜。整體架構如下圖所示:
這裡寫圖片描述

二、分析Muduo中幾個主要的類

muduo是一個支援多執行緒程式設計的網路庫,它封裝了和Linux執行緒、網路socket相關的十幾個API,支援客戶端和服務端程式設計。這裡先介紹和服務端程式設計程式設計相關的幾個類物件。

1、TcpServer、Acceptor和EventLoop

TcpServer物件一般執行在使用者程式碼的主執行緒,它的生命週期應該和使用者伺服器程式的生命週期一致。TcpServer物件基本上是使用者程式碼和Muduo庫之間的總介面。它對內管理多個成員物件、建立執行緒池、將新建連線分發不同執行緒處理,對外為使用者程式碼提供客戶端連線建立、訊息接收和傳送的介面。
這裡寫圖片描述


TcpServer中有三個主要的成員類,分別是:AcceptorEventLoopThreadPoolEventLoop*。其中:
Acceptor負責管理伺服器的監聽socket
EventLoopThreadPool用於建立和管理執行緒池
EventLoop*是一個指標,它指向一個使用者程式碼中建立的EventLoop物件,為TcpServer專用,相當於是為主執行緒提供的Loop迴圈。
這裡我一直不明白既然是給TcpServer專用,這個EventLoop物件為啥要在客戶程式碼中建立,然後將物件指標傳遞給TcpServer,而不是像Acceptor那樣放在TcpServer中自動建立)。

這裡寫圖片描述
在TcpServer的建構函式中,會自動建立並初始化Acceptor物件。其中,Acceptor物件的建構函式首先會建立一個用於伺服器程式的監聽socket描述符,併為其bind()伺服器側的IP地址和監聽埠。另外,Acceptor物件還提供一個封裝了listen() API的函式Acceptor::listen()。

void Acceptor::listen()
{
    acceptSocket_.listen(); // 此函式最終呼叫listen() API函式,啟動監聽

    // 此函式將監聽socket對應的Channel物件放入輪詢管理器poller的監控描述符集合中
    // 當監聽socket收到客戶端接入請求後,監聽socket對應的Channel物件(acceptChannel_)
    // 的handleEvent()函式,最終會呼叫到Acceptor::handleRead() (此函式將在稍後介紹)
    acceptChannel_.enableReading(); 
}

Acceptor::listen()函式在TcpServer::start()函式的最後一行被呼叫。
這裡寫圖片描述
當然,這裡說呼叫可能並不準確,因為loop_->runInLoop()函式會判斷:如果當前正在執行的執行緒就是loop_物件所屬的執行緒,則直接執行 Acceptor::listen()函式,否則 Acceptor::listen()函式被封裝成函式物件放入pendingFunctors_容器中,等待Loop_所屬的執行緒執行時再被執行。(這裡其實我也有點不理解,因為TcpServer中的loop_指向的就是使用者程式碼中建立的EventLoop物件,難道有可能使用者呼叫TcpServer::start()函式的執行緒與使用者建立EventLoop物件的執行緒不是同一個執行緒???
這裡寫圖片描述

講到這裡,我們可以看到傳統的伺服器初始化部分(建立socket、bind埠和IP、啟動監聽listen)基本上就完成了。另外,這裡還有兩個很重要的細節。首先Acceptor內部管理了一個Socket物件(acceptSocket_)和一個Channel(acceptChannel_)物件。Socket物件封裝了監聽socket描述符,它向下封裝了和socket相關的API介面。而Channel物件總是和Socket物件成對出現,它向上提供了一些回撥函式的註冊介面,這些回撥函式用於處理socket描述符上出現的各種狀態事件,例如:POLLIN、POLLOUT、POLLERR等等。(這裡,其實我也有些疑問,既然不考慮移植,為啥要區分Socket物件和Channel物件,把它們合併了不是更簡單)。瞭解了Socket和Channel的關係以後,後面在分析TcpConnection這個類的時候,我們會發現,每個TcpConnection用於表示一個客戶端的連線,所以TcpConnection類中也會有一對Socket和Channel成員。(我的想法是,Socket、Channel、TcpConnection完全可以合併成一個類,像現在這樣分成3個類,一堆回撥函式註冊來註冊去,沒想明白這樣做的好處是什麼???

再說第二個需要注意的細節:Acceptor還提供了一個函式void Acceptor::handleRead(),這個函式的主要工作有兩個

// 為了便於說明問題,這是簡化後的程式碼
// 此函式中還有一個防止系統描述符耗盡的小技巧,這裡沒有展示,感興趣的同學可參考陳大俠的書自行了解
void Acceptor::handleRead() 
{
    int connfd = acceptSocket_.accept(&peerAddr);
    newConnectionCallback_(connfd, peerAddr);
}

程式碼的第一行程式碼int connfd = acceptSocket_.accept(&peerAddr)最終會呼叫到系統API函式accept(),用於接收一個客戶端的連線請求。
第二行的newConnectionCallback_是一個回撥函式指標,它指向TcpServer::newConnection()函式,用於處理接收到的連線請求。

Acceptor::handleRead()函式本身在Acceptor的建構函式中會被註冊給acceptChannel_物件的readCallback_指標。當acceptSocket_(監聽socket)描述符發現客戶端的連線請求時,acceptChannel_物件的readCallback_就會被呼叫,即Acceptor::handleRead()函式被呼叫。

void Channel::handleEvent(Timestamp receiveTime)
{
    handleEventWithGuard(receiveTime);
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        // acceptChannel物件的readCallback_指向Acceptor::handleRead()函式
        if (readCallback_) readCallback_(receiveTime);
    }
}

如果我前面的描述夠清晰的話,看到這裡,通過muduo庫,從建立服務端監聽socket到呼叫accept() API獲取客戶端連線請求的過程應該就比較清晰了。接下來需要分析的是newConnectionCallback_,也就是TcpServer::newConnection()函式如何建立並管理一個客戶端連線。新建一條Tcp連線的整體函式呼叫如下所示:

// 為了便於說明問題,這是簡化後的程式碼
void Acceptor::handleRead()
{
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr); // 這裡呼叫TcpServer::newConnection()函式
        }
    }
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
    // 從執行緒池中獲取一個EventLoop(因為EventLoop對應一個執行緒),這裡相當於獲取了一個執行緒
    EventLoop* ioLoop = threadPool_->getNextLoop();

    // 為當前接收到的連線請求建立一個TcpConnection物件,將TcpConnection物件與分配的(ioLoop)執行緒繫結
    TcpConnectionPtr conn(new TcpConnection(ioLoop,connName, sockfd, localAddr, peerAddr));

    // 將(使用者定義的)連線建立回撥函式、訊息接收回調函式註冊到TcpConnection物件中
    // 由此可知TcpConnection才是muduo庫對與網路連線的核心處理
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);

    // 對於新建連線的socket描述符,還需要設定期望監控的事件(POLLIN | POLLPRI),
    // 並且將此socket描述符放入poll函式的監控描述符集合中,用於等待接收客戶端從此連線上傳送來的訊息
    // 這些工作,都是由TcpConnection::connectEstablished函式完成。
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

// 為了便於說明問題,這是簡化後的程式碼
void TcpConnection::connectEstablished()
{
  channel_->enableReading(); // 將新建連線的Channel物件加入到POLLER輪詢管理器

  // 此函式對應TcpServer::connectionCallback_,最終指向一個使用者伺服器定義的回撥函式
  connectionCallback_(shared_from_this());
}

void Channel::enableReading() { events_ |= (POLLIN | POLLPRI); update(); }

void Channel::update(){ loop_->updateChannel(this);}

void EventLoop::updateChannel(Channel* channel){  poller_->updateChannel(channel); }

以上就是建立一條Tcp連線的大致流程,如果要刪除Tcp連線,函式的呼叫流程如下:

void TcpConnection::handleClose()
{
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr guardThis(shared_from_this());
    connectionCallback_(guardThis); // 回撥使用者定義的連線處理函式
    // must be the last line
    closeCallback_(guardThis); // closeCallback_對應的函式是TcpServer::removeConnection()
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
    size_t n = connections_.erase(conn->name());
    ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

void TcpConnection::connectDestroyed()
{
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this()); // 回撥使用者定義的連線處理函式
  }
  channel_->remove(); // 本連線對應的描述符不在需要監控,從poll中刪除。
}

前面詳細分析的TcpServer和Acceptor物件的結構和行為,並且大致瞭解了Socket、Channel和TcpConnection三者之間的關係,整個Muduo庫中與Reactor模式相關的軟體架構也就大致清楚了,最後再看看EventLoop這個類。muduo的對於併發處理採用的是one thread one loop方式,所以,每個執行緒都唯一對應一個EventLoop物件。
這裡寫圖片描述

EventLoop中包含了兩個比較重要的函式,首先是void EventLoop::loop(),它為每個工作執行緒提供了一個大的while迴圈,如下:

void EventLoop::loop()
{
    quit_ = false;
    while (!quit_)
    {
        // 輪詢,得到發生狀態變化的Channel物件集合
        poller_->poll(kPollTimeMs, &activeChannels_);

        // 遍歷每個發生狀態變化的Channel物件,執行物件的狀態處理函式
        for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }

        // 執行pendingFunctors_容器中由外部執行緒注入的函式物件
        doPendingFunctors();
    }
}

第二個重要的函式是void EventLoop::runInLoop(const Functor& cb),此函式的作用是為外部執行緒提供介面,將函式注入到EventLoop所屬的執行緒中執行。

void EventLoop::runInLoop(const Functor& cb)
{
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(cb);
    }
}

void EventLoop::queueInLoop(const Functor& cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

到此為止,伺服器端建立並初始化監聽socket描述符、輪詢(poll())並獲取(accept())監聽socket描述符上的新建連線請求,為新建連線分配EventLoop工作執行緒的相關處理我們都已經分析了一遍。

接下來看看muduo庫中和執行緒池管理器相關的三個類:EventLoopThread、Thread和EventLoopThreadPool。除主執行緒外,muduo庫通過執行緒池管理器EventLoopThreadPool為每個執行緒建立一個EventLoopThread物件,每個EventLoopThread類中包含一個EventLoop物件指標(該物件建立線上程入口函式void EventLoopThread::threadFunc()的堆疊上,所以EventLoopThread中包含的EventLoop物件的生命週期應該和該執行緒相同)。另外,EventLoopThread類中還包含一個Thread類,它的作用主要是封裝了和執行緒相關的系統API(例如:pthread_create()、pthread_detach()),總的看,這幾個類的邏輯相對簡單,就不深入分析了。
這裡寫圖片描述