muduo_net程式碼剖析之Buffer類的設計
一、備用知識
1、為什麼TcpConnection必須要有output buffer
考慮一個常見場景:程式想通過TCP連線相對方傳送100K位元組的資料,但是write()呼叫中,作業系統只接收了80K位元組(受TCP advertised window的控制,細節見TCPv1),你肯定不想在原地等待,因為不知道會等多久(取決於對方什麼時候能夠接收資料,然後華東TCP視窗)。程式應該儘快交出控制權,返回eventloop。在這種情況下,剩餘的20K位元組資料怎麼處理?
對於應用程式而言,它只管生成資料,它不應該關心到底資料是一次性發送還是分成幾次傳送,這些應該由網路庫來操心,程式只要呼叫TcpConnection::send()就行了,網路庫會負責到底。①網路庫應該接管這剩餘的20K位元組資料,把它儲存在該Tcp connection的output buffer中
如果程式又寫入了50K位元組,而這時候output buffer裡還有待發送的20K資料,那麼網路庫不應該直接呼叫write(),而應該把這50K資料append在那20K資料之後,等socket變得可寫的時候,再一併寫入。
如果output buffer裡還有待發送的資料,而程式又想關閉連線(對於程式而言,呼叫TcpConnection::send()之後,就認為資料遲早會發送出去),那麼這時候網路庫不能立即關閉連線,而要等待資料傳送完畢,(參考“為什麼muduo的shutdown()沒有直接關閉TCP連線”)。
綜上,要讓程式在write操作上不阻塞,網路庫必須要給每個tcp connection配置output buffer。
2、為什麼TcpConnection必須要有input buffer
TCP是一個無邊界的位元組流協議,接收方必須要處理“收到的資料尚不構成一條完整的訊息”和“一次收到兩條訊息的資料”等等粘包情況。
網路庫在處理“socket可讀”事件的時候,必須一次性把socket裡的資料讀完(即將資料從作業系統buffer搬到應用層buffer),否則會反覆觸發POLLIN事件,造成busy-loop。(再說一遍:Muduo EventLoop採用的就是epoll level trigger,這麼做的具體原因,我以後再說。)
那麼網路庫必然要應對“資料不完整”的情況,收到的資料先放到input buffer裡,等構成一條完整的訊息再通知程式的業務邏輯。這通常是code的職責,間第2.3節“Boost.Asio聊天伺服器”一文中“TCP分包”的論述和程式碼。
所以,在tcp網路程式設計中,網路庫必須要給每一個tcp connection配置input buffer。
當input buffer中有資料時,就會通知上層應用程式,onMessage(Buffer*)回撥,根據上層應用層協議判定是否是一個完整的包,即:如果不是一條完整的訊息,就不會取走資料,也不會進行相應的處理;如果是一條完整的訊息,將取走訊息並進行相應的處理。
epoll使用level trigger(LT模式)的原因
① 與poll相容
② LT模式不會發生漏掉事件的BUG,但POLLOUT事件不能一開始就關注,否則會出現busy loop,而應該在write無法完全寫入核心緩衝區的時候才關注,將未寫入核心緩衝區的資料新增到應用層output buffer,直到應用層output buffer寫完,寫完後停止關注POLLOUT事件
③ 讀寫的時候不必等候EAGIN,可以節約系統呼叫的次數,降低延遲。(注:如果用ET模式,讀的時候讀到EAGAIN,寫的時候直到output buffer寫完或者EAGAIN)
二、應用層緩衝區Buffer
1、Buffer的要求
muduo buffer的設計考慮了常見的網路程式設計需求,我試圖在易用性和效能之間找一個平衡點,目前這個平衡點更偏向於易用性。
muduo buffer的設計要點:
(1) 對外表現為一塊連續的記憶體(char* ,len),以方便客戶程式碼的編寫
(2) 其size()可以自動增長,以適應不同大小的訊息。他不是一個fixed size array(即char buf[8192])
(3) 內部以vector of char 來儲存資料,並提供相應的訪問函式
Buffer其實像一個queue,從末尾寫入資料,從頭部讀出資料。即內部維護了下標,分別是:讀入位置、寫入位置。
TcpConnection會有兩個Buffer成員, Buffer inputBuffer_ 、Buffer outputBuffer_;
- input buffer:TcpConnection會從socket讀取資料,然後寫入input buffer(其實這一步是用Buffer::readFd()完成的);客戶程式碼從input buffer讀取資料
- output buffer:客戶端程式碼會把資料寫入output buffer(其實這一步是用TcpConnection::send()完成的);TcpConnection從output buffer讀取資料並寫入到socket。
其實,input、output是針對於客戶端程式碼而言,客戶程式碼從input讀,往output寫。TcpConnection的讀寫正好相反。
2、Buffer類的成員
Buffer的作用就是暫時儲存資料。當向Buffer寫入資料後,Buffer可寫入空間writeable減小,可讀空間readable增大;取走資料後變化相反。
muduo中的Buffer為封裝了的vector。vector為一塊連續空間,且其本身具有自動增長的性質,它的迭代器為原始指標,使用起來較為方便。
①Buffer類只有3個成員變數,分別是buffer_、readerIndex_、writerIndex_
std::vector<char> buffer_; //vector用來替代固定大小的數字
size_t readerIndex_; //讀位置
size_t writerIndex_; //寫位置
kCRLF是一個柔性陣列,存放“\r\n”,可以用來搜尋緩衝區資料中的”\r\n“
②其中,readerIndex_、writerIndex_將整個buffer_分成了3個區域,分別是prependable、writable、readable ,Buffer的結構如下圖:
③readeIndex_是可讀部分的首位置,writerIndex_是可寫部分的首位置。兩個中間就是實際可讀(readable)的元素。前部預留的Prepend部分可以用來向頭部新增內容而不需要記憶體挪動的消耗。下面三個函式能獲得這3個區域的大小:
size_t readableBytes() const //可讀空間,裡面放有快取的資料
{ return writerIndex_ - readerIndex_; }
size_t writableBytes() const //可寫空間大小
{ return buffer_.size() - writerIndex_; }
size_t prependableBytes() const //預留空間大小
{ return readerIndex_; }
3、與3個區域相關的函式
① 建構函式
- kCheapPrepend、kInitialSize分別表示prependable、writable的大小
- 初始時,將readIndex、writeIndex設為kCheapPrepend(8)位置
static const size_t kCheapPrepend = 8; //prependable的大小
static const size_t kInitialSize = 1024; //writable的大小
explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize), // 8+1024
readerIndex_(kCheapPrepend), //8
writerIndex_(kCheapPrepend) //8
{
assert(readableBytes() == 0);
assert(writableBytes() == initialSize);
assert(prependableBytes() == kCheapPrepend);
}
② 獲取readIndex、writerIndex位置
//圖中的readIndex位置
const char* peek() const
{ return begin() + readerIndex_; }
//圖中writerIndex位置
char* beginWrite()
{ return begin() + writerIndex_; }
const char* beginWrite() const
{ return begin() + writerIndex_; }
4、Buffer的操作:讀、寫
-
基本的read-write操作
①當有足夠的空間讀寫時,每次read走、write入資料後,都要將readIndex、writeIndex指標下標後移
②當將資料全部read空時,readIndex、writeIndex指標下標設定為初始位置,即kCheapPrepend(8)
-
內部騰挪
Buffer沒有封裝為一個環,所以當Buffer使用一段時間後,其真正的可寫入空間不一定是size() - writeIndex,因為readIndex和prependable之間可能還有空間。因此當可寫如空間不夠時,有時可以通過把CONTENT向“前移”來達到目的。
void shrink(size_t reserve)
{
Buffer other; //生成臨時物件other(大小為readableBytes()+reserve),交換之後,臨時物件析構掉
//兩種情況
//1.空間不夠resize空間
//2.空間足夠內部騰挪
other.ensureWritableBytes(readableBytes()+reserve);
//將當前空間中readable中的資料全部新增到other空間中,然後再交換swap
other.append(toStringPiece());
swap(other);
}
3. 擴增可寫空間writeable
當writeable不能夠容納足夠多的資料時,要進行動態的緩衝區擴充,即:①重新申請一塊更大的緩衝區 ②舊空間上的資料拷貝到新緩衝區上 ③釋放舊空間
4. 一個小小的創新:prepend空間
4、讀、寫Buffer
[1] 從readable區域中,讀走資料
//讀取len個位元組的資料
void retrieve(size_t len)
{
assert(len <= readableBytes());
if (len < readableBytes())//if 要讀位元組數 < 可讀位元組數
{
readerIndex_ += len; //只是readerIndex_後移len
}
else //if 要讀位元組數 > 可讀位元組數,即將資料全部讀走
{
retrieveAll(); //readerIndex_、writerIndex_位置重置為初始位置(8)
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
//取出[readerIndex_,end]之間的資料,end為手動指定的
void retrieveUntil(const char* end);
//返回值:void
void retrieveInt64();
void retrieveInt32();
void retrieveInt16();
void retrieveInt8();
//返回值:intXX_t,即讀到的值
int64_t readInt64();
int32_t readInt32();
int16_t readInt16();
int8_t readInt8();
//取出readable中len位元組的資料,並轉換成string型別,再返回
string retrieveAsString(size_t len);
//取出readable中所有的資料,並轉換成string型別,再返回
string retrieveAllAsString();
//取出readable中所有的資料,並轉換成StringPiece型別,再返回
StringPiece toStringPiece() const;
[2] 將資料寫入到writable區域中
append():把資料追加到writeable區域中,如果空間不夠,會進行makespace
void append(const char* data, size_t len)
{
ensureWritableBytes(len); //確保有可寫空間,空間不夠,就自動分配
std::copy(data, data+len, beginWrite());
hasWritten(len);
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len) //如果可寫空間不夠
{
makeSpace(len); //resize或移動資料,使Buffer能容下len大資料
}
assert(writableBytes() >= len);
}
void makeSpace(size_t len) //resize或移動資料,使Buffer能容下len大資料
{
//確保空間是真的不夠,而不是挪動就可以騰出空間
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
// FIXME: move readable data
buffer_.resize(writerIndex_+len);
}
else
{
//內部騰挪就足夠append,那麼就內部騰挪一下。
// move readable data to the front, make space inside buffer
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin()+readerIndex_, //原來的可讀部分全部copy到Prepend位置,相當於向前挪動,為writeable留出空間
begin()+writerIndex_,
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend; //更新下標
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}
void append(const StringPiece& str)
{
append(str.data(), str.size());
}
void append(const void* data, size_t len)
{
append(static_cast<const char*>(data), len);
}
void appendInt64(int64_t x)
{
int64_t be64 = sockets::hostToNetwork64(x);
append(&be64, sizeof be64);
}
void appendInt32(int32_t x);
void appendInt16(int16_t x);
void appendInt8(int8_t x);
[3] 向從Prepend區域中,放入資料的函式API:
void prependInt64(int64_t x); //放入x,x大小為8位元組
void prependInt32(int32_t x);
void prependInt16(int16_t x);
void prependInt8(int8_t x);
void prepend(const void* data, size_t len);
5、核心函式 readFd
實現功能:從socket中讀取資料,存放到Buffer中的writeable中
實現細節:因為不知道一次性可以讀多少,因此先在棧上開闢了65536位元組的空間extrabuf,使用readv讀至Buffer_中。①如果Buffer中wriable足夠存放從fd讀到的資料,則讀取完畢;②否則剩餘資料先讀到extrabuf中然後以append的方式追加入Buffer_。
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
//saved an ioctl()/FIONREAD call to tell how much to read
//節省一次ioctl系統呼叫(獲取當前有多少可讀資料)
//為什麼這麼說?因為我們準備了足夠大的extrabuf,那麼就不需要使用ioctl取檢視fd有多少可讀位元組數了
char extrabuf[65536];
//使用iovec分配兩個連續的緩衝區
struct iovec vec[2];
const size_t writable = writableBytes();
//第一塊緩衝區,指向可寫空間begin()+writerIndex_
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
//第二塊緩衝區,指向棧上空間extrabuf
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; //writeable一般小於65536
//返回值n:表示readv讀到的位元組數
const ssize_t n = sockets::readv(fd, vec, iovcnt); //iovcnt=2
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)//writable足夠容納
{
writerIndex_ += n; //writerIndex_後移n
}
else //writable不足夠容納 ==> 資料被接受到了第二塊緩衝區extrabuf,將其append至buffer
{
writerIndex_ = buffer_.size();//更新writerIndex到buffer.size()位置
append(extrabuf, n - writable);//將extrabuf中的資料,再追加到buffer中
}
return n;
}
程式碼中struct iovec vec[2]的圖示: