1. 程式人生 > >【網路元件】應用層緩衝

【網路元件】應用層緩衝

       本節研究應用層緩衝Buffer的實現;

應用層緩衝

說明幾點:

(1)在非阻塞式網路程式設計中,應用層緩衝是必須的;應用層傳送緩衝是必須的,假設TCP在傳送20KB資料,還此時核心此連線的傳送快取只有10KB,那麼還未寫入的10KB資料,我們應該緩衝到outputbuffer,並且註冊POLLOUT事件,等到核心此連線的傳送快取有空閒時,繼續寫入;等到outputbuffer中的資料寫完,應該取消關注POLLOUT事件;

(2)應用層接收緩衝是必須的,TCP連線核心接收快取可能並不完整的接收資料包,假設對方傳送的完整訊息為20KB,此時讀取的資料為10KB,並不構成一個完整的訊息,我們是不是應該在inputbuffer中快取這些10KB資料,等到下次再讀資料後,再繼續判斷是否構成一個完整的20KB資料;

(3)應用層傳送快取和接收快取採用stl中的vector來實現,vector可以動態增長,為了應對增長時迭代器失效的情況,僅僅儲存指向的讀寫索引,  size_t _readIndex,size_t _writeIndex;當_readIndex超過一定的資料位移後將會移動資料至開頭處,這樣可以增大資料可寫的大小;

快取示意圖如下:


應用層傳送緩衝示意圖如下:

說明幾點

(1)初始狀態:傳送緩衝還有30KB資料,此時TCP連線應該關注POLLOUT事件,將連線可讀的資料write到核心的傳送緩衝;當然使用者也可以繼續寫入;

(2)使用者繼續寫入20KB資料,但是為了不改變傳送資料的順序,此時資料只能放入到未傳送資料的後面,注意,使用者寫入資料和核心將可讀資料寫入到傳送緩衝中不可能同時發生,因為它們都在TCP連線所屬的IO執行緒中序列執行的;

(3)POLLOUT事件發生,核心傳送快取假設有了35KB資料可寫,那麼應用層傳送緩衝中將有35KB資料被寫入;

(4)使用者下一次寫入資料,當傳送快取的大小不能容納使用者寫的資料時,首先判斷_readIndex是否已經大於_maxHeadBufferMovSize(預設為100位元組),那麼就要將傳送快取的資料移動到開頭處,這樣會增大可寫緩衝的大小;如果仍不能不能容納使用者寫的資料,此時就將_buffer繼續增長(大小為使用者可寫資料的大小*2);


應用層接收緩衝示意圖如下:

說明幾點

(1)初始狀態:接收緩衝還有30KB資料,此時TCP連線若還關注POLLIN事件,仍有可能將核心的接收快取讀取資料到接收快取;當然使用者也可以繼續讀取資料來處理;

(3)POLLIN事件繼續發生,繼續讀取核心接收快取的20KB放入到應用層的接收緩衝中;

(3)使用者讀取35KB資料來處理具體的業務邏輯;注意,使用者讀取資料和核心寫入到應用層接收緩衝中不可能同時發生,因為它們都在TCP連線所屬的IO執行緒中序列執行;

(4)POLLIN事件繼續發生時,當接收快取的大小不能容納連線可以寫入的資料時,首先判斷_readIndex是否已經大於_maxHeadBufferMovSize(預設為100位元組),那麼就要將接收快取的資料移動到開頭處,這樣會增大連線可寫緩衝的大小;如果仍不能不能容納連線可寫的資料,此時就將_buffer繼續增長(大小為連線可寫資料的大小*2);


Buffer

buffer宣告

class Buffer final
{
public:
  Buffer() :
      _readIndex(0),
      _writeIndex(0)
  {
    _buffer.resize(_normalBufferSize);
  }
  
  void resize(size_t len) {
    _buffer.resize(len);
  }
  
  void appendInt32(int32_t value)
  {
    value = endian::hostToNet32(value);
    append(&value, sizeof value);
  }

  void appendInt16(int16_t value)
  {
    value = endian::hostToNet16(value);
    append(&value, sizeof value);
  }

  void appendInt8(int8_t value)
  {
    append(&value, sizeof value);
  }

  void append(const void* buf, size_t len)
  {
    if (len > avail())
      {
        _expand(len);
      }

    ::memcpy(_beginWrite(), buf, len);
    _writeIndex += len;
  }

  std::string retrieveAllAsString()
  {
    std::string s(beginRead(), used());
    setReadIndex(used());
    return s;
  }

