1. 程式人生 > >[evpp/muduo/reactor] evpp事件驅動網路庫 整體架構梳理

[evpp/muduo/reactor] evpp事件驅動網路庫 整體架構梳理

介紹

muduo很多人都聽說過,那evpp可以理解成是muduo用C++11改寫後的升級版。
相比muduo的程式碼風格,evpp會顯得更加現代一點,更討我們年輕人的喜歡。
作為例子,這裡是一段TCP Echo Server的示例程式碼:

    evpp::EventLoop loop;
    evpp::TCPServer server(&loop, "0.0.0.0:9099", "TCPEchoServer", thread_num);
    server.SetMessageCallback([](const evpp::TCPConnPtr& conn,
                                  evpp::Buffer* msg) {
        conn->Send(msg);
    });
    server
.SetConnectionCallback([](const evpp::TCPConnPtr& conn) { if (conn->IsConnected()) { LOG_INFO << "A new connection from " << conn->remote_addr(); } else { LOG_INFO << "Lost the connection from " << conn->remote_addr(); } }); server
.Init(); server.Start(); loop.Run();

evpp的核心程式碼其實不多,之前也已經花了一段時間去閱讀;
總的來說,程式碼本身還是十分通俗易懂的,不像某些天書C++;
相比muduo,evpp一個很明顯的變化就是,IO複用相關的抽象直接使用了libevent;
但是,看完之後,總是缺少一個系統性的認識,因此,想借此機會進行梳理;
此外,網上雖然有很多muduo的解析,但是介紹evpp的還是挺少的;
對於習慣C++11風格的人,應該算是多了一種選擇;
https://github.com/Qihoo360/evpp

核心類

這裡,我先羅列下evpp中的核心類

  • TCPServer
  • Listener
  • TCPClient
  • Connector
  • TCPConn
  • FdChannel
  • EventLoop
  • EventLoopThread
  • EventLoopThreadPool

只要是熟悉Socket程式設計的同學,看見這幾個類後應該還是有點親切感的。
其中,TcpClient/TcpServer分別抽象TCP客戶端和伺服器;Connector/Listener分別包裝TCP客戶端和伺服器的建立連線/接受連線;TcpConnection抽象一個TCP連線,無論是客戶端還是伺服器只要建立了網路連線就會使用TcpConnection;FdChannel是裝置fd的包裝,在muduo中主要包裝socket以及該socket對應的事件處理回撥函式;EventLoop是一個事件發生器,它驅動底層的IO複用器產生/發現事件,然後將事件派發到Channel處理;EventLoopThread是一個帶有EventLoop的執行緒;EventLoopThreadPool自然是一EventLoopThread的資源池,維護一堆EventLoopThread。

TCPServer類

這裡,我們以自頂向下的方式進行梳理。所以,我們先來看看TCPServer的資料成員。

    // TCPServer的名字,比如"Echo", "Discard"等
    const std::string name_; 

    // TCPServer最重要的一點就是,建立ListenFD然後Accept新連線;
    // 接下來的3項,給出了必要的資訊;
    // 第一項是“監聽地址”,值得提的一點是,evpp借鑑了Golang,可以直接使用字串指定IP地址;
    // 第二項是一個指標,指向Listener結構體;在evpp中,監聽埠建立新連線 相關的細節都被封裝在這個類之中;
    // 這個類對應至muduo中的acceptor類;另外一點就是,accepted的連線,在evpp中被抽象成TCPConn這個類,這個類我們稍後會提到;
    // 第三項就是事件驅動的引擎,可以理解成是對IO複用器的封裝;
    // ListenFD會註冊到該IO複用器上,當一個ListenFD上有新的連線時,該FD就會變成“可讀”,從而回調“可讀Callback”;
    // 當然,這個可讀Callback執行的是建立TCPConn的邏輯,而不是讀資料啥的;
    // 另外,這個loop_就是TCPServer_的主事件迴圈;
    const std::string listen_addr_; // ip:port
    std::unique_ptr<Listener> listener_;
    EventLoop* loop_;  // the listening loop

    // 一個EventLoop會繫結到一個特定的執行緒上執行;
    // 為了能夠更充分地利用多核心CPU,直觀的想法就是建立更多的執行緒(執行緒池),
    // 然後在每個執行緒上執行EventLoop;
    std::shared_ptr<EventLoopThreadPool> tpool_;

    ConnectionCallback conn_fn_;  // 連線建立/斷開時的回撥函式,由使用者註冊;
    MessageCallback msg_fn_;   // 當收到訊息時的回撥函式,由使用者註冊;
    DoneCallback stopped_cb_;

    // 一個TCPServer可以有多個TCP連線,下面的這兩項就反映了這樣一個客觀現實;
    // TCPServer會為每個TCP連線分配一個ID;與此同時,以該ID為key將TCPConn儲存起來;
    uint64_t next_conn_id_ = 0;
    std::map<uint64_t, TCPConnPtr> connections_;

