1. 程式人生 > >muduo_net程式碼剖析之Buffer類的設計

muduo_net程式碼剖析之Buffer類的設計

一、備用知識

1、為什麼TcpConnection必須要有output buffer

在這裡插入圖片描述
考慮一個常見場景:程式想通過TCP連線相對方傳送100K位元組的資料,但是write()呼叫中,作業系統只接收了80K位元組(受TCP advertised window的控制,細節見TCPv1),你肯定不想在原地等待,因為不知道會等多久(取決於對方什麼時候能夠接收資料,然後華東TCP視窗)。程式應該儘快交出控制權,返回eventloop。在這種情況下,剩餘的20K位元組資料怎麼處理?

對於應用程式而言,它只管生成資料,它不應該關心到底資料是一次性發送還是分成幾次傳送,這些應該由網路庫來操心,程式只要呼叫TcpConnection::send()就行了,網路庫會負責到底。①網路庫應該接管這剩餘的20K位元組資料,把它儲存在該Tcp connection的output buffer中

,然後註冊POLLOUT事件,一旦socket變得可寫,就立即將20K資料的一部分寫入socket。②當然,這第二次write()也不一定能將20K位元組全部寫入到socket,如果還有剩餘,網路庫應該繼續關注POLLOUT事件;③如果寫完了20K位元組,網路庫應該停止關注POLLOUT事件,以免造成busy loop。(Muduo EventLoop採用的就是epoll level trigger,這麼做的具體原因,我以後再說。)

如果程式又寫入了50K位元組,而這時候output buffer裡還有待發送的20K資料,那麼網路庫不應該直接呼叫write(),而應該把這50K資料append在那20K資料之後,等socket變得可寫的時候,再一併寫入。

如果output buffer裡還有待發送的資料,而程式又想關閉連線(對於程式而言,呼叫TcpConnection::send()之後,就認為資料遲早會發送出去),那麼這時候網路庫不能立即關閉連線,而要等待資料傳送完畢,(參考“為什麼muduo的shutdown()沒有直接關閉TCP連線”)。

綜上,要讓程式在write操作上不阻塞,網路庫必須要給每個tcp connection配置output buffer。

2、為什麼TcpConnection必須要有input buffer

在這裡插入圖片描述
TCP是一個無邊界的位元組流協議,接收方必須要處理“收到的資料尚不構成一條完整的訊息”和“一次收到兩條訊息的資料”等等粘包情況。

網路庫在處理“socket可讀”事件的時候,必須一次性把socket裡的資料讀完(即將資料從作業系統buffer搬到應用層buffer),否則會反覆觸發POLLIN事件,造成busy-loop。(再說一遍:Muduo EventLoop採用的就是epoll level trigger,這麼做的具體原因,我以後再說。)

那麼網路庫必然要應對“資料不完整”的情況,收到的資料先放到input buffer裡,等構成一條完整的訊息再通知程式的業務邏輯。這通常是code的職責,間第2.3節“Boost.Asio聊天伺服器”一文中“TCP分包”的論述和程式碼。

所以,在tcp網路程式設計中,網路庫必須要給每一個tcp connection配置input buffer。

當input buffer中有資料時,就會通知上層應用程式,onMessage(Buffer*)回撥,根據上層應用層協議判定是否是一個完整的包,即:如果不是一條完整的訊息,就不會取走資料,也不會進行相應的處理;如果是一條完整的訊息,將取走訊息並進行相應的處理。

epoll使用level trigger(LT模式)的原因

① 與poll相容
② LT模式不會發生漏掉事件的BUG,但POLLOUT事件不能一開始就關注,否則會出現busy loop,而應該在write無法完全寫入核心緩衝區的時候才關注,將未寫入核心緩衝區的資料新增到應用層output buffer,直到應用層output buffer寫完,寫完後停止關注POLLOUT事件
③ 讀寫的時候不必等候EAGIN,可以節約系統呼叫的次數,降低延遲。(注:如果用ET模式,讀的時候讀到EAGAIN,寫的時候直到output buffer寫完或者EAGAIN)


二、應用層緩衝區Buffer

1、Buffer的要求

muduo buffer的設計考慮了常見的網路程式設計需求,我試圖在易用性和效能之間找一個平衡點,目前這個平衡點更偏向於易用性。

muduo buffer的設計要點:
(1) 對外表現為一塊連續的記憶體(char* ,len),以方便客戶程式碼的編寫
(2) 其size()可以自動增長,以適應不同大小的訊息。他不是一個fixed size array(即char buf[8192])
(3) 內部以vector of char 來儲存資料,並提供相應的訪問函式

Buffer其實像一個queue,從末尾寫入資料,從頭部讀出資料。即內部維護了下標,分別是:讀入位置、寫入位置。

TcpConnection會有兩個Buffer成員, Buffer inputBuffer_ 、Buffer outputBuffer_;

  • input buffer:TcpConnection會從socket讀取資料,然後寫入input buffer(其實這一步是用Buffer::readFd()完成的);客戶程式碼從input buffer讀取資料
  • output buffer:客戶端程式碼會把資料寫入output buffer(其實這一步是用TcpConnection::send()完成的);TcpConnection從output buffer讀取資料並寫入到socket。

