1. 程式人生 > >SRS原始碼分析-讀寫類SrsStSocket

SRS原始碼分析-讀寫類SrsStSocket

關鍵類SrsStSocket封裝了socket的讀寫操作,負責將資料傳送給對端(write/writev)以及讀取對端傳送過來的資料(read/read_fully)。

繼承關係為:

SRS讀寫類繼承關係

相關原始碼如下:

/**
 * TCP socket reader/writer that uses coroutines (an st network fd).
 * It keeps a receive timeout, a send timeout and a byte counter for each
 * direction.
 */
class SrsStSocket : public ISrsProtocolReaderWriter
{
private:
    /// Receive timeout in ms. Use SRS_CONSTS_NO_TMMS for never timeout.
    int64_t rtm;
    /// Send timeout in ms. Use SRS_CONSTS_NO_TMMS for never timeout.
    int64_t stm;
    /// Received data, in bytes.
    int64_t rbytes;
    /// Sent data, in bytes.
    int64_t sbytes;
    /// The underlying st fd.
    srs_netfd_t stfd;
public:
    SrsStSocket();
    virtual ~SrsStSocket();

    /// Initialize the socket with stfd; the user must manage the fd.
    virtual srs_error_t initialize(srs_netfd_t fd);

    /// Whether the timeout value tm means "never timeout".
    virtual bool is_never_timeout(int64_t tm);
    /// Set the receive timeout, in ms.
    virtual void set_recv_timeout(int64_t tm);
    /// Get the receive timeout, in ms.
    virtual int64_t get_recv_timeout();
    /// Set the send timeout, in ms.
    virtual void set_send_timeout(int64_t tm);
    /// Get the send timeout, in ms.
    virtual int64_t get_send_timeout();
    /// Get the received data size, in bytes.
    virtual int64_t get_recv_bytes();
    /// Get the sent data size, in bytes.
    virtual int64_t get_send_bytes();

    /**
     * Read up to size bytes into buf.
     * @param nread, the actual read bytes, ignore if NULL.
     */
    virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
    /**
     * Read exactly size bytes into buf.
     * @param nread, the actual read bytes, ignore if NULL.
     */
    virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
    /**
     * Write size bytes from buf.
     * @param nwrite, the actual write bytes, ignore if NULL.
     */
    virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
    /**
     * Write the iov_size buffers described by iov.
     * @param nwrite, the actual write bytes, ignore if NULL.
     */
    virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};


// Constructor: no fd attached yet, both timeouts set to "never timeout",
// and both byte counters start at zero.
SrsStSocket::SrsStSocket()
    : rtm(SRS_CONSTS_NO_TMMS),
      stm(SRS_CONSTS_NO_TMMS),
      rbytes(0),
      sbytes(0),
      stfd(NULL)
{
}

// Destructor: intentionally empty. It does nothing with stfd; per the
// initialize() contract the user must manage the fd.
SrsStSocket::~SrsStSocket()
{
    // Nothing to release here.
}

// Initialize the socket with the given srs fd.
// Only stores the fd for later read/write calls; always returns srs_success.
srs_error_t SrsStSocket::initialize(srs_netfd_t fd)
{
    this->stfd = fd;

    return srs_success;
}

// Whether tm is the "never timeout" value (SRS_CONSTS_NO_TMMS).
bool SrsStSocket::is_never_timeout(int64_t tm)
{
    const bool never = (SRS_CONSTS_NO_TMMS == tm);
    return never;
}

//設定接收訊息的超時時間
void SrsStSocket::set_recv_timeout(int64_t tm)
{
    rtm = tm;
}

//獲取接收訊息的超時時間
int64_t SrsStSocket::get_recv_timeout()
{
    return rtm;
}

//設定傳送訊息的超時時間
void SrsStSocket::set_send_timeout(int64_t tm)
{
    stm = tm;
}

//獲取傳送訊息的超時時間
int64_t SrsStSocket::get_send_timeout()
{
    return stm;
}

//獲取接收到的訊息的位元組數
int64_t SrsStSocket::get_recv_bytes()
{
    return rbytes;
}

//獲取傳送的訊息的位元組數
int64_t SrsStSocket::get_send_bytes()
{
    return sbytes;
}

