SRS原始碼分析-讀寫類SrsStSocket
阿新 • • 發佈:2019-02-10
關鍵類SrsStSocket封裝了socket的讀寫操作,負責將資料傳送給對端(write/writev)以及讀取對端傳送過來的資料(read/read_fully)。
繼承關係為:SrsStSocket 公有繼承自 ISrsProtocolReaderWriter。
相關原始碼如下:
//使用協程的TCP class SrsStSocket : public ISrsProtocolReaderWriter { private: // The recv/send timeout in ms. // @remark Use SRS_CONSTS_NO_TMMS for never timeout in ms. int64_t rtm; //recv timeout int64_t stm; //send timeout // The recv/send data in bytes int64_t rbytes; //recv data bytes int64_t sbytes; //send data bytes // The underlayer st fd. srs_netfd_t stfd; //st fd public: SrsStSocket(); virtual ~SrsStSocket(); public: // Initialize the socket with stfd, user must manage it. virtual srs_error_t initialize(srs_netfd_t fd); //初始化 public: virtual bool is_never_timeout(int64_t tm); virtual void set_recv_timeout(int64_t tm); virtual int64_t get_recv_timeout(); virtual void set_send_timeout(int64_t tm); virtual int64_t get_send_timeout(); virtual int64_t get_recv_bytes(); virtual int64_t get_send_bytes(); public: /** * @param nread, the actual read bytes, ignore if NULL. */ virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread); /** * @param nwrite, the actual write bytes, ignore if NULL. 
*/ virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite); }; //建構函式 SrsStSocket::SrsStSocket() { stfd = NULL; stm = rtm = SRS_CONSTS_NO_TMMS; rbytes = sbytes = 0; } //解構函式 SrsStSocket::~SrsStSocket() { } //初始化,srs的fd srs_error_t SrsStSocket::initialize(srs_netfd_t fd) { stfd = fd; return srs_success; } //是否永不超時 bool SrsStSocket::is_never_timeout(int64_t tm) { return tm == SRS_CONSTS_NO_TMMS; } //設定接收訊息的超時時間 void SrsStSocket::set_recv_timeout(int64_t tm) { rtm = tm; } //獲取接收訊息的超時時間 int64_t SrsStSocket::get_recv_timeout() { return rtm; } //設定傳送訊息的超時時間 void SrsStSocket::set_send_timeout(int64_t tm) { stm = tm; } //獲取傳送訊息的超時時間 int64_t SrsStSocket::get_send_timeout() { return stm; } //獲取接收到的訊息的位元組數 int64_t SrsStSocket::get_recv_bytes() { return rbytes; } //獲取傳送的訊息的位元組數 int64_t SrsStSocket::get_send_bytes() { return sbytes; } //讀:從stfd中讀取size bytes資料到buf,讀取到的位元組數為nread srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; ssize_t nb_read; //讀取的位元組數 //從stfd中讀取size個位元組的訊息到buf中 if (rtm == SRS_CONSTS_NO_TMMS) { nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000); } if (nread) { *nread = nb_read; } // On success a non-negative integer indicating the number of bytes actually read is returned // (a value of 0 means the network connection is closed or end of file is reached). // Otherwise, a value of -1 is returned and errno is set to indicate the error. 
//nb_read < 0:超時 //nb_read = 0:對端已經關閉 //nb_read > 0:得到讀取的位元組數nb_read if (nb_read <= 0) { // @see https://github.com/ossrs/srs/issues/200 if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm); } if (nb_read == 0) { errno = ECONNRESET; } return srs_error_new(ERROR_SOCKET_READ, "read"); } rbytes += nb_read; //讀取到的位元組數增加nb_read return err; } //讀取size,必須讀完 srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; ssize_t nb_read; if (rtm == SRS_CONSTS_NO_TMMS) { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000); } if (nread) { *nread = nb_read; } // On success a non-negative integer indicating the number of bytes actually read is returned // (a value less than nbyte means the network connection is closed or end of file is reached) // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_read != (ssize_t)size) { // @see https://github.com/ossrs/srs/issues/200 if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", (int)rtm); } if (nb_read >= 0) { errno = ECONNRESET; } return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); } rbytes += nb_read; return err; } //向fd寫入資料,buf為存放資料的陣列,size為寫入的位元組數,nwrite為實際寫入的位元組數 //在實際st庫中,呼叫write/writev寫入資料,提高寫入的效率 srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; ssize_t nb_write; if (stm == SRS_CONSTS_NO_TMMS) { nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000); } if (nwrite) { *nwrite = nb_write; } // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. 
if (nb_write <= 0) { // @see https://github.com/ossrs/srs/issues/200 if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", stm); } return srs_error_new(ERROR_SOCKET_WRITE, "write"); } sbytes += nb_write; return err; } //writev,一次寫入多個buffer srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; ssize_t nb_write; if (stm == SRS_CONSTS_NO_TMMS) { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000); } if (nwrite) { *nwrite = nb_write; } // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { // @see https://github.com/ossrs/srs/issues/200 if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", stm); } return srs_error_new(ERROR_SOCKET_WRITE, "writev"); } sbytes += nb_write; return err; }