muduo原始碼分析:TcpServer類
上篇博文學習了Acceptor class 的實現,它僅僅是對Channel和Socket的簡單封裝,對使用者來說簡單易用。這得益於底層架構Reactor。接下來,開始學習muduo對於建立連線的處理。這屬於muduo提到的三個半事件中的第一個。可以想一下,TcpServer class應該也是對Acceptor,Poller的封裝。
連線處理過程
首先TcpServer通過Acceptor向Poller註冊了一個Channel,該Channel關注acceptSocket的readable事件,並設定了回撥函式Acceptor::newConnectionCallback
TcpServer::newConnection 。
acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2)); //繫結Acceptor::newConnectionCallback回撥函式
然後,當有client連線時,Poller返回該Channel,接著呼叫該Channel::handleEvent–>handleRead。在Acceptor中accept該連線,然後呼叫設定好的 Acceptor::newConnectionCallback
接著,對於每個連線,TcpServer會建立一個TcpConnnection來管理。TcpConnection是最為複雜的一個class,使用shared_ptr管理,因為它的生命週期比較模糊,這一點後面再分析。
最後,會呼叫TcpConnnection::connectEstablish
,它會回撥使用者設定好的回撥函式 connectionCallback。
(類與類之間通過回撥函式聯絡在了一起)
TcpServer.h
class TcpServer : boost::noncopyable { public: typedef boost::function<void(EventLoop*)> ThreadInitCallback; enum Option { kNoReusePort, kReusePort, }; //TcpServer(EventLoop* loop, const InetAddress& listenAddr); TcpServer(EventLoop* loop, //建構函式 const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); ~TcpServer(); // force out-line dtor, for scoped_ptr members. const string& ipPort() const { return ipPort_; } const string& name() const { return name_; } EventLoop* getLoop() const { return loop_; } void setThreadNum(int numThreads); //設定server中需要執行多少個Loop執行緒 void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; } /// valid after calling start() boost::shared_ptr<EventLoopThreadPool> threadPool() { return threadPool_; } void start(); //啟動該TcpServer架構 void setConnectionCallback(const ConnectionCallback& cb) //設定新連接回調 { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) //設定訊息回撥 { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) //設定寫完成回撥 { writeCompleteCallback_ = cb; } private: void newConnection(int sockfd, const InetAddress& peerAddr); //被設定為Acceptor::newConnectionCallback()回撥函式 void removeConnection(const TcpConnectionPtr& conn); void removeConnectionInLoop(const TcpConnectionPtr& conn); typedef std::map<string, TcpConnectionPtr> ConnectionMap; //使用map關聯容器維護一個連線列表 EventLoop* loop_; // the acceptor loop const string ipPort_; //埠號 const string name_; //名字 boost::scoped_ptr<Acceptor> acceptor_; //用於接受連線的Acceptor boost::shared_ptr<EventLoopThreadPool> threadPool_; ConnectionCallback connectionCallback_; //新連接回調 MessageCallback messageCallback_; //訊息回撥 WriteCompleteCallback writeCompleteCallback_; //寫完成回撥 ThreadInitCallback threadInitCallback_; AtomicInt32 started_; //啟動標記 // always in loop thread int nextConnId_; //下一個連線ID ConnectionMap connections_; //連線列表 };
幾個重要成員:
boost::scoped_ptr<Acceptor> acceptor_; 這是上篇文章分析的用於接收連線的class,只在TcpServer內部使用,因此使用scoped_ptr管理
EventLoop* loop_; Reactor的關鍵class
map<string, TcpConnectionPtr> connections_; 管理TcpConnection的容器,確切的講應該是TcpServer通過shared_ptr管理TcpConnection(即TcpConnectionPtr),主要是因為TcpConnection擁有模糊的生命週期。muduo網路庫的使用這也會使用TcpConnectionPtr作為引數。每個連線有一個唯一的名字,在建立時生成。
TcpServer::TcpServer()
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2)); //繫結newConnectionCallback回撥函式
}
TcpServer::~TcpServer()
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
for (ConnectionMap::iterator it(connections_.begin());
it != connections_.end(); ++it)
{
TcpConnectionPtr conn(it->second);
it->second.reset();
conn->getLoop()->runInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
}
}
TcpServer::newConnection
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) //新連線處理函式
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop(); //輪詢呼叫執行緒池的EventLoop迴圈
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); //生成唯一的name
++nextConnId_;i //++之後就是下一個連線id
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd)); //構造本地地址
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); //構造新的TcpConnection,將獲取的EventLoop的地址傳給新連線物件。
connections_[connName] = conn; //將該TcpConnection加入到TcpServer的map容器中
//設定TcpConnection三個半事件回撥函式,將使用者給TcpServer設定的回撥傳遞給TCPConnection
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
//呼叫conn->connectEstablished()
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
TcpServer::removeConnection()
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name()); //將該TcpConnection從map容器中刪除
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
}
使用示例
void onConnection(const muduo::net::TcpConnectionPtr& conn)
{
if(conn->connected()) {
std::cout << "New connection" << std::endl;
} else {
std::cout << "Connection failed" << std::endl;
}
}
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer *buffer)
//const char* data,
//ssize_t len)
{
const std::string readbuf = buffer->retrieveAllAsString();
std::cout << "Receive :" << readbuf.size()<< " bytes." << std::endl
<< "Content:" << readbuf << std::endl;
}
int main()
{
muduo::net::EventLoop loop;
muduo::net::TcpServer server(&loop, "8090");
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
可以看到TcpServer使用比較方便,只需要設定好相應的回撥函式,然後start()。
TcpServer在後臺默默地做了很多事情:socket、bind、listen、epoll_wait、accept等等。