muduo原始碼分析之實現TCP網路庫(連線的接收和關閉)
在EventLoop
、Channel
、Poller
三個類中完成對一般描述符、事件迴圈(poll)的封裝。實現了Reactor的基本功能,接下來則需要將網路套接字描述符、I/O函式、等進行封裝。
1.傳統的TcpServer |
在進行封裝之前需要明確我們需要封裝的內容有哪些?複習最簡單的TcpServer大概需要經歷如下步驟:
listenfd= socket(AF_INET,SOCK_STREAM,0);
strcut sockaddr_in servaddr;
bind(listenfd,(SA*)&servaddr,sizeof(servaddr));
listen(listenfd,LISTENQ);
struct pollfd fds[1024];
fds[0].fd=listenfd;
fds[0].events=POLLIN;
while(1)
{
ret=poll(fds,fds.size(),timeout);
//handle_event
if(fds[0].revents&POLLIN)
{
//new connection
connectedfd = accept(listenfd,clienAddr,sizeof(clienAddr));
//add connectedfd into POLL
}
}
這是一個比較典型的reactor TcpServer,poll呼叫的部分已經封裝,目前來看至少需要封裝的內容有:
1.網路套接字描述符fd
2.描述地址結構體strcut sockaddr_in
3.socket相關的系統呼叫
2.socket操作封裝 |
Endian.h
封裝了位元組序轉換函式(全域性函式,位於muduo::net::sockets名稱空間中)。
SocketsOps.h/ SocketsOps.cc
封裝了socket相關係統呼叫(全域性函式,位於muduo::net::sockets名稱空間中)。
Socket.h/Socket.cc(Socket類)
用RAII方法封裝socket file descriptor,包含操作:listen
、bind
、accept
這些操作將呼叫上述封裝的內容。
InetAddress.h/InetAddress.cc(InetAddress類)
網際地址sockaddr_in封裝
3.使用Acceptor類封裝監聽描述符 |
這個類用於處理accept
呼叫,事實上是對監聽套接字Listenfd
的封裝。
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie()),//create socket
acceptChannel_(loop, acceptSocket_.fd()),//Channel construct
listenning_(false)
{
acceptSocket_.setReuseAddr(true);//不用等待Time_Wait狀態結束
acceptSocket_.bindAddress(listenAddr);//bind
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
}
從Acceptor的建構函式可以看出它包含了Socket物件,也就是說它本質上是一個描述符。
通過Channel型別的物件,設定了當Acceptor所管理的描述符可讀時,進行所執行的回撥handleRead
。
handleRead函式:
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr(0);
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);//accpet
if (connfd >= 0) {
if (newConnectionCallback_) {
newConnectionCallback_(connfd, peerAddr);//使用者回撥
} else {
sockets::close(connfd);
}
}
}
當listenfd可讀時,說明有新連線,這個時候呼叫accept建立已連線套接字,並且執行相應的使用者回撥。
到目前為止,完成了對監聽套接字的簡單封裝,這個封裝是不完全的,因為事實上,當有了TcpServer的概念後,監聽套接字對於使用者來說應該是不可見的。
不過在對TcpServer進行封裝之前,可以測試Acceptor的功能:
void newConnection(int sockfd, const muduo::InetAddress& peerAddr)
{
printf("newConnection(): accepted a new connection from %s\n",
peerAddr.toHostPort().c_str());
::write(sockfd, "How are you?\n", 13);
muduo::sockets::close(sockfd);
}
int main()
{
printf("main(): pid = %d\n", getpid());
muduo::InetAddress listenAddr(9981);
muduo::EventLoop loop;
muduo::Acceptor acceptor(&loop, listenAddr);
acceptor.setNewConnectionCallback(newConnection);
acceptor.listen();
loop.loop();
}
4.TcpServer接收新的連線 |
看了上述最後一個例子我們發現,監聽套接字的封裝暴露給了使用者。所以設計的TcpServer類就需要對Acceptor再封裝一層了。
TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(listenAddr.toHostPort()),//
acceptor_(new Acceptor(loop, listenAddr)),//Accept的封裝
started_(false),
nextConnId_(1)//記錄連線數,當有新連線的時候會自增
{
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
}
我們看在之前的Acceptor中,只是在main中設定了使用者回撥,向已連線套接字write了一段文字,便close掉了,對已連線套接字並沒有處理。
現在在Acceptor的回撥中設定TcpServer::newConnection
。
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
//sockfd 是已連線套接字
loop_->assertInLoopThread();
char buf[32];
snprintf(buf, sizeof buf, "#%d", nextConnId_);
++nextConnId_;//連線數+1
std::string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toHostPort();
//列印特定訊息
InetAddress localAddr(sockets::getLocalAddr(sockfd));
//TcpConnection類用來管理已連線套接字
TcpConnectionPtr conn(
new TcpConnection(loop_, //EventLoop
connName,//ConnectionName
sockfd, //accepted fd
localAddr,//
peerAddr));//
connections_[connName] = conn;//map
conn->setConnectionCallback(connectionCallback_);//設定使用者回撥
conn->setMessageCallback(messageCallback_);//設定使用者回撥
conn->connectEstablished();//在這個函式中,會執行onConnection的使用者回撥
}
在這個回撥中,建立了一個關鍵的物件,TcpConnection
,並用shared_ptr
管理,該Class用來管理已連線套接字。
TcpConnection::TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),//EventLoop
name_(nameArg),//
state_(kConnecting),
socket_(new Socket(sockfd)),//已連線套接字
channel_(new Channel(loop, sockfd)),//Channel
localAddr_(localAddr),//
peerAddr_(peerAddr)//
{
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this));//當已連線套接字可讀時
}
當已連線套接字可讀將回調TcpConnection::handleRead
void TcpConnection::handleRead()
{
char buf[65536];
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);//read
messageCallback_(shared_from_this(), buf, n);//onMessage使用者回撥
// FIXME: close connection if n == 0
}
busy loop事件
對於執行s05/test8.cc
它是一個discard服務,當客戶端關閉連線,將進入busy loop,原因在與當客戶端斷開連線時,客戶端傳送一個FIN段,服務端返回一個ACK,當服務端TCP收到FIN時,read就返回0(表示EOF),而此時這種狀態總是readable,將會不停觸發poll返回,就出現busy loop事件了。
5.TcpConnection關閉連線 |
關閉連線相對於建立連線要麻煩,因為需要考慮TcpConnection
的生命週期。
監聽套接字可讀事件是POLLIN; 已連線套接字正常可讀是POLLIN; 正常可寫是POLLOUT; 對等方close/shutdown關閉連線,已連線套接字可讀是POLLIN | POLLHUP;
在TcpConnection 建構函式中再新增:
// 連線關閉,回撥TcpConnection::handleClose
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
// 發生錯誤,回撥TcpConnection::handleError
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
在 TcpServer::newConnection() 中再新增:
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
.....
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1));
}
在TcpConnection::handleRead() 中再新增:
void TcpConnection::handleRead()
{
char buf[65536];
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
if (n > 0) {
messageCallback_(shared_from_this(), buf, n);
} else if (n == 0) {
handleClose();
} else {
handleError();
}
}
在來看看TcpConnection::handleClose()
函式
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpConnection::handleClose state = " << state_;
assert(state_ == kConnected);
// we don't close fd, leave it to dtor, so we can find leaks easily.
channel_->disableAll();
// must be the last line
closeCallback_(shared_from_this());
}
回撥來回調去的有點暈,整理一下整個過程。
當已連線套接字發生可讀事件,poll返回,將呼叫呼叫Channel::handleEvent()
處理活動通道,呼叫TcpConnection::handleRead()
,::read() 返回0,進而呼叫TcpConnection::handleClose()
在handleClose()
函式中,呼叫TcpConnection
的closeCallback_,這個回撥函式是在TcpServer
裡面設定:
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1));
進而呼叫TcpServer::removeConnection
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnection [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
assert(n == 1); (void)n;
loop_->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
}
最後將呼叫TcpConnection::connectDestroy
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
assert(state_ == kConnected);
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());//回撥使用者onConnection Callback
loop_->removeChannel(get_pointer(channel_));//Poll不再關注此通道
}
6.參考 |