其實,input、output是針對於客戶端程式碼而言,客戶程式碼從input讀,往output寫。TcpConnection的讀寫正好相反。

2、Buffer類的成員

Buffer的作用就是暫時儲存資料。當向Buffer寫入資料後,Buffer可寫入空間writeable減小,可讀空間readable增大;取走資料後變化相反。
muduo中的Buffer為封裝了的vector。vector為一塊連續空間,且其本身具有自動增長的性質,它的迭代器為原始指標,使用起來較為方便。

①Buffer類只有3個成員變數,分別是buffer_、readerIndex_、writerIndex_

std::vector<char> buffer_; //vector用來替代固定大小的數字
size_t readerIndex_; //讀位置
size_t writerIndex_; //寫位置

kCRLF是一個柔性陣列,存放“\r\n”,可以用來搜尋緩衝區資料中的”\r\n“

②其中,readerIndex_、writerIndex_將整個buffer_分成了3個區域,分別是prependable、writable、readable ,Buffer的結構如下圖:
在這裡插入圖片描述
readeIndex_是可讀部分的首位置,writerIndex_是可寫部分的首位置。兩個中間就是實際可讀(readable)的元素。前部預留的Prepend部分可以用來向頭部新增內容而不需要記憶體挪動的消耗。下面三個函式能獲得這3個區域的大小:

size_t readableBytes() const //可讀空間,裡面放有快取的資料
{ return writerIndex_ - readerIndex_; }
size_t writableBytes() const //可寫空間大小
{ return buffer_.size() - writerIndex_; }
size_t prependableBytes() const //預留空間大小
{ return readerIndex_; }

3、與3個區域相關的函式

① 建構函式

  1. kCheapPrepend、kInitialSize分別表示prependable、writable的大小
  2. 初始時,將readIndex、writeIndex設為kCheapPrepend(8)位置
    在這裡插入圖片描述
static const size_t kCheapPrepend = 8; //prependable的大小
static const size_t kInitialSize = 1024; //writable的大小

explicit Buffer(size_t initialSize = kInitialSize)
  : buffer_(kCheapPrepend + initialSize), // 8+1024
    readerIndex_(kCheapPrepend), //8
    writerIndex_(kCheapPrepend)  //8
{
  assert(readableBytes() == 0);
  assert(writableBytes() == initialSize);
  assert(prependableBytes() == kCheapPrepend);
}

② 獲取readIndex、writerIndex位置

//圖中的readIndex位置
const char* peek() const 
{ return begin() + readerIndex_; }

//圖中writerIndex位置
char* beginWrite()
{ return begin() + writerIndex_; }
const char* beginWrite() const
{ return begin() + writerIndex_; }

4、Buffer的操作:讀、寫

  1. 基本的read-write操作
    ①當有足夠的空間讀寫時,每次read走、write入資料後,都要將readIndex、writeIndex指標下標後移
    ②當將資料全部read空時,readIndex、writeIndex指標下標設定為初始位置,即kCheapPrepend(8)
    在這裡插入圖片描述

  2. 內部騰挪
    Buffer沒有封裝為一個環,所以當Buffer使用一段時間後,其真正的可寫入空間不一定是size() - writeIndex,因為readIndex和prependable之間可能還有空間。因此當可寫如空間不夠時,有時可以通過把CONTENT向“前移”來達到目的。

void shrink(size_t reserve)
{
  Buffer other; //生成臨時物件other(大小為readableBytes()+reserve),交換之後,臨時物件析構掉
  
//兩種情況
  //1.空間不夠resize空間
  //2.空間足夠內部騰挪
  other.ensureWritableBytes(readableBytes()+reserve); 

  //將當前空間中readable中的資料全部新增到other空間中,然後再交換swap
  other.append(toStringPiece()); 
  swap(other);
}

在這裡插入圖片描述
3. 擴增可寫空間writeable
當writeable不能夠容納足夠多的資料時,要進行動態的緩衝區擴充,即:①重新申請一塊更大的緩衝區 ②舊空間上的資料拷貝到新緩衝區上 ③釋放舊空間
在這裡插入圖片描述

4. 一個小小的創新:prepend空間
在這裡插入圖片描述

4、讀、寫Buffer

[1] 從readable區域中,讀走資料

//讀取len個位元組的資料
void retrieve(size_t len)
{
  assert(len <= readableBytes());
  if (len < readableBytes())//if 要讀位元組數 < 可讀位元組數
  {
    readerIndex_ += len; //只是readerIndex_後移len
  }
  else //if 要讀位元組數 > 可讀位元組數,即將資料全部讀走
  {
    retrieveAll(); //readerIndex_、writerIndex_位置重置為初始位置(8)
  }
}
void retrieveAll() 
{
  readerIndex_ = kCheapPrepend;
  writerIndex_ = kCheapPrepend;
}
//取出[readerIndex_,end]之間的資料,end為手動指定的
void retrieveUntil(const char* end);

