[evpp/muduo/reactor] evpp事件驅動網路庫 整體架構梳理
介紹
muduo很多人都聽說過,那evpp可以理解成是muduo用C++11改寫後的升級版。
相比muduo的程式碼風格,evpp會顯得更加現代一點,更討我們年輕人的喜歡。
作為例子,這裡是一段TCP Echo Server的示例程式碼:
evpp::EventLoop loop;
evpp::TCPServer server(&loop, "0.0.0.0:9099", "TCPEchoServer", thread_num);
server.SetMessageCallback([](const evpp::TCPConnPtr& conn,
evpp::Buffer* msg) {
conn->Send(msg);
});
server .SetConnectionCallback([](const evpp::TCPConnPtr& conn) {
if (conn->IsConnected()) {
LOG_INFO << "A new connection from " << conn->remote_addr();
} else {
LOG_INFO << "Lost the connection from " << conn->remote_addr();
}
});
server .Init();
server.Start();
loop.Run();
evpp的核心程式碼其實不多,之前也已經花了一段時間去閱讀;
總的來說,程式碼本身還是十分通俗易懂的,不像某些天書C++;
相比muduo,evpp一個很明顯的變化就是,IO複用相關的抽象直接使用了libevent;
但是,看完之後,總是缺少一個系統性的認識,因此,想借此機會進行梳理;
此外,網上雖然有很多muduo的解析,但是介紹evpp的還是挺少的;
對於習慣C++11風格的人,應該算是多了一種選擇;
https://github.com/Qihoo360/evpp
核心類
這裡,我先羅列下evpp中的核心類
- TCPServer
- Listener
- TCPClient
- Connector
- TCPConn
- FdChannel
- EventLoop
- EventLoopThread
- EventLoopThreadPool
只要是熟悉Socket程式設計的同學,看見這幾個類後應該還是有點親切感的。
其中,TcpClient/TcpServer分別抽象TCP客戶端和伺服器;Connector/Listener分別包裝TCP客戶端和伺服器的建立連線/接受連線;TcpConnection抽象一個TCP連線,無論是客戶端還是伺服器只要建立了網路連線就會使用TcpConnection;FdChannel是裝置fd的包裝,在muduo中主要包裝socket以及該socket對應的事件處理回撥函式;EventLoop是一個事件發生器,它驅動底層的IO複用器產生/發現事件,然後將事件派發到Channel處理;EventLoopThread是一個帶有EventLoop的執行緒;EventLoopThreadPool自然是一EventLoopThread的資源池,維護一堆EventLoopThread。
TCPServer類
這裡,我們以自頂向下的方式進行梳理。所以,我們先來看看TCPServer的資料成員。
// TCPServer的名字,比如"Echo", "Discard"等
const std::string name_;
// TCPServer最重要的一點就是,建立ListenFD然後Accept新連線;
// 接下來的3項,給出了必要的資訊;
// 第一項是“監聽地址”,值得提的一點是,evpp借鑑了Golang,可以直接使用字串指定IP地址;
// 第二項是一個指標,指向Listener結構體;在evpp中,監聽埠建立新連線 相關的細節都被封裝在這個類之中;
// 這個類對應至muduo中的acceptor類;另外一點就是,accepted的連線,在evpp中被抽象成TCPConn這個類,這個類我們稍後會提到;
// 第三項就是事件驅動的引擎,可以理解成是對IO複用器的封裝;
// ListenFD會註冊到該IO複用器上,當一個ListenFD上有新的連線時,該FD就會變成“可讀”,從而回調“可讀Callback”;
// 當然,這個可讀Callback執行的是建立TCPConn的邏輯,而不是讀資料啥的;
// 另外,這個loop_就是TCPServer_的主事件迴圈;
const std::string listen_addr_; // ip:port
std::unique_ptr<Listener> listener_;
EventLoop* loop_; // the listening loop
// 一個EventLoop會繫結到一個特定的執行緒上執行;
// 為了能夠更充分地利用多核心CPU,直觀的想法就是建立更多的執行緒(執行緒池),
// 然後在每個執行緒上執行EventLoop;
std::shared_ptr<EventLoopThreadPool> tpool_;
ConnectionCallback conn_fn_; // 連線建立/斷開時的回撥函式,由使用者註冊;
MessageCallback msg_fn_; // 當收到訊息時的回撥函式,由使用者註冊;
DoneCallback stopped_cb_;
// 一個TCPServer可以有多個TCP連線,下面的這兩項就反映了這樣一個客觀現實;
// TCPServer會為每個TCP連線分配一個ID;與此同時,以該ID為key將TCPConn儲存起來;
uint64_t next_conn_id_ = 0;
std::map<uint64_t, TCPConnPtr> connections_;
看完資料成員,我們再來看看TCPServer的開放介面;
// 對於TCPServer的建構函式,當中無非是對我們剛剛提到的資料成員進行初始化;
// 但是,值得提的一點是Eventloop是由使用者分配的;
TCPServer(EventLoop* loop,
const std::string& listen_addr/*ip:port*/,
const std::string& name,
uint32_t thread_num);
// 一個Server端正常的流程如下:建立套接字、繫結地址、設定成Listen狀態、呼叫Accept開始接受連結
// 在evpp/muduo的設計中,前三項被單獨放在了Init()初始化函式中,而真正呼叫Accept的操作被單獨放在了Start()當中;
bool Init() {
listener_.reset(new Listener(loop_, listen_addr_));
listener_->Listen();
status_.store(kInitialized);
}
// 在呼叫前面提到的Accept之前,Start()當中為listener指定了回撥函式,
// 當有新的連線到來時,這個回撥函式就會被呼叫;
// 值得提醒的是,到目前為止,在這幾個初始化啟動函式中,我們並沒有發現與“業務”相關的邏輯;
// 一個TCP伺服器總有其業務邏輯,那麼業務邏輯在哪裡得到處理呢?
// 讓我們去看下TCPServer::HandleNewConn;
// (這裡再贅述下,HandleNewConn是TCPServer類中的函式,
// 但是通過listener_->SetNewConnectionCallback()的介面被註冊到listener_中,由listener_呼叫;
// 這種操作方式,在evpp/muduo中是十分普遍的;接下來,我們還會看到更多的這樣的例子;
// 所以,我們應該留神這個callback到底是哪裡來的;)
bool Start() {
tpool_->Start(true);
listener_->SetNewConnectionCallback(...TCPServer::HandleNewConn...)
listener_->Accept();
}
在接著講HandleNewConn之前,我們先過一眼剩下的兩個開放(public)介面;
這兩個函式看似十分簡單,甚至是微不足道,但是它起到了 “封裝業務邏輯”的巨大作用;
使用者首先單獨編寫“業務處理函式”,然後以“回撥”的形式註冊到框架中,
實現“業務”和“框架”的解耦;
稍後,我們就會立馬看到它們的身影;
void SetConnectionCallback(const ConnectionCallback& cb) { conn_fn_ = cb; }
void SetMessageCallback(MessageCallback cb) { msg_fn_ = cb; }
void TCPServer::HandleNewConn(evpp_socket_t sockfd,
const std::string& remote_addr/*ip:port*/,
const struct sockaddr_in* raddr) {
// 接上文提到的,當Listener上有新的連結到來時,HandleNewConn函式會被回撥;
// 由於是事件驅動模型,一個TCP連結總是要對應到一個IO複用器(或者說Eventloop事件迴圈);
// 因此,程式碼中的第一步就是獲取一個Eventloop;
EventLoop* io_loop = GetNextLoop(raddr);
// 在evpp/muduo中,TCP連線被抽象成TCPConn類;
// 既然現在有新連線到來,那我們new一個出來;
// 拋去一個TCP連線應該具備的屬性之外,在建構函式中我們還傳遞了io_loop;
TCPConnPtr conn(new TCPConn(io_loop, ConnectionName, sockfd, listen_addr_, remote_addr, ++next_conn_id_));
// 在這裡,TCPServer使用者通過Setter函式設定的回撥函式,被進一步註冊到了底層的TCPConn當中;
// 這種回撥函式一層一層往下注冊的情況,接下來還會經常看到;
conn->SetMessageCallback(msg_fn_);
conn->SetConnectionCallback(conn_fn_);
// 既然TCPConn本身初始化完畢,同時上頭也指定了“當連線狀態變化”或“當有資料到達時”該如何處理,
// 那我們就把這個TCPConn Attach到事件迴圈當中;
// 至此,一個TCP連線就算是正式建立,可以服務客戶端了;
// 這裡劇透下,SockFD以及剛剛提到的“回撥函式”實際上又被封裝在了Channel這個類中;
// 這個類,我們稍後再進行解釋;
io_loop->RunInLoop(std::bind(&TCPConn::OnAttachedToLoop, conn));
// 每個TCPConn被分配一個ID,以這種形式TCPServer便於對TCPConn進行集中管理;
connections_[conn->id()] = conn;
}
TCPClient類
在熟悉了TCPServer之後,再去看對稱的TCPClient類就會覺得輕車熟路;
evpp::EventLoop loop;
evpp::TCPClient client(&loop, "127.0.0.1:9099", "TCPPingPongClient");
client.SetMessageCallback([&loop, &client](const evpp::TCPConnPtr& conn,
evpp::Buffer* msg) {
// 使用者業務邏輯
});
client.SetConnectionCallback([](const evpp::TCPConnPtr& conn) {
// 使用者業務邏輯
});
client.Connect();
loop.Run();
在使用者程式碼層面,二者唯一明顯的區別在於,listen/accept的封裝函式被替換成了Connect()函式;因此,讓我們先來探一探Connect();
// 還記得TCPServer中的Listener類嗎? Listener類封裝了監聽套接字、建立新連線相關的細節;
// 類似地,在TCPClient這邊也有一個Connector類,該類封裝了socket connect相關的細節;
// 當用戶呼叫client.Connect()時,將會發起非同步的建立連線操作;
// 當TCP連線建立成功(或失敗)時,TCPClient::OnConnection函式將會被回撥;
void TCPClient::Connect() {
auto f = [this]() {
connector_.reset(new Connector(loop_, this)); // 對Connector中的資料成員進行必要的賦值;
connector_->SetNewConnectionCallback(...TCPClient::OnConnection...);
connector_->Start();
};
loop_->RunInLoop(f); // 在loop_所在的執行緒中呼叫函式f
}
// 當TCP連線建立成功或失敗時,將會回撥TCPClient::OnConnection
void TCPClient::OnConnection(evpp_socket_t sockfd, const std::string& laddr) {
if (sockfd < 0) { // 當TCP連線建立失敗時,通過回撥使用者設定的回撥函式conn_fn_,從而使得上層有機會對錯誤進行處理;`
conn_fn_(TCPConnPtr(new TCPConn(loop_, "", sockfd, laddr, remote_addr_, 0)));
return;
}
// 和TCPServer那邊的情況類似,使用者呼叫Setter函式設定的MessageCallback/ConnectionCallback,將會被進一步註冊到TCPConn中;
TCPConnPtr c = TCPConnPtr(new TCPConn(loop_, name_, sockfd, laddr, remote_addr_, id++));
c->set_type(TCPConn::kOutgoing);
c->SetMessageCallback(msg_fn_);
c->SetConnectionCallback(conn_fn_);
c->SetCloseCallback(...TCPClient::OnRemoveConnection...);
{
std::lock_guard<std::mutex> guard(mutex_);
conn_ = c;
}
// 到此為止,一個TCP連線就完成了建立操作;
// 這時候,我們可以把這個TCP連線新增到事件迴圈中,並回呼叫戶設定的conn_fn_函式;
// 回撥這一操作,使得“連線建立成功”這個事件能夠被通知到上層使用者;
c->OnAttachedToLoop();
}
持續更新Ing