muduo原始碼解析22-buffer類
buffer類:
說明:
一個buffer類,400行的一個類要看吐了,不過還好有很多相似功能的成員函式
參考:https://blog.csdn.net/Shreck66/article/details/49618331
先說一下buffer的目的作用:
在這裡僅考慮 非阻塞 套接字模式,阻塞模式在高併發伺服器情況下嚴重影響併發性,不考慮.
通常非阻塞套接字模式要搭配IO複用一起使用,否則為實現邏輯,也需要不斷迴圈來進行IO
(這裡以epoll為例子)我們知道epoll_wait會返回發生了網路事件的套接字集合以及相應的網路事件名字
正常的情況是:
epoll_wait返回,
EPOLLIN:呼叫read讀取sockfd上的資料或者接受一個連線
在高併發情況下會帶來一些問題:
問題1.當我們要傳送較大的資料時,我們正常想法是方法一,一直write直到所有資料都發了過去,可是事實情況可能是
作業系統核心非常繁忙無法處理write呼叫,導致一直進行write迴圈,
解決方法一:
epoll_wait() //返回發生網路事件的套接字
|
EPOLLOUT //處理寫事件
|
while(unfinished)write() //核心繁忙,write一直失敗,死迴圈,其他的套接字得不到響應
解決方法二:利用寫緩衝區
epoll_wait()
|
EPOLLOUT
write to buffer //把核心無法接受的資料先放在buffer中
|
註冊EPOLLOUT //註冊EPOLLOUT事件,下次epoll_wait()返回時從buffer中拿資料write
很明顯方法二不需要我們關係write能不能成功,我們只需要把資料放到buffer中,在註冊一個EPOLLOUT事件
作業系統可write時,直接從buffer中拿資料write即可.
優點:併發性優於方法一
buffer用於應用層向套接字進行讀寫資料所使用的中間的緩衝區
不使用緩衝區會有什麼問題?
問題2:我們在處理可讀事件時,必須要一次把資料讀完,否則epoll_wait()會一直觸發返回(LT模式),
方案一:粘包/包不完整:
epoll_wait()
|
EPOLLIN
|
read to buf //buf中第一個資料包和第二個資料包會連在一起,形成粘包,也有可能包不完整
|
process //包有問題,無法處理
方案二:利用讀緩衝區
epoll_wait()
|
EPOLLIN
|
read to buffer //先全部讀到buffer中
|
buffer notify() //buffer中有完整的一個包了,就通知業務程式處理
|
process //處理資料
因此可以看出來,buffer主要分為 寫緩衝區 和 讀緩衝區
首先來看一下buffer設計的要點
.對外表現為一塊連續的記憶體
.其大小可自動增長,以適應不斷增長的訊息,它不能是固定大小的buffer
.內部以std::vector來儲存資料,並提供相應的訪問函式
.input buffer,是程式從socket中讀取資料然後寫入input buffer,客戶程式碼從input buffer讀取資料
.output buffer,客戶程式碼會把資料寫入output buffer中,然後再從output buffer中讀取資料並寫入socket中
buffer.h:
#ifndef BUFFER_H #define BUFFER_H #include"base/copyable.h" #include"base/types.h" #include"net/endian.h" #include<algorithm> #include<vector> #include<assert.h> #include<string.h> //#include<unistd.h> //ssize_t namespace mymuduo { namespace net { ///緩衝區類模仿 org.jboss.netty.buffer.ChannelBuffer /// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer /// /// @code /// +-------------------+------------------+------------------+ /// | prependable bytes | readable bytes | writable bytes | /// | 預留的空間 | 資料(CONTENT) | 可寫的空間 | /// +-------------------+------------------+------------------+ /// | | | | /// 0 <= readerIndex <= writerIndex <= size /// 開頭 可讀位置 可寫位置 結尾 /// @endcode /// /// 注意新增資料時,如果剩餘空間可用,直接新增,否則若是剩餘空間+預留空間可用, /// 把資料挪動到kCheapPrepend位置即可,否則就需要額外擴容了 /// /// class buffer:copyable { public: //總空間1024+8=1032 static const size_t kCheapPrepend=8; //一開始預留空間為8位元組 static const size_t kInitialSize=1024; //一開始初始空間1024 //explicit 禁止隱式轉換 //readIndex和writerIndex初始都在kCheaperPrepend(8)處,m_buffer大小(1032) explicit buffer(size_t initialSize=kInitialSize) :m_buffer(kCheapPrepend+initialSize), m_readerIndex(kCheapPrepend), m_writerIndex(kCheapPrepend) { assert(readableBytes()==0); assert(writableBytes()==initialSize); assert(prependableBytes()==kCheapPrepend); } //交換兩個buffer類,實際上完成m_buffer,m_readerIndex,m_writerbuffer的交換 void swap(buffer& rhs) { m_buffer.swap(rhs.m_buffer); std::swap(m_readerIndex,rhs.m_readerIndex); std::swap(m_writerIndex,rhs.m_writerIndex); } //當前buffer可讀的位元組數,從可讀位置到可寫位置 size_t readableBytes() const { return m_writerIndex-m_readerIndex; } //當前buffer可寫的位元組數,從可寫位置到尾處 size_t writableBytes() const { return m_buffer.size()-m_writerIndex; } //當前預留的位元組數,從0到可讀位置 size_t prependableBytes() const { return m_readerIndex; } //得到可讀位置的地址 const char* peek() const { return begin() + m_readerIndex; } //peek()是可讀位置m_readerIndex的地址,beginWrite()是可寫位置m_writerIndex的地址 //左閉右開區間[peek(),beginWrite()),在此區間中找,得到[kCRLF,kCRLF+2)的地址 const char* findCRLF() const { //const char Buffer::kCRLF[] = "\r\n"; const char* crlf=std::search(peek(),beginWrite(),kCRLF,kCRLF+2); return crlf==beginWrite()?NULL:crlf; } //在[start,beginWrite())中找到第一個"\r\n"的位置 const char* findCRLF(const char* start) const { assert(peek()<=start); assert(start<=beginWrite()); const char* crlf=std::search(start,beginWrite(),kCRLF,kCRLF+2); return crlf==begin()?NULL:crlf; } //找到換行符'\n'的地址 const char* findEOL() const { //從peek()所指區域的前readbleBytes()個字元中找到'\n'; const void* eol=memchr(peek(),'\n',readableBytes()); return static_cast<const char*>(eol); } //從start位置開始,找到換行符'\n'的地址 const char* findEOL(const char* start) const { assert(peek()<=start); assert(start<=beginWrite()); const void* eol=memchr(start,'\n',beginWrite()-start); return static_cast<const char*>(eol); } //從peek()可讀位置開始,讓m_readerIndex移動len個位置 void retrieve(size_t len) { assert(len<=readableBytes()); if(len<readableBytes()) m_readerIndex+=len; else retrieveAll(); } //從peek()可讀位置開始,讓m_readerIndex移動知道end為止 void retrieveUntil(const char* end) { assert(peek()<=end); assert(end<=beginWrite()); retrieve(end-peek()); } //讓m_readerIndex移動sizeof(int_*)個位置 void retrieveInt64() { retrieve(sizeof(int64_t)); } void retrieveInt32() { retrieve(sizeof(int32_t)); } void retrieveInt16() { retrieve(sizeof(int16_t)); } void retrieveInt8() { retrieve(sizeof(int8_t)); } //讓讀位置和寫位置都回到kCheapPrepend void retrieveAll() { m_readerIndex=kCheapPrepend; m_writerIndex=kCheapPrepend; } //讀取len個位元組的資料並移動m_readerIndex string retrieveAsString(size_t len) { assert(len<=readableBytes()); string result(peek(),len); retrieve(len); return result; } //讀取剩餘所有資料並移動m_readerIndex string retrieveAllAsString() { return retrieveAsString(readableBytes()); } //本來是StringPiece toStringPiece() const,我放棄了StringPiece //讀取剩餘所有的資料並移動m_readerIndex string toString() const { return string(peek(),static_cast<int>(readableBytes())); } void append(const char* data,size_t len) { //擴容/挪動資料,保證剩餘空間足夠len位元組 ensureWritableBytes(len); //新增資料 std::copy(data,data+len,beginWrite()); //更新m_writerIndex hasWritten(len); } // void append(const string& str) { append(str.data(),str.size()); } void append(const void* data,size_t len) { append(static_cast<const char*>(data),len); } //剩餘空間不足len時,對m_buffer進行擴容或者挪動資料,保證能寫len位元組的資料, void ensureWritableBytes(size_t len) { //擴容/挪動資料 if(writableBytes()<len) makeSpace(len); assert(writableBytes()>=len); } //返回可寫位置的地址 char* beginWrite() { return begin()+m_writerIndex; } const char* beginWrite() const { return begin()+m_writerIndex; } //更新m_writerIndex + len void hasWritten(size_t len) { assert(len<=writableBytes()); m_writerIndex+=len; } //m_writerIndex往前移動len個位元組 void unwrite(size_t len) { assert(len<=readableBytes()); m_writerIndex-=len; } //向m_buffer中寫網路位元組序int_*,改變m_writerIndex位置 void appendInt64(int64_t x) { int64_t be64=sockets::hostToNetwork64(x); append(&be64,sizeof(be64)); } void appendInt32(int32_t x) { int32_t be32 = sockets::hostToNetwork32(x); append(&be32, sizeof be32); } void appendInt16(int16_t x) { int16_t be16 = sockets::hostToNetwork16(x); append(&be16, sizeof be16); } void appendInt8(int8_t x) { append(&x, sizeof x); } //從m_buffer中可讀位置返回第一個in64_*的網路位元組序,不會改變m_readerIndex位置 int64_t peekInt64() const { assert(readableBytes() >= sizeof(int64_t)); int64_t be64 = 0; ::memcpy(&be64, peek(), sizeof be64); return sockets::networkToHost64(be64); } int32_t peekInt32() const { assert(readableBytes() >= sizeof(int32_t)); int32_t be32 = 0; ::memcpy(&be32, peek(), sizeof be32); return sockets::networkToHost32(be32); } int16_t peekInt16() const { assert(readableBytes() >= sizeof(int16_t)); int16_t be16 = 0; ::memcpy(&be16, peek(), sizeof be16); return sockets::networkToHost16(be16); } int8_t peekInt8() const { assert(readableBytes() >= sizeof(int8_t)); int8_t x = *peek(); return x; } //從m_buffer中讀取一個int64_t,返回主機位元組序,會移動m_readerIndex int64_t readInt64() { //獲取8個位元組的資料,返回主機位元組序 int64_t result=peekInt64(); retrieveInt64(); //移動m_readerIndex; return result; } int32_t readInt32() { //獲取4個位元組的資料,返回主機位元組序 int32_t result=peekInt32(); retrieveInt32(); //移動m_readerIndex; return result; } int16_t readInt16() { int16_t result=peekInt16(); retrieveInt16(); return result; } int8_t readInt8() { int8_t result=peekInt8(); retrieveInt8(); return result; } //向預留空間中寫入資料,從m_readerIndex向前也就是向左拷貝data void prepend(const void* data,size_t len) { assert(len<=prependableBytes()); m_readerIndex-=len; const char* d=static_cast<const char*>(data); std::copy(d,d+len,begin()+m_readerIndex); } void prependInt64(int64_t x) { //獲取網路位元組序 int64_t be64=sockets::hostToNetwork64(x); prepend(&be64,sizeof(be64)); //向預留空間中寫入8個位元組的資料 } //同上,只是位元組數稍有變化 void prependInt32(int32_t x) { int32_t be32 = sockets::hostToNetwork32(x); prepend(&be32, sizeof be32); } void prependInt16(int16_t x) { int16_t be16 = sockets::hostToNetwork16(x); prepend(&be16, sizeof be16); } void prependInt8(int8_t x) { prepend(&x, sizeof x); } //把資料讀到另一個m_buffer裡,並進行交換 void shrink(size_t reserve) { buffer other; //保證可用空間足夠,擴容/挪動資料 other.ensureWritableBytes(readableBytes()+reserve); other.append(toString()); swap(other); } //得到m_buffer的已分配空間大小 size_t internalCapacity() const { return m_buffer.capacity(); } ssize_t readFd(int fd,int* savedErrno); private: //初始位置的地址 char* begin() { return &*m_buffer.begin(); } //初始位置的地址 const char* begin() const { return &*m_buffer.begin(); } //向m_buffer中新增資料時,可能會引起m_buffer擴容/資料挪動操作 void makeSpace(size_t len) { //預留空間+剩餘可寫空間 還是不夠 len+8,就擴容 if(writableBytes()+prependableBytes()<len+kCheapPrepend) m_buffer.resize(m_writerIndex+len); else { //只需要整理空間即可,把資料往前挪動預留空間個位置 assert(kCheapPrepend<m_readerIndex); size_t readable=readableBytes(); //把peek()到beginWrite()之間的資料移動到begin()+kCheapPrepend位置處 std::copy(begin()+m_readerIndex, begin()+m_writerIndex, begin()+kCheapPrepend); //更新可讀可寫下標 m_readerIndex=kCheapPrepend; m_writerIndex=m_readerIndex+readable; assert(readable==readableBytes()); } } private: std::vector<char> m_buffer; //儲存緩衝區資料 size_t m_readerIndex; //讀下標 size_t m_writerIndex; //寫下標 static const char kCRLF[]; }; }//namespace net }//namespace mymuduo #endif // BUFFER_H
buffer.cpp
#include "buffer.h" #include"net/socketsops.h" #include<errno.h> #include<sys/uio.h> namespace mymuduo { namespace net{ const char buffer::kCRLF[]="\r\n"; const size_t buffer::kInitialSize; const size_t buffer::kCheapPrepend; //在m_buffer中獲取 ssize_t buffer::readFd(int fd, int *savedErrno) { //棧空間,用於從套接字中讀資料時,m_buffer不夠用時暫存資料,等到m_buffer足夠時拷貝到m_buffer char extrabuf[65536]; struct iovec vec[2]; const size_t writable=writableBytes(); vec[0].iov_base=begin()+m_writerIndex; vec[0].iov_len=writable; vec[1].iov_base=extrabuf; vec[1].iov_len=sizeof(extrabuf); //m_buffer不夠用時用extrabuf const int iovcnt=(writable<sizeof(extrabuf))?2:1; const ssize_t n= sockets::readv(fd,vec,iovcnt); if(n<0) *savedErrno=errno; else if(implicit_cast<size_t>(n)<=writable) m_writerIndex+=n; else { m_writerIndex=m_buffer.size(); //把額外的extra部分新增到m_buffer中 append(extrabuf,n-writable); } return n; } }//namespace net }//namespace mymuduo
測試:
#include "net/buffer.h" #include<iostream> using namespace std; using mymuduo::string; using mymuduo::net::buffer; /* 簡單測試一下buffer類對與字串和int*型別的讀寫操作把 */ void test_string() { //初始時,m_readerIndex為8(kCheapPrepend),m_writerIndex為8 buffer buf; assert(buf.readableBytes()==0); assert(buf.writableBytes()==buffer::kInitialSize); assert(buf.prependableBytes()==buffer::kCheapPrepend); //寫200位元組進去,m_readerIndex為8,m_writerIndex為208 const string str(200,'x'); buf.append(str); assert(buf.readableBytes()==200); assert(buf.writableBytes()==1024-200); assert(buf.prependableBytes()==8); //讀50位元組出來,readIndex:8+50,writeIndex:208 string str2=buf.retrieveAsString(50); assert(str2.size()==50); assert(buf.readableBytes()==150); assert(buf.writableBytes()==1024-200); assert(buf.prependableBytes()==8+50); //新增1000位元組進去,肯定超出空間大小,肯定會擴容m_buffer buf.append(string(1000,'y')); std::cout<<buf.readableBytes()<<" "<<buf.writableBytes()<<" " <<buf.prependableBytes()<<std::endl; } void test_int() { //初始時,m_readerIndex為8(kCheapPrepend),m_writerIndex為8 buffer buf; assert(buf.readableBytes()==0); assert(buf.writableBytes()==buffer::kInitialSize); assert(buf.prependableBytes()==buffer::kCheapPrepend); //在m_buffer內部int*用網路位元組序儲存 buf.appendInt64(static_cast<int64_t>(123456)); buf.appendInt32(static_cast<int32_t>(12345)); buf.appendInt16(static_cast<int16_t>(1234)); buf.appendInt8(static_cast<int8_t>(123)); assert(buf.readableBytes()==8+4+2+1); //取出int*返回主機位元組序 int64_t a=buf.readInt64(); int64_t b=buf.readInt32(); int64_t c=buf.readInt16(); int64_t d=buf.readInt8(); assert(buf.readableBytes()==0); cout<<a<<" "<<b<<" "<<c<<" "<<d<<endl; } int main() { test_int(); }
測試結果:
123456 12345 1234 123
例子很簡單,但是buffer內部實現需要注意,int*型別再m_buffer內部儲存方式是網路位元組序(大端法),而讀出的int*是主機位元組序(小端法)
整個buffer圍繞著內部成員m_buffer進行操作,重要的成員就是兩個下標,讀下標和寫下標,只要搞明白了這兩個下標,基本上就是網m_buffer裡面拷貝/刪除資料的問題了。