// Read: read up to size bytes from stfd into buf.
// @param nread, receives the raw st_read() result (also on failure), ignore if NULL.
// @return srs_success when at least one byte was read; ERROR_SOCKET_TIMEOUT
//         when st_read() failed with errno ETIME; ERROR_SOCKET_READ otherwise.
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{
    st_netfd_t fd = (st_netfd_t)stfd;

    // rtm is in ms and is scaled by 1000 for st; the "never timeout" value
    // maps to ST_UTIME_NO_TIMEOUT instead.
    ssize_t got = (rtm == SRS_CONSTS_NO_TMMS)
        ? st_read(fd, buf, size, ST_UTIME_NO_TIMEOUT)
        : st_read(fd, buf, size, rtm * 1000);

    if (nread) {
        *nread = got;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value of 0 means the network connection is closed or end of file is reached).
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    //   got > 0:  got bytes were read.
    //   got == 0: the peer closed the connection.
    //   got < 0:  error; it is a timeout only when errno is ETIME.
    if (got > 0) {
        rbytes += got; // account for the bytes just received
        return srs_success;
    }

    // @see https://github.com/ossrs/srs/issues/200
    if (got < 0 && errno == ETIME) {
        return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
    }

    // A closed connection is reported as a read error with errno ECONNRESET.
    if (got == 0) {
        errno = ECONNRESET;
    }

    return srs_error_new(ERROR_SOCKET_READ, "read");
}

// Read exactly size bytes from stfd into buf; anything less is a failure.
// @param nread, receives the raw st_read_fully() result (also on failure), ignore if NULL.
// @return srs_success when exactly size bytes were read; ERROR_SOCKET_TIMEOUT
//         when st_read_fully() failed with errno ETIME; ERROR_SOCKET_READ_FULLY otherwise.
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{
    st_netfd_t fd = (st_netfd_t)stfd;

    // rtm is in ms and is scaled by 1000 for st; the "never timeout" value
    // maps to ST_UTIME_NO_TIMEOUT instead.
    ssize_t got = (rtm == SRS_CONSTS_NO_TMMS)
        ? st_read_fully(fd, buf, size, ST_UTIME_NO_TIMEOUT)
        : st_read_fully(fd, buf, size, rtm * 1000);

    if (nread) {
        *nread = got;
    }

    // On success a non-negative integer indicating the number of bytes actually read is returned
    // (a value less than nbyte means the network connection is closed or end of file is reached)
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (got == (ssize_t)size) {
        rbytes += got; // only a complete read is counted
        return srs_success;
    }

    // @see https://github.com/ossrs/srs/issues/200
    if (got < 0 && errno == ETIME) {
        return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm);
    }

    // A short (non-negative) read is reported with errno ECONNRESET.
    if (got >= 0) {
        errno = ECONNRESET;
    }

    return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");
}

// Write data to the fd: buf holds the data, size is the number of bytes to
// write, and nwrite receives the number of bytes actually written.
// @param nwrite, receives the raw st_write() result (also on failure), ignore if NULL.
// @return srs_success when st_write() returned a positive count;
//         ERROR_SOCKET_TIMEOUT when it failed with errno ETIME;
//         ERROR_SOCKET_WRITE otherwise.
srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;
    
    // stm is in ms and is scaled by 1000 for st; the "never timeout" value
    // maps to ST_UTIME_NO_TIMEOUT instead.
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
    } else {
        nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
    }
    
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t while %d expects int; cast explicitly (as read()
            // does for rtm) so the variadic argument matches the format.
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", (int)stm);
        }
        
        return srs_error_new(ERROR_SOCKET_WRITE, "write");
    }
    
    sbytes += nb_write; // account for the bytes just sent
    
    return err;
}

// writev: write multiple buffers in one call.
// @param iov, the array of buffers to write.
// @param iov_size, the number of entries in iov.
// @param nwrite, receives the raw st_writev() result (also on failure), ignore if NULL.
// @return srs_success when st_writev() returned a positive count;
//         ERROR_SOCKET_TIMEOUT when it failed with errno ETIME;
//         ERROR_SOCKET_WRITE otherwise.
srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
    srs_error_t err = srs_success;
    
    // stm is in ms and is scaled by 1000 for st; the "never timeout" value
    // maps to ST_UTIME_NO_TIMEOUT instead.
    ssize_t nb_write;
    if (stm == SRS_CONSTS_NO_TMMS) {
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
    } else {
        nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
    }
    
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            // stm is int64_t while %d expects int; cast explicitly (as read()
            // does for rtm) so the variadic argument matches the format.
            return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", (int)stm);
        }
        
        return srs_error_new(ERROR_SOCKET_WRITE, "writev");
    }
    
    sbytes += nb_write; // account for the bytes just sent
    
    return err;
}