//返回值:void
void retrieveInt64();
void retrieveInt32();
void retrieveInt16();
void retrieveInt8();
//返回值:intXX_t,即讀到的值
int64_t readInt64();
int32_t readInt32();
int16_t readInt16();
int8_t readInt8();

//取出readable中len位元組的資料,並轉換成string型別,再返回
string retrieveAsString(size_t len)//取出readable中所有的資料,並轉換成string型別,再返回
string retrieveAllAsString(); 

//取出readable中所有的資料,並轉換成StringPiece型別,再返回
StringPiece toStringPiece() const

[2] 將資料寫入到writable區域中
append():把資料追加到writeable區域中,如果空間不夠,會進行makespace

void append(const char* data, size_t len)
{
  ensureWritableBytes(len); //確保有可寫空間,空間不夠,就自動分配
  std::copy(data, data+len, beginWrite());
  hasWritten(len);
}
void ensureWritableBytes(size_t len) 
{
  if (writableBytes() < len) //如果可寫空間不夠
  {
    makeSpace(len); //resize或移動資料,使Buffer能容下len大資料
  }
  assert(writableBytes() >= len);
}
void makeSpace(size_t len) //resize或移動資料,使Buffer能容下len大資料
 {
   //確保空間是真的不夠,而不是挪動就可以騰出空間
   if (writableBytes() + prependableBytes() < len + kCheapPrepend)
   {
     // FIXME: move readable data
     buffer_.resize(writerIndex_+len);
   }
   else
   {
     //內部騰挪就足夠append,那麼就內部騰挪一下。
     // move readable data to the front, make space inside buffer
     assert(kCheapPrepend < readerIndex_);
     size_t readable = readableBytes();
     std::copy(begin()+readerIndex_,    //原來的可讀部分全部copy到Prepend位置,相當於向前挪動,為writeable留出空間
               begin()+writerIndex_,
               begin()+kCheapPrepend);
     readerIndex_ = kCheapPrepend;   //更新下標
     writerIndex_ = readerIndex_ + readable;
     assert(readable == readableBytes());
   }
 }
void append(const StringPiece& str)
{
  append(str.data(), str.size());
}
void append(const void* data, size_t len)
{
  append(static_cast<const char*>(data), len);
}

void appendInt64(int64_t x)
{
  int64_t be64 = sockets::hostToNetwork64(x);
  append(&be64, sizeof be64);
}
void appendInt32(int32_t x);
void appendInt16(int16_t x);
void appendInt8(int8_t x);

[3] 向從Prepend區域中,放入資料的函式API:

void prependInt64(int64_t x); //放入x,x大小為8位元組
void prependInt32(int32_t x);
void prependInt16(int16_t x);
void prependInt8(int8_t x);
void prepend(const void* data, size_t len);

5、核心函式 readFd

實現功能:從socket中讀取資料,存放到Buffer中的writeable中
實現細節:因為不知道一次性可以讀多少,因此先在棧上開闢了65536位元組的空間extrabuf,使用readv讀至Buffer_中。①如果Buffer中wriable足夠存放從fd讀到的資料,則讀取完畢;②否則剩餘資料先讀到extrabuf中然後以append的方式追加入Buffer_。

ssize_t Buffer::readFd(int fd, int* savedErrno)
{
  //saved an ioctl()/FIONREAD call to tell how much to read
  //節省一次ioctl系統呼叫(獲取當前有多少可讀資料)
  //為什麼這麼說?因為我們準備了足夠大的extrabuf,那麼就不需要使用ioctl取檢視fd有多少可讀位元組數了
  char extrabuf[65536];
 
  //使用iovec分配兩個連續的緩衝區
  struct iovec vec[2];
  const size_t writable = writableBytes();
//第一塊緩衝區,指向可寫空間begin()+writerIndex_
  vec[0].iov_base = begin()+writerIndex_;
  vec[0].iov_len = writable;
//第二塊緩衝區,指向棧上空間extrabuf
  vec[1].iov_base = extrabuf;
  vec[1].iov_len = sizeof extrabuf;
  
  // when there is enough space in this buffer, don't read into extrabuf.
  // when extrabuf is used, we read 128k-1 bytes at most.
  const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;  //writeable一般小於65536

  //返回值n:表示readv讀到的位元組數
  const ssize_t n = sockets::readv(fd, vec, iovcnt);  //iovcnt=2
  if (n < 0)
  {
    *savedErrno = errno;
  }
  else if (implicit_cast<size_t>(n) <= writable)//writable足夠容納
  {
    writerIndex_ += n;   //writerIndex_後移n
  }
  else //writable不足夠容納 ==> 資料被接受到了第二塊緩衝區extrabuf,將其append至buffer
  {
    writerIndex_ = buffer_.size();//更新writerIndex到buffer.size()位置
    append(extrabuf, n - writable);//將extrabuf中的資料,再追加到buffer中
  }
  return n;
}

程式碼中struct iovec vec[2]的圖示:
在這裡插入圖片描述