1. 程式人生 > 實用技巧 >muduo原始碼解析29-網路庫7:tcpconnection類

muduo原始碼解析29-網路庫7:tcpconnection類

tcpconnection類:

說明:

之前我們提到了acceptor類負責socket(),bind(),listen()和accept()一個連線。

但是accept()之後的操作,例如和這個連線套接字的讀/寫,關閉連線等操作都沒有實現。

這些操作都將由tcpconnection這個類來實現。

tcpconnection是整個網路庫的核心,封裝一次TCP連線,注意它不能發起連線
tcpserver和tcpclient都用到了tcpconnection

tcpconnection是為tcpserver服務的

tcpserver(待會講到)中會有一個acceptor負責接收新的連線,acceptor每接受一個連線時就會為這個連線建立一個tcpconnection。

acceptor在建立tcpconnection時為其為指定四個回撥函式,讀訊息,寫完成,連線建立,關閉連線。

被呼叫處:handleRead();handleWrite()/sendInLoop();connectionEstablished();handleClose();

tcpconnection內部也會有一個socket和一個對應的channel,需要在channel上註冊讀/寫/關閉/錯誤網路事件,一旦發生這寫網路事件,tcpconnection就會去執行相應的回撥函式:

讀網路事件:channel回撥tcpconnection::handleRead()把資料讀到m_inputbuffer中

寫網路事件:channel回撥tcpconnection::handleWrite()把資料通過m_outputbuffer進行處理(見程式碼)

關閉網路事件:tcpconnection::handleClose()

錯誤網路事件:tcpconnection::handleError()

一個tcpconenction有四種狀態:已連線,未連線,正在連線,正在斷開

內部有一個socket和channel,
tcpconenction構造時會設定channel的四個回撥函式,read/write/close/error
只要當socket上發生網路事件時.poller::poll()就會返回,channel會被新增到

eventloop::activeChannels,然後呼叫channel::handleEvent()根據不同的網路事件
呼叫上面四個回撥函式來處理

內部有兩塊之前的buffer,主要是為了解決高併發情況下讀寫緩衝區的問題,
inputbuffer:解決包不完整,粘包問題
先把資料全部寫到inputbuffer中,之後再進行處理,只當有一個完整的包時,才進行業務處理
outputbuffer:解決當前核心寫阻塞的問題
先把資料全部寫到outputbuffer中,每當核心可寫時,才把outputbuffer中資料寫出去

tcpconnection基本上都是通過evenntloop::runInLoop()來實現對於tcp連線的管理的,
例如send,shutdown,forceclose等操作,下面以分別說一說具體的步驟:

先看一看send()函式
tcpconnection通過呼叫send傳送資料,實際上通過eventloop::runInLoop()把傳送任務
新增到eventloop中的任務佇列中,eventloop會回撥tcpconnection::sendInLoop()完成
傳送資料的操作.sendInLoop()先嚐試write(),如果無法一次全部寫完,需要把資料快取到
outputbuffer中,讓channel註冊寫網路事件,等到核心可寫時,socket就會出發可寫
網路事件,channel就回調handleWrite(),把outputbuffer中的資料傳送出去
只要資料全部寫完就會觸發m_writeCompleteCallback寫完成回撥.

再看一看forceclose()函式:
tcpconenction的關閉有兩種情況,一種是遠方斷開連線,此時socket觸發close網路事件
channel::handleEvent()會回撥&tcpconnection::handleClose()處理
第二種情況是我自己主動呼叫forceclose()關閉連線,此時會把
tcpconnection::forceCloseInLoop新增到eventloop中的任務佇列中,
讓eventloop再去主動呼叫channel::handleClose()關閉連線
可以看出來,兩種方法最終都是讓channel執行handleClose()關閉tcp連線

再來看一看startread()函式:
startread()實質實在channel上註冊一個可讀網路事件,只要socket發生可讀網路事件
channel就會呼叫handleEvent(),然後回撥tcpconnection::handleRead()來把資料讀
到inputbuffer中,不直接使用read得到的資料而先讀到inputbuffer的好處之前說過了,防止
粘包/包不完整,在inputbuffer中的資料還會被進一步處理才能使用

tcpconnection.h

#ifndef TCPCONNECTION_H
#define TCPCONNECTION_H

#include"base/noncopyable.h"
#include"base/types.h"
#include"net/callbacks.h"
#include"net/buffer.h"
#include"net/inetaddress.h"

#include<memory>
#include<boost/any.hpp>

struct tcp_info;