看完資料成員,我們再來看看TCPServer的開放介面;

// 對於TCPServer的建構函式,當中無非是對我們剛剛提到的資料成員進行初始化;
// 但是,值得提的一點是Eventloop是由使用者分配的;
    TCPServer(EventLoop* loop,  
              const std::string& listen_addr/*ip:port*/,
              const std::string& name,
              uint32_t thread_num);

// 一個Server端正常的流程如下:建立套接字、繫結地址、設定成Listen狀態、呼叫Accept開始接受連結
// 在evpp/muduo的設計中,前三項被單獨放在了Init()初始化函式中,而真正呼叫Accept的操作被單獨放在了Start()當中;
    bool Init() {
        listener_.reset(new Listener(loop_, listen_addr_));
        listener_->Listen();
        status_.store(kInitialized);
    }

// 在呼叫前面提到的Accept之前,Start()當中為listener指定了回撥函式,
// 當有新的連線到來時,這個回撥函式就會被呼叫;
// 值得提醒的是,到目前為止,在這幾個初始化啟動函式中,我們並沒有發現與“業務”相關的邏輯;
// 一個TCP伺服器總有其業務邏輯,那麼業務邏輯在哪裡得到處理呢?
// 讓我們去看下TCPServer::HandleNewConn;
// (這裡再贅述下,HandleNewConn是TCPServer類中的函式,
// 但是通過listener_->SetNewConnectionCallback()的介面被註冊到listener_中,由listener_呼叫;
// 這種操作方式,在evpp/muduo中是十分普遍的;接下來,我們還會看到更多的這樣的例子;
// 所以,我們應該留神這個callback到底是哪裡來的;)
    bool Start() {
        tpool_->Start(true);
        listener_->SetNewConnectionCallback(...TCPServer::HandleNewConn...)
        listener_->Accept();
    }

在接著講HandleNewConn之前,我們先過一眼剩下的兩個開放(public)介面;
這兩個函式看似十分簡單,甚至是微不足道,但是它起到了 “封裝業務邏輯”的巨大作用;
使用者首先單獨編寫“業務處理函式”,然後以“回撥”的形式註冊到框架中,
實現“業務”和“框架”的解耦;
稍後,我們就會立馬看到它們的身影;

    void SetConnectionCallback(const ConnectionCallback& cb) { conn_fn_ = cb; }
    void SetMessageCallback(MessageCallback cb) { msg_fn_ = cb; }