  std::string retrieveString(size_t len)
  {
    if (len > used())
      len = used();
      
    std::string s(beginRead(), len);
    setReadIndex(len);
    return s;
  }
  
  uint32_t retrieveInt32()
  {
    uint32_t value;
    retrieve(&value, sizeof value);
    return endian::netToHost32(value);
  }

  uint16_t retrieveInt16()
  {
    uint16_t value;
    retrieve(&value, sizeof value);
    return endian::netToHost16(value);
  }

  uint8_t retrieveInt8()
  {
    uint8_t value;
    retrieve(&value, sizeof value);
    return value;
  }

   uint32_t peekInt32()
  {
    uint32_t value;
    peek(&value, sizeof value);
    return endian::netToHost32(value);
  }
  
  size_t retrieve(void* buf, size_t len)
  {
    if (len > used())
      len = used();

    ::memcpy(buf, beginRead(), len);
    _readIndex += len;

    return len;
  }

  size_t peek(void* buf, size_t len)
  {
    if (len > used())
      len = used();

    ::memcpy(buf, beginRead(), len);

    return len;
  }
  
  size_t avail() const
  {
    assert(_buffer.size() >= _writeIndex);
    return _buffer.size() - _writeIndex;
  }

  size_t used() const
  {
    assert(_writeIndex >= _readIndex);
    return  _writeIndex - _readIndex;
  }

  ssize_t readFd(int connfd);

  const char* beginRead() const
  {
    return _begin() + _readIndex;
  }

  char* beginRead()
  {
    return _begin() + _readIndex;
  }

  void setReadIndex(size_t len)
  {
    _readIndex += len;
  }

  size_t readIndex() const {
    return _readIndex;
  }
  
  size_t writeIndex() const {
    return _writeIndex;
  }
  
  void reset() {
    _readIndex = 0;
    _writeIndex = 0;
  }
  
private:
  void _setWriteIndex(size_t len)
  {
    _writeIndex += len;
  }

  void _expand(size_t len)
  {
    assert(_writeIndex >= _readIndex);

    if (_readIndex > _maxHeadBufferMovSize)
      {
        ::memcpy(_begin(), beginRead(), used());
        _writeIndex = used();
        _readIndex = 0;
      }

    if (len > avail())
      {
        _buffer.resize(_buffer.size() + len);
      }
  }

  const char* _begin() const
  {
    return &(*_buffer.begin());
  }

  char* _begin()
  {
    return &(*_buffer.begin());
  }


  const char* _beginWrite() const
  {
    return _begin() + _writeIndex;
  }

  char* _beginWrite()
  {
    return _begin() + _writeIndex;
  }

  size_t _readIndex;
  size_t _writeIndex;

  std::vector<char> _buffer;
  
  static const size_t _maxHeadBufferMovSize = 20;
  static const size_t _normalBufferSize = 20;
};
說明幾點:

(1)append系列函式為寫資料到緩衝中,此時有可能會執行_expand(size_t len),來調整_readIndex,超過_maxHeadBufferMovSize後將會移動資料至開頭處,這樣可以增大資料可寫的大小;

(2)retrieve系列函式為從緩衝中讀取相關資料來處理具體業務邏輯;主要用於接收緩衝的處理;對於傳送緩衝,是直接正對傳送快取本身的_readIndex來處理的;

(3)peek系列函式僅僅是看看緩衝池的資料,並不改變緩衝池的任何狀態;

連線讀函式

ssize_t Buffer::readFd(int connfd)
{
  char extraBuf[64 * 1024];   //use memory in function stack

  int remain =  avail();

  struct iovec iov[2];
  iov[0].iov_base = _beginWrite();
  iov[0].iov_len = remain;

  iov[1].iov_base = extraBuf;
  iov[1].iov_len = sizeof extraBuf;

  ssize_t len = sockets::readv(connfd, iov, 2);   //len can be 0 or -1
  
  if (len > remain)
    {
      _setWriteIndex(remain);

      append(extraBuf, len - remain);
    }
  else if (len > 0)
    {
      _setWriteIndex(len);
    }

  return len;
}
說明幾點:

(1)讀資料時,我們並不知道讀取的資料為多大大小,假設盲目擴大應用層接收緩衝池的可寫區域的大小,可能核心的接收快取只有少量資料;我們充分利用棧上的64KB快取,利用readv函式,將核心的接收快取分別讀取到兩塊記憶體上(第一塊為應用層接收快取,第二塊為棧上的64KB記憶體),當第二塊(棧上的64KB記憶體)記憶體上有資料時,我們再將這些棧上讀取的資料放入到應用層的接收快取中;