muduo原始碼解析29-網路庫7:tcpconnection類
tcpconnection類:
說明:
之前我們提到了acceptor類負責socket(),bind(),listen()和accept()一個連線。
但是accept()之後的操作,例如和這個連線套接字的讀/寫,關閉連線等操作都沒有實現。
這些操作都將由tcpconnection這個類來實現。
tcpconnection是整個網路庫的核心,封裝一次TCP連線,注意它不能發起連線
tcpserver和tcpclient都用到了tcpconnection
tcpconnection是為tcpserver服務的
tcpserver(待會講到)中會有一個acceptor負責接收新的連線,acceptor每接受一個連線時就會為這個連線建立一個tcpconnection。
acceptor在建立tcpconnection時為其為指定四個回撥函式,讀訊息,寫完成,連線建立,關閉連線。
被呼叫處:handleRead();handleWrite()/sendInLoop();connectionEstablished();handleClose();
tcpconnection內部也會有一個socket和一個對應的channel,需要在channel上註冊讀/寫/關閉/錯誤網路事件,一旦發生這寫網路事件,tcpconnection就會去執行相應的回撥函式:
讀網路事件:channel回撥tcpconnection::handleRead()把資料讀到m_inputbuffer中
寫網路事件:channel回撥tcpconnection::handleWrite()把資料通過m_outputbuffer進行處理(見程式碼)
關閉網路事件:tcpconnection::handleClose()
錯誤網路事件:tcpconnection::handleError()
一個tcpconenction有四種狀態:已連線,未連線,正在連線,正在斷開
內部有一個socket和channel,
tcpconenction構造時會設定channel的四個回撥函式,read/write/close/error
只要當socket上發生網路事件時.poller::poll()就會返回,channel會被新增到
呼叫上面四個回撥函式來處理
內部有兩塊之前的buffer,主要是為了解決高併發情況下讀寫緩衝區的問題,
inputbuffer:解決包不完整,粘包問題
先把資料全部寫到inputbuffer中,之後再進行處理,只當有一個完整的包時,才進行業務處理
outputbuffer:解決當前核心寫阻塞的問題
先把資料全部寫到outputbuffer中,每當核心可寫時,才把outputbuffer中資料寫出去
tcpconnection基本上都是通過evenntloop::runInLoop()來實現對於tcp連線的管理的,
例如send,shutdown,forceclose等操作,下面以分別說一說具體的步驟:
先看一看send()函式
tcpconnection通過呼叫send傳送資料,實際上通過eventloop::runInLoop()把傳送任務
新增到eventloop中的任務佇列中,eventloop會回撥tcpconnection::sendInLoop()完成
傳送資料的操作.sendInLoop()先嚐試write(),如果無法一次全部寫完,需要把資料快取到
outputbuffer中,讓channel註冊寫網路事件,等到核心可寫時,socket就會出發可寫
網路事件,channel就回調handleWrite(),把outputbuffer中的資料傳送出去
只要資料全部寫完就會觸發m_writeCompleteCallback寫完成回撥.
再看一看forceclose()函式:
tcpconenction的關閉有兩種情況,一種是遠方斷開連線,此時socket觸發close網路事件
channel::handleEvent()會回撥&tcpconnection::handleClose()處理
第二種情況是我自己主動呼叫forceclose()關閉連線,此時會把
tcpconnection::forceCloseInLoop新增到eventloop中的任務佇列中,
讓eventloop再去主動呼叫channel::handleClose()關閉連線
可以看出來,兩種方法最終都是讓channel執行handleClose()關閉tcp連線
再來看一看startread()函式:
startread()實質實在channel上註冊一個可讀網路事件,只要socket發生可讀網路事件
channel就會呼叫handleEvent(),然後回撥tcpconnection::handleRead()來把資料讀
到inputbuffer中,不直接使用read得到的資料而先讀到inputbuffer的好處之前說過了,防止
粘包/包不完整,在inputbuffer中的資料還會被進一步處理才能使用
tcpconnection.h
#ifndef TCPCONNECTION_H #define TCPCONNECTION_H #include"base/noncopyable.h" #include"base/types.h" #include"net/callbacks.h" #include"net/buffer.h" #include"net/inetaddress.h" #include<memory> #include<boost/any.hpp> struct tcp_info; namespace mymuduo{ namespace net { class channel; class eventloop; class Socket; class tcpconnection:noncopyable,public std::enable_shared_from_this<tcpconnection> { public: //建構函式,初始化成員,設定四個回撥函式,Read,Write,Close,ErrorCallback tcpconnection(eventloop* loop,const string& name,int sockfd, const inetaddress& localAddr,const inetaddress& peerAddr); //解構函式,列印一下日誌啥也不幹 ~tcpconnection(); //獲取成員資訊:eventloop,name,server/client地址,連線是否建立 eventloop* getLoop() const{return m_loop;} const string& name() const{return m_name;} const inetaddress& localAddress() const {return m_localAddr;} const inetaddress& peerAddress() const {return m_peerAddr;} bool connected() const{return m_state==kConnected;} bool disconnected() const{return m_state==kDisconnected;} //內部利用Socket::getTcpInfo()獲取tcp資訊 bool getTcpInfo(struct tcp_info*) const; //內部利用Socket::getTcpInfoString()格式化tcp資訊 string getTcpInfoString() const; void send(const string& message); void send(const void* message,int len); void send(buffer* message); //關閉tcp連線,並讓eventloop回撥&TcpConnection::shutdownInLoop() void shutdown(); //讓eventloop強制關閉tcp連線,利用eventloop的定時器佇列實現(喚醒poller::poll()來關閉),eventloop //回撥tcpconnection::forceCloseInLoop() void forceClose(); //強制在seconds秒後關閉tcp連線 void forceCloseWithDelay(double seconds); //sockets::setTcpNoDelay() void setTcpNoDelay(bool on); //eventloop回撥tcpconenction::startReadInLoop() void startRead(); //eventloop回撥tcpconnection::stopReadInLoop() void stopRead(); bool isReading() const{return m_reading;} void setContext(const boost::any& context){m_context=context;} const boost::any& getContext()const{return m_context;} boost::any* getMutableContext(){return &m_context;} //設定四個回撥函式 void setConnectionCallback(const ConnectionCallback& cb){ m_connectionCallback = cb; } void setMessageCallback(const MessageCallback& cb){ m_messageCallback = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) { m_writeCompleteCallback = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { m_highWaterMarkCallback = cb; m_highWaterMark = highWaterMark; } //獲取讀/寫緩衝區 buffer* inputBuffer(){return &m_inputbuffer;} buffer* outputBuffer(){return &m_outputbuffer;} //僅作為內部使用 void setCloseCallback(const CloseCallback& cb){m_closeCallback=cb;} void connectEstablished(); void connectDestroyed(); private: //tcpconnection 的四種狀態,連線未建立,正在建立,已建立,正在斷開 enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting}; //處理讀/寫網路事件,供channel回撥 void handleRead(timestamp receiveTime); void handleWrite(); //處理關閉連線的網路事件,讓channel不再關心任何網路事件. void handleClose(); //處理錯誤事件,利用sockets::getSocketError()來實現 void handleError(); void sendInLoop(const string& message); void sendInLoop(const void* message,size_t len); //關閉tcpconenction void shutdownInLoop(); //供eventloop回撥使用,用來強制關閉tcp連線 void forceCloseInLoop(); void setState(StateE s){m_state=s;} //把tcp連線狀態轉換成字串形式 const char* stateToString() const; //讓channel註冊讀網路事件 void startReadInLoop(); //讓channel不再關心讀網路事件 void stopReadInLoop(); eventloop* m_loop; //所在的eventloop const string m_name; //name StateE m_state; //連線狀態,有上面四種 bool m_reading; //是否正在讀 std::unique_ptr<Socket> m_socket; //server Socket* std::unique_ptr<channel> m_channel; //socket對應的channel const inetaddress m_localAddr; //server地址 const inetaddress m_peerAddr; //client地址 ConnectionCallback m_connectionCallback; //連線成功的回撥函式 MessageCallback m_messageCallback; //讀事件成功的回撥函式 WriteCompleteCallback m_writeCompleteCallback; //寫事件完成的回撥函式 HighWaterMarkCallback m_highWaterMarkCallback; //...? CloseCallback m_closeCallback; //連線關閉時的回撥函式 size_t m_highWaterMark; //...? buffer m_inputbuffer; //TCP讀緩衝區 buffer m_outputbuffer; //TCP寫緩衝區 boost::any m_context; //...? }; typedef std::shared_ptr<tcpconnection> TcpConnectionPtr; }//namespace net }//namespace mymuduo #endif // TCPCONNECTION_H
tcpconnection.cpp
#include "tcpconnection.h" #include"base/logging.h" #include"net/channel.h" #include"base/weakcallback.h" #include"net/eventloop.h" #include"net/socket.h" #include"net/socketsops.h" #include<errno.h> namespace mymuduo { namespace net { //"net/callbacks.h"中defaultConnectionCallback和defaultMessageCallback的定義 void defaultConnectionCallback(const TcpConnectionPtr& conn) { LOG_TRACE << conn->localAddress().toIpPort() << " -> " << conn->peerAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); // do not call conn->forceClose(), because some users want to register message callback only. } void defaultMessageCallback(const TcpConnectionPtr&, buffer* buf, timestamp) { buf->retrieveAll(); } //建構函式,初始化成員,設定四個回撥函式,Read,Write,Close,ErrorCallback tcpconnection::tcpconnection(eventloop* loop,const string& name,int sockfd, const inetaddress& localAddr,const inetaddress& peerAddr) :m_loop(loop),m_name(name),m_state(kConnecting),m_reading(true), m_socket(new Socket(sockfd)), m_channel(new channel(loop,sockfd)), m_localAddr(localAddr), m_peerAddr(peerAddr) { //設定channel::handleEvent()時的四個回撥函式,讀/寫/關閉/錯誤 m_channel->setReadCallback(std::bind(&tcpconnection::handleRead,this,_1)); m_channel->setWriteCallback(std::bind(&tcpconnection::handleWrite,this)); m_channel->setCloseCallback(std::bind(&tcpconnection::handleClose,this)); m_channel->setErrorCallback(std::bind(&tcpconnection::handleError,this)); LOG_DEBUG << "TcpConnection::ctor[" << m_name << "] at " << this << " fd=" << sockfd; m_socket->setKeepAlive(true);//設定是否開啟keepalive判斷通訊方是否存活 } //解構函式,列印一下日誌啥也不幹 tcpconnection::~tcpconnection() { LOG_DEBUG << "TcpConnection::dtor[" << m_name << "] at " << this << " fd=" << m_channel->fd() << " state=" << stateToString(); assert(m_state == kDisconnected); } //內部利用Socket::getTcpInfo()獲取tcp資訊 bool tcpconnection::getTcpInfo(struct tcp_info* tcpi) const { return m_socket->getTcpInfo(tcpi); } //內部利用Socket::getTcpInfoString()格式化tcp資訊 string tcpconnection::getTcpInfoString() const { char buf[102]={0}; buf[0]='\0'; m_socket->getTcpInfoString(buf,sizeof buf); return buf; } //傳送資料,內部讓eventloop回撥tcpconnection::sendInLoop()來實現傳送資料 void tcpconnection::send(const string& message) { if(m_state==kConnected) { //讓eventloop回撥tcpconnection::sendInLoop完成傳送資料操作 if(m_loop->isInLoopThread()) sendInLoop(message); else { //函式指標fp指向sendInLoop; void (tcpconnection::*fp)(const string& message) = &tcpconnection::sendInLoop; m_loop->runInLoop(std::bind(fp,this,message.data())); } } } void tcpconnection::send(const void* message,int len) { send(string(static_cast<const char*>(message),len)); } void tcpconnection::send(buffer* buf) { if(m_state==kConnected) { if(m_loop->isInLoopThread()) { // sendInLoop(buf->peek(),buf->readableBytes()); buf->retrieveAll(); } else { void (tcpconnection::*fp)(const string& message)=&tcpconnection::sendInLoop; m_loop->runInLoop( std::bind(fp,this,buf->retrieveAllAsString())); } } } //關閉tcp連線,並讓eventloop回撥&TcpConnection::shutdownInLoop() void tcpconnection::shutdown() { //只有已經建立了連線才可以關閉連線 if(m_state==kConnected) { setState(kDisconnected); //讓eventloop回撥tcpconnection::shutdownInLoop m_loop->runInLoop(std::bind(&tcpconnection::shutdownInLoop,this)); } } //讓eventloop強制關閉tcp連線,利用eventloop的定時器佇列實現(喚醒poller::poll()來關閉),eventloop //回撥tcpconnection::forceCloseInLoop() void tcpconnection::forceClose() { if(m_state==kConnected || m_state==kDisconnecting) { setState(kDisconnecting); //讓eventloop回撥tcpconnection::forceCloseInLoop m_loop->queueInLoop(std::bind(&tcpconnection::forceCloseInLoop,shared_from_this())); } } //強制在seconds秒後關閉tcp連線 void tcpconnection::forceCloseWithDelay(double seconds) { if(m_state==kConnected || m_state==kDisconnecting) { setState(kDisconnecting); m_loop->runAfter(seconds, makeWeakCallback(shared_from_this(),&tcpconnection::forceClose)); } } //sockets::setTcpNoDelay() void tcpconnection::setTcpNoDelay(bool on) { m_socket->setTcpNoDelay(on); } //eventloop回撥tcpconenction::startReadInLoop() void tcpconnection::startRead() { m_loop->runInLoop(std::bind(&tcpconnection::startReadInLoop,this)); } //eventloop回撥tcpconnection::stopReadInLoop() void tcpconnection::stopRead() { m_loop->runInLoop(std::bind(&tcpconnection::stopReadInLoop,this)); } //僅作為內部使用 void tcpconnection::connectEstablished() { m_loop->assertInLoopThread(); assert(m_state==kConnecting); setState(kConnected); m_channel->tie(shared_from_this()); m_channel->enableReading(); m_connectionCallback(shared_from_this()); } void tcpconnection::connectDestroyed() { m_loop->assertInLoopThread(); if(m_state==kConnected) { setState(kDisconnected); m_channel->disableAll(); m_connectionCallback(shared_from_this()); } m_channel->remove(); } //處理讀網路事件,供channel回撥,把資料讀到 m_inputbuffer中 void tcpconnection::handleRead(timestamp receiveTime) { m_loop->assertInLoopThread(); int saveErrno=0; //呼叫sockets::readv()把資料讀到m_inputbuffer中 ssize_t n=m_inputbuffer.readFd(m_channel->fd(),&saveErrno); //讀到了資料呼叫m_messageCallback()回撥函式 if(n>0) m_messageCallback(shared_from_this(),&m_inputbuffer,receiveTime); else if(n==0) handleClose(); else { errno=saveErrno; LOG_SYSERR<<"TcpConnection::handleRead"; handleError(); } } //處理寫網路事件,把m_outputbuffer中的資料寫到m_channel->fd()上,即傳送資料 void tcpconnection::handleWrite() { m_loop->assertInLoopThread(); if(m_channel->isWriting()) { ssize_t n=sockets::write(m_channel->fd(),m_outputbuffer.peek(), m_outputbuffer.readableBytes()); if(n>0) //成功寫入的位元組數 { m_outputbuffer.retrieve(n);//移動寫緩衝區的m_writerIndex //寫緩衝區中所有資料都發完了,呼叫寫事件完成回撥函式 if(m_outputbuffer.readableBytes()==0) m_loop->queueInLoop(std::bind(m_writeCompleteCallback,shared_from_this())); if(m_state==kDisconnecting) shutdownInLoop(); } else { LOG_SYSERR<<"TcpConnection::handleWrite"; } }else { LOG_TRACE<<"Connection fd = "<<m_channel->fd()<<" is down, no more writing"; } } //處理關閉連線的網路事件,讓channel不再關心任何網路事件. //主動呼叫tcpconnection::forceClose()或者遠方斷開連線都會觸發這個回撥 void tcpconnection::handleClose() { m_loop->assertInLoopThread(); LOG_TRACE << "fd = " << m_channel->fd() << " state = " << stateToString(); assert(m_state==kConnected || m_state==kDisconnecting); setState(kDisconnected); m_channel->disableAll(); //取關所有網路事件 TcpConnectionPtr guardThis(shared_from_this()); m_connectionCallback(guardThis); m_closeCallback(guardThis); } //處理錯誤事件,利用sockets::getSocketError()來實現 void tcpconnection::handleError() { int err = sockets::getSocketError(m_channel->fd()); LOG_ERROR << "TcpConnection::handleError [" << m_name << "] - SO_ERROR = " << err << " " << strerror_tl(err); } void tcpconnection::sendInLoop(const string& message) { sendInLoop(message.data(),message.size()); } //在eventloop中回撥這個sendInLoop(),用於傳送message資料 //內部通過sockets::write()實現傳送資料,先發送一次,如果沒全部發送完,把資料先 //放到outputbuffer中,讓channel註冊寫網路事件,等到核心可寫時,socket就會出發可寫 //網路事件,channel就回調handleWrite(),把outputbuffer中的資料傳送出去 void tcpconnection::sendInLoop(const void* data,size_t len) { m_loop->assertInLoopThread(); ssize_t nwrote=0; //已經發了多少位元組 size_t remaining =len; //還剩下多少位元組沒傳送 bool faultError=false; //錯誤 if(m_state==kDisconnected) //如果斷開了連線 { LOG_WARN<<"disconnected,give up writing"; return; } //if no thing in output queue,try writing directly if(!m_channel->isWriting() && m_outputbuffer.readableBytes()==0) { nwrote=sockets::write(m_channel->fd(),data,len); if(nwrote>=0) { //成功發了nwrote位元組 remaining=len-nwrote; //如果全部發完了,讓eventloop回撥寫成功m_writeCompleteCallback函式 if(remaining==0 && m_writeCompleteCallback) m_loop->queueInLoop(std::bind(m_writeCompleteCallback,shared_from_this())); } else { //寫失敗 nwrote=0; if(errno!=EWOULDBLOCK) { LOG_SYSERR<<"TcpConnection::sendInLoop"; if(errno==EPIPE || errno ==ECONNRESET) faultError=true; } } } assert(remaining<=len); if(!faultError && remaining>0) { //還剩下多少位元組沒寫出去 size_t oldLen=m_outputbuffer.readableBytes(); //發生特定條件,讓eventloop回撥m_highWaterMarkCallback if(oldLen+remaining>=m_highWaterMark && oldLen<m_highWaterMark && m_highWaterMarkCallback) { m_loop->runInLoop(std::bind(m_highWaterMarkCallback,shared_from_this(), oldLen+remaining)); } //讓 寫緩衝區 追加 沒寫完的資料,並設定 m_channel註冊寫網路事件,用於下次繼續寫 m_outputbuffer.append(static_cast<const char*>(data)+nwrote,remaining); if(!m_channel->isWriting()) m_channel->enableWriting(); } } //關閉tcpconenction,這個函式由eventloop呼叫 void tcpconnection::shutdownInLoop() { //保證eventloop在其所在的執行緒而非當前執行緒呼叫該函式 m_loop->assertInLoopThread(); //關閉套接字寫操作 if(!m_channel->isWriting()) m_socket->shutdownWrite(); } //供eventloop回撥使用,用來強制關閉tcp連線 void tcpconnection::forceCloseInLoop() { m_loop->assertInLoopThread(); if(m_state==kConnected || m_state == kDisconnected) handleClose(); } //把tcp連線狀態轉換成字串形式 const char* tcpconnection::stateToString() const { switch (m_state) { case kDisconnected: return "kDisconnected"; case kConnecting: return "kConnecting"; case kConnected: return "kConnected"; case kDisconnecting: return "kDisconnecting"; default: return "unknown state"; } } //讓channel註冊讀網路事件 void tcpconnection::startReadInLoop() { m_loop->assertInLoopThread(); if(!m_reading || !m_channel->isReading()) { m_channel->enableReading(); //讓m_channel註冊讀網路事件 m_reading=true; } } //讓channel不再關心讀網路事件 void tcpconnection::stopReadInLoop() { m_loop->assertInLoopThread(); if(m_reading || m_channel->isReading()) { m_channel->disableReading(); //讓m_channel取關讀網路事件 m_reading=false; } } } }
測試:
tcpconnection是為tcpserver服務的,我們待會測試。