namespace mymuduo{

namespace net {

class channel;
class eventloop;
class Socket;

class tcpconnection:noncopyable,public std::enable_shared_from_this<tcpconnection>
{
public:
    //建構函式,初始化成員,設定四個回撥函式,Read,Write,Close,ErrorCallback
    tcpconnection(eventloop* loop,const string& name,int sockfd,
                  const inetaddress& localAddr,const inetaddress& peerAddr);
    //解構函式,列印一下日誌啥也不幹
    ~tcpconnection();

    //獲取成員資訊:eventloop,name,server/client地址,連線是否建立
    eventloop* getLoop() const{return m_loop;}
    const string& name() const{return m_name;}
    const inetaddress& localAddress() const {return m_localAddr;}
    const inetaddress& peerAddress() const {return m_peerAddr;}
    bool connected() const{return m_state==kConnected;}
    bool disconnected() const{return m_state==kDisconnected;}

    //內部利用Socket::getTcpInfo()獲取tcp資訊
    bool getTcpInfo(struct tcp_info*) const;
    //內部利用Socket::getTcpInfoString()格式化tcp資訊
    string getTcpInfoString() const;

    void send(const string& message);
    void send(const void* message,int len);
    void send(buffer* message);
    //關閉tcp連線,並讓eventloop回撥&TcpConnection::shutdownInLoop()
    void shutdown();
    //讓eventloop強制關閉tcp連線,利用eventloop的定時器佇列實現(喚醒poller::poll()來關閉),eventloop
    //回撥tcpconnection::forceCloseInLoop()
    void forceClose();
    //強制在seconds秒後關閉tcp連線
    void forceCloseWithDelay(double seconds);
    //sockets::setTcpNoDelay()
    void setTcpNoDelay(bool on);

    //eventloop回撥tcpconenction::startReadInLoop()
    void startRead();
    //eventloop回撥tcpconnection::stopReadInLoop()
    void stopRead();
    bool isReading() const{return m_reading;}
    void setContext(const boost::any& context){m_context=context;}
    const boost::any& getContext()const{return m_context;}
    boost::any* getMutableContext(){return &m_context;}

    //設定四個回撥函式
    void setConnectionCallback(const ConnectionCallback& cb){ m_connectionCallback = cb; }

    void setMessageCallback(const MessageCallback& cb){ m_messageCallback = cb; }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    { m_writeCompleteCallback = cb; }

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
    { m_highWaterMarkCallback = cb; m_highWaterMark = highWaterMark; }

    //獲取讀/寫緩衝區
    buffer* inputBuffer(){return &m_inputbuffer;}
    buffer* outputBuffer(){return &m_outputbuffer;}

    //僅作為內部使用
    void setCloseCallback(const CloseCallback& cb){m_closeCallback=cb;}

    void connectEstablished();
    void connectDestroyed();


private:
    //tcpconnection 的四種狀態,連線未建立,正在建立,已建立,正在斷開
    enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};

    //處理讀/寫網路事件,供channel回撥
    void handleRead(timestamp receiveTime);
    void handleWrite();
    //處理關閉連線的網路事件,讓channel不再關心任何網路事件.
    void handleClose();
    //處理錯誤事件,利用sockets::getSocketError()來實現
    void handleError();

    void sendInLoop(const string& message);
    void sendInLoop(const void* message,size_t len);
    //關閉tcpconenction
    void shutdownInLoop();

    //供eventloop回撥使用,用來強制關閉tcp連線
    void forceCloseInLoop();

    void setState(StateE s){m_state=s;}
    //把tcp連線狀態轉換成字串形式
    const char* stateToString() const;

    //讓channel註冊讀網路事件
    void startReadInLoop();
    //讓channel不再關心讀網路事件
    void stopReadInLoop();


    eventloop* m_loop;                              //所在的eventloop
    const string m_name;                            //name
    StateE m_state;                                 //連線狀態,有上面四種
    bool m_reading;                                 //是否正在讀
    std::unique_ptr<Socket> m_socket;               //server Socket*
    std::unique_ptr<channel> m_channel;             //socket對應的channel
    const inetaddress m_localAddr;                  //server地址
    const inetaddress m_peerAddr;                   //client地址
    ConnectionCallback m_connectionCallback;        //連線成功的回撥函式
    MessageCallback m_messageCallback;              //讀事件成功的回撥函式
    WriteCompleteCallback m_writeCompleteCallback;  //寫事件完成的回撥函式
    HighWaterMarkCallback m_highWaterMarkCallback;          //...?
    CloseCallback m_closeCallback;                  //連線關閉時的回撥函式
    size_t m_highWaterMark;                         //...?
    buffer m_inputbuffer;                           //TCP讀緩衝區
    buffer m_outputbuffer;                          //TCP寫緩衝區
    boost::any m_context;                           //...?


};

typedef std::shared_ptr<tcpconnection> TcpConnectionPtr;


}//namespace net

}//namespace mymuduo