void TCPServer::HandleNewConn(evpp_socket_t sockfd,
                              const std::string& remote_addr/*ip:port*/,
                              const struct sockaddr_in* raddr) {
// 接上文提到的,當Listener上有新的連結到來時,HandleNewConn函式會被回撥;
// 由於是事件驅動模型,一個TCP連結總是要對應到一個IO複用器(或者說Eventloop事件迴圈);
// 因此,程式碼中的第一步就是獲取一個Eventloop;
    EventLoop* io_loop = GetNextLoop(raddr);

// 在evpp/muduo中,TCP連線被抽象成TCPConn類;
// 既然現在有新連線到來,那我們new一個出來;
// 拋去一個TCP連線應該具備的屬性之外,在建構函式中我們還傳遞了io_loop;
    TCPConnPtr conn(new TCPConn(io_loop, ConnectionName, sockfd, listen_addr_, remote_addr, ++next_conn_id_));

// 在這裡,TCPServer使用者通過Setter函式設定的回撥函式,被進一步註冊到了底層的TCPConn當中;
// 這種回撥函式一層一層往下注冊的情況,接下來還會經常看到;
    conn->SetMessageCallback(msg_fn_);
    conn->SetConnectionCallback(conn_fn_);

// 既然TCPConn本身初始化完畢,同時上頭也指定了“當連線狀態變化”或“當有資料到達時”該如何處理,
// 那我們就把這個TCPConn Attach到事件迴圈當中;
// 至此,一個TCP連線就算是正式建立,可以服務客戶端了;
// 這裡劇透下,SockFD以及剛剛提到的“回撥函式”實際上又被封裝在了Channel這個類中;
// 這個類,我們稍後再進行解釋;
    io_loop->RunInLoop(std::bind(&TCPConn::OnAttachedToLoop, conn));

// 每個TCPConn被分配一個ID,以這種形式TCPServer便於對TCPConn進行集中管理;
    connections_[conn->id()] = conn;
}

TCPClient類

在熟悉了TCPServer之後,再去看對稱的TCPClient類就會覺得輕車熟路;

    evpp::EventLoop loop;
    evpp::TCPClient client(&loop, "127.0.0.1:9099", "TCPPingPongClient");
    client.SetMessageCallback([&loop, &client](const evpp::TCPConnPtr& conn,
                               evpp::Buffer* msg) {
        // 使用者業務邏輯
    });

    client.SetConnectionCallback([](const evpp::TCPConnPtr& conn) {
        // 使用者業務邏輯
    });

    client.Connect();

    loop.Run();

在使用者程式碼層面,二者唯一明顯的區別在於,listen/accept的封裝函式被替換成了Connect()函式;因此,讓我們先來探一探Connect();

// 還記得TCPServer中的Listener類嗎? Listener類封裝了監聽套接字、建立新連線相關的細節;
// 類似地,在TCPClient這邊也有一個Connector類,該類封裝了socket connect相關的細節;
// 當用戶呼叫client.Connect()時,將會發起非同步的建立連線操作; 
// 當TCP連線建立成功(或失敗)時,TCPClient::OnConnection函式將會被回撥;
void TCPClient::Connect() {
    auto f = [this]() {
        connector_.reset(new Connector(loop_, this));  // 對Connector中的資料成員進行必要的賦值;
        connector_->SetNewConnectionCallback(...TCPClient::OnConnection...);        
        connector_->Start();
    };
    loop_->RunInLoop(f);  // 在loop_所在的執行緒中呼叫函式f
}

// 當TCP連線建立成功或失敗時,將會回撥TCPClient::OnConnection
void TCPClient::OnConnection(evpp_socket_t sockfd, const std::string& laddr) {
    if (sockfd < 0) {  // 當TCP連線建立失敗時,通過回撥使用者設定的回撥函式conn_fn_,從而使得上層有機會對錯誤進行處理;`
        conn_fn_(TCPConnPtr(new TCPConn(loop_, "", sockfd, laddr, remote_addr_, 0)));
        return;
    }

    // 和TCPServer那邊的情況類似,使用者呼叫Setter函式設定的MessageCallback/ConnectionCallback,將會被進一步註冊到TCPConn中;
    TCPConnPtr c = TCPConnPtr(new TCPConn(loop_, name_, sockfd, laddr, remote_addr_, id++));
    c->set_type(TCPConn::kOutgoing);
    c->SetMessageCallback(msg_fn_); 
    c->SetConnectionCallback(conn_fn_);
    c->SetCloseCallback(...TCPClient::OnRemoveConnection...);

    {
        std::lock_guard<std::mutex> guard(mutex_);
        conn_ = c;
    }

    // 到此為止,一個TCP連線就完成了建立操作;
    // 這時候,我們可以把這個TCP連線新增到事件迴圈中,並回呼叫戶設定的conn_fn_函式;
    // 回撥這一操作,使得“連線建立成功”這個事件能夠被通知到上層使用者;
    c->OnAttachedToLoop();
}

持續更新Ing

參考文獻