#endif // TCPCONNECTION_H

tcpconnection.cpp

#include "tcpconnection.h"

#include"base/logging.h"

#include"net/channel.h"
#include"base/weakcallback.h"
#include"net/eventloop.h"
#include"net/socket.h"
#include"net/socketsops.h"

#include<errno.h>

namespace mymuduo {

namespace net {

//"net/callbacks.h"中defaultConnectionCallback和defaultMessageCallback的定義

void defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
  // do not call conn->forceClose(), because some users want to register message callback only.
}

void defaultMessageCallback(const TcpConnectionPtr&,
                                        buffer* buf,
                                        timestamp)
{
  buf->retrieveAll();
}


//建構函式,初始化成員,設定四個回撥函式,Read,Write,Close,ErrorCallback
tcpconnection::tcpconnection(eventloop* loop,const string& name,int sockfd,
                             const inetaddress& localAddr,const inetaddress& peerAddr)
    :m_loop(loop),m_name(name),m_state(kConnecting),m_reading(true),
      m_socket(new Socket(sockfd)),
      m_channel(new channel(loop,sockfd)),
      m_localAddr(localAddr),
      m_peerAddr(peerAddr)
{
    //設定channel::handleEvent()時的四個回撥函式,讀/寫/關閉/錯誤
    m_channel->setReadCallback(std::bind(&tcpconnection::handleRead,this,_1));
    m_channel->setWriteCallback(std::bind(&tcpconnection::handleWrite,this));
    m_channel->setCloseCallback(std::bind(&tcpconnection::handleClose,this));
    m_channel->setErrorCallback(std::bind(&tcpconnection::handleError,this));
    LOG_DEBUG << "TcpConnection::ctor[" <<  m_name << "] at " << this
              << " fd=" << sockfd;
    m_socket->setKeepAlive(true);//設定是否開啟keepalive判斷通訊方是否存活
}
//解構函式,列印一下日誌啥也不幹
tcpconnection::~tcpconnection()
{
    LOG_DEBUG << "TcpConnection::dtor[" <<  m_name << "] at " << this
              << " fd=" << m_channel->fd()
              << " state=" << stateToString();
    assert(m_state == kDisconnected);
}

//內部利用Socket::getTcpInfo()獲取tcp資訊
bool tcpconnection::getTcpInfo(struct tcp_info* tcpi) const
{
    return m_socket->getTcpInfo(tcpi);
}

//內部利用Socket::getTcpInfoString()格式化tcp資訊
string tcpconnection::getTcpInfoString() const
{
    char buf[102]={0};
    buf[0]='\0';
    m_socket->getTcpInfoString(buf,sizeof buf);
    return buf;
}

//傳送資料,內部讓eventloop回撥tcpconnection::sendInLoop()來實現傳送資料
void tcpconnection::send(const string& message)
{
    if(m_state==kConnected)
    {
        //讓eventloop回撥tcpconnection::sendInLoop完成傳送資料操作
        if(m_loop->isInLoopThread())
            sendInLoop(message);
        else
        {
            //函式指標fp指向sendInLoop;
            void (tcpconnection::*fp)(const string& message) = &tcpconnection::sendInLoop;
            m_loop->runInLoop(std::bind(fp,this,message.data()));
        }
    }
}
void tcpconnection::send(const void* message,int len)
{
    send(string(static_cast<const char*>(message),len));
}
void tcpconnection::send(buffer* buf)
{
    if(m_state==kConnected)
    {
        if(m_loop->isInLoopThread())
        {
            //
            sendInLoop(buf->peek(),buf->readableBytes());
            buf->retrieveAll();
        }
        else
        {
            void (tcpconnection::*fp)(const string& message)=&tcpconnection::sendInLoop;
            m_loop->runInLoop(
                        std::bind(fp,this,buf->retrieveAllAsString()));
        }
    }
}

//關閉tcp連線,並讓eventloop回撥&TcpConnection::shutdownInLoop()
void tcpconnection::shutdown()
{
    //只有已經建立了連線才可以關閉連線
    if(m_state==kConnected)
    {
        setState(kDisconnected);
        //讓eventloop回撥tcpconnection::shutdownInLoop
        m_loop->runInLoop(std::bind(&tcpconnection::shutdownInLoop,this));
    }
}
//讓eventloop強制關閉tcp連線,利用eventloop的定時器佇列實現(喚醒poller::poll()來關閉),eventloop
//回撥tcpconnection::forceCloseInLoop()
void tcpconnection::forceClose()
{
    if(m_state==kConnected || m_state==kDisconnecting)
    {
        setState(kDisconnecting);
        //讓eventloop回撥tcpconnection::forceCloseInLoop
        m_loop->queueInLoop(std::bind(&tcpconnection::forceCloseInLoop,shared_from_this()));
    }
}
//強制在seconds秒後關閉tcp連線
void tcpconnection::forceCloseWithDelay(double seconds)
{
    if(m_state==kConnected || m_state==kDisconnecting)
    {
        setState(kDisconnecting);
        m_loop->runAfter(seconds,
                         makeWeakCallback(shared_from_this(),&tcpconnection::forceClose));
    }
}
//sockets::setTcpNoDelay()
void tcpconnection::setTcpNoDelay(bool on)
{
    m_socket->setTcpNoDelay(on);
}

//eventloop回撥tcpconenction::startReadInLoop()
void tcpconnection::startRead()
{
    m_loop->runInLoop(std::bind(&tcpconnection::startReadInLoop,this));
}
//eventloop回撥tcpconnection::stopReadInLoop()
void tcpconnection::stopRead()
{
    m_loop->runInLoop(std::bind(&tcpconnection::stopReadInLoop,this));
}

//僅作為內部使用

void tcpconnection::connectEstablished()
{
    m_loop->assertInLoopThread();
    assert(m_state==kConnecting);
    setState(kConnected);
    m_channel->tie(shared_from_this());
    m_channel->enableReading();

    m_connectionCallback(shared_from_this());
}
void tcpconnection::connectDestroyed()
{
    m_loop->assertInLoopThread();
    if(m_state==kConnected)
    {
        setState(kDisconnected);
        m_channel->disableAll();
        m_connectionCallback(shared_from_this());
    }
    m_channel->remove();
}

//處理讀網路事件,供channel回撥,把資料讀到 m_inputbuffer中
void tcpconnection::handleRead(timestamp receiveTime)
{
    m_loop->assertInLoopThread();
    int saveErrno=0;
    //呼叫sockets::readv()把資料讀到m_inputbuffer中
    ssize_t n=m_inputbuffer.readFd(m_channel->fd(),&saveErrno);
    //讀到了資料呼叫m_messageCallback()回撥函式
    if(n>0)
        m_messageCallback(shared_from_this(),&m_inputbuffer,receiveTime);
    else if(n==0)
        handleClose();
    else
    {
        errno=saveErrno;
        LOG_SYSERR<<"TcpConnection::handleRead";
        handleError();
    }

}
//處理寫網路事件,把m_outputbuffer中的資料寫到m_channel->fd()上,即傳送資料
void tcpconnection::handleWrite()
{
    m_loop->assertInLoopThread();
    if(m_channel->isWriting())
    {
        ssize_t n=sockets::write(m_channel->fd(),m_outputbuffer.peek(),
                                 m_outputbuffer.readableBytes());
        if(n>0) //成功寫入的位元組數
        {
            m_outputbuffer.retrieve(n);//移動寫緩衝區的m_writerIndex
            //寫緩衝區中所有資料都發完了,呼叫寫事件完成回撥函式
            if(m_outputbuffer.readableBytes()==0)
                m_loop->queueInLoop(std::bind(m_writeCompleteCallback,shared_from_this()));
            if(m_state==kDisconnecting)
                shutdownInLoop();
        }
        else
        {
            LOG_SYSERR<<"TcpConnection::handleWrite";
        }
    }else
    {
        LOG_TRACE<<"Connection fd = "<<m_channel->fd()<<" is down, no more writing";
    }
}

//處理關閉連線的網路事件,讓channel不再關心任何網路事件.
//主動呼叫tcpconnection::forceClose()或者遠方斷開連線都會觸發這個回撥
void tcpconnection::handleClose()
{
    m_loop->assertInLoopThread();
    LOG_TRACE << "fd = " << m_channel->fd() << " state = " << stateToString();
    assert(m_state==kConnected || m_state==kDisconnecting);
    setState(kDisconnected);
    m_channel->disableAll();    //取關所有網路事件
    TcpConnectionPtr guardThis(shared_from_this());
    m_connectionCallback(guardThis);
    m_closeCallback(guardThis);
}
//處理錯誤事件,利用sockets::getSocketError()來實現
void tcpconnection::handleError()
{
    int err = sockets::getSocketError(m_channel->fd());
    LOG_ERROR << "TcpConnection::handleError [" << m_name
              << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

void tcpconnection::sendInLoop(const string& message)
{
    sendInLoop(message.data(),message.size());
}

//在eventloop中回撥這個sendInLoop(),用於傳送message資料
//內部通過sockets::write()實現傳送資料,先發送一次,如果沒全部發送完,把資料先
//放到outputbuffer中,讓channel註冊寫網路事件,等到核心可寫時,socket就會出發可寫
//網路事件,channel就回調handleWrite(),把outputbuffer中的資料傳送出去
void tcpconnection::sendInLoop(const void* data,size_t len)
{
    m_loop->assertInLoopThread();
    ssize_t nwrote=0;       //已經發了多少位元組
    size_t remaining =len;  //還剩下多少位元組沒傳送
    bool faultError=false;  //錯誤
    if(m_state==kDisconnected)  //如果斷開了連線
    {
        LOG_WARN<<"disconnected,give up writing";
        return;
    }
    //if no thing in output queue,try writing directly
    if(!m_channel->isWriting() && m_outputbuffer.readableBytes()==0)
    {
        nwrote=sockets::write(m_channel->fd(),data,len);
        if(nwrote>=0)
        {
            //成功發了nwrote位元組
            remaining=len-nwrote;
            //如果全部發完了,讓eventloop回撥寫成功m_writeCompleteCallback函式
            if(remaining==0 && m_writeCompleteCallback)
                m_loop->queueInLoop(std::bind(m_writeCompleteCallback,shared_from_this()));
        }
        else
        {
            //寫失敗
            nwrote=0;
            if(errno!=EWOULDBLOCK)
            {
                LOG_SYSERR<<"TcpConnection::sendInLoop";
                if(errno==EPIPE || errno ==ECONNRESET)
                    faultError=true;
            }

        }
    }
    assert(remaining<=len);
    if(!faultError && remaining>0)
    {
        //還剩下多少位元組沒寫出去
        size_t oldLen=m_outputbuffer.readableBytes();
        //發生特定條件,讓eventloop回撥m_highWaterMarkCallback
        if(oldLen+remaining>=m_highWaterMark && oldLen<m_highWaterMark && m_highWaterMarkCallback)
        {
            m_loop->runInLoop(std::bind(m_highWaterMarkCallback,shared_from_this(),
                                        oldLen+remaining));
        }
        //讓 寫緩衝區 追加 沒寫完的資料,並設定 m_channel註冊寫網路事件,用於下次繼續寫
        m_outputbuffer.append(static_cast<const char*>(data)+nwrote,remaining);
        if(!m_channel->isWriting())
            m_channel->enableWriting();
    }

}
//關閉tcpconenction,這個函式由eventloop呼叫
void tcpconnection::shutdownInLoop()
{
    //保證eventloop在其所在的執行緒而非當前執行緒呼叫該函式
    m_loop->assertInLoopThread();
    //關閉套接字寫操作
    if(!m_channel->isWriting())
        m_socket->shutdownWrite();
}

//供eventloop回撥使用,用來強制關閉tcp連線
void tcpconnection::forceCloseInLoop()
{
    m_loop->assertInLoopThread();
    if(m_state==kConnected || m_state == kDisconnected)
        handleClose();
}

//把tcp連線狀態轉換成字串形式
const char* tcpconnection::stateToString() const
{
    switch (m_state)
    {
      case kDisconnected:
        return "kDisconnected";
      case kConnecting:
        return "kConnecting";
      case kConnected:
        return "kConnected";
      case kDisconnecting:
        return "kDisconnecting";
      default:
        return "unknown state";
    }
}

//讓channel註冊讀網路事件
void tcpconnection::startReadInLoop()
{
    m_loop->assertInLoopThread();
    if(!m_reading || !m_channel->isReading())
    {
        m_channel->enableReading(); //讓m_channel註冊讀網路事件
        m_reading=true;
    }
}
//讓channel不再關心讀網路事件
void tcpconnection::stopReadInLoop()
{
    m_loop->assertInLoopThread();
    if(m_reading || m_channel->isReading())
    {
        m_channel->disableReading();    //讓m_channel取關讀網路事件
        m_reading=false;
    }
}

}

}

測試:

tcpconnection是為tcpserver服務的,我們待會測試。