1. 程式人生 > 實用技巧 >muduo原始碼解析22-buffer類

muduo原始碼解析22-buffer類

buffer類:

說明:

一個buffer類,400行的一個類要看吐了,不過還好有很多相似功能的成員函式
參考:https://blog.csdn.net/Shreck66/article/details/49618331

先說一下buffer的目的作用:

在這裡僅考慮 非阻塞 套接字模式,阻塞模式在高併發伺服器情況下嚴重影響併發性,不考慮.
通常非阻塞套接字模式要搭配IO複用一起使用,否則為實現邏輯,也需要不斷迴圈來進行IO
(這裡以epoll為例子)我們知道epoll_wait會返回發生了網路事件的套接字集合以及相應的網路事件名字


正常的情況是:
epoll_wait返回,
EPOLLIN:呼叫read讀取sockfd上的資料或者接受一個連線

EPOLLOUT:在sockfd上呼叫write傳送資料

高併發情況下會帶來一些問題:
問題1.當我們要傳送較大的資料時,我們正常想法是方法一,一直write直到所有資料都發了過去,可是事實情況可能是
作業系統核心非常繁忙無法處理write呼叫,導致一直進行write迴圈,

解決方法一:
epoll_wait() //返回發生網路事件的套接字
  |
EPOLLOUT //處理寫事件
  |
while(unfinished)write() //核心繁忙,write一直失敗,死迴圈,其他的套接字得不到響應

解決方法二:利用寫緩衝區
epoll_wait()
  |
EPOLLOUT

  |
write to buffer //把核心無法接受的資料先放在buffer中
  |
註冊EPOLLOUT //註冊EPOLLOUT事件,下次epoll_wait()返回時從buffer中拿資料write

很明顯方法二不需要我們關係write能不能成功,我們只需要把資料放到buffer中,在註冊一個EPOLLOUT事件
作業系統可write時,直接從buffer中拿資料write即可.
優點:併發性優於方法一

buffer用於應用層向套接字進行讀寫資料所使用的中間的緩衝區
不使用緩衝區會有什麼問題?

問題2:我們在處理可讀事件時,必須要一次把資料讀完,否則epoll_wait()會一直觸發返回(LT模式),

造成主迴圈busy-loop,但是當我們一次把資料全部讀取完畢,無法保證訊息的完整性(粘包/不完整問題).

方案一:粘包/包不完整:
epoll_wait()
  |
EPOLLIN
  |
read to buf //buf中第一個資料包和第二個資料包會連在一起,形成粘包,也有可能包不完整
  |
process //包有問題,無法處理

方案二:利用讀緩衝區
epoll_wait()
  |
EPOLLIN
  |
read to buffer //先全部讀到buffer中
  |
buffer notify() //buffer中有完整的一個包了,就通知業務程式處理
  |
process //處理資料

因此可以看出來,buffer主要分為 寫緩衝區 和 讀緩衝區

首先來看一下buffer設計的要點
.對外表現為一塊連續的記憶體
.其大小可自動增長,以適應不斷增長的訊息,它不能是固定大小的buffer
.內部以std::vector來儲存資料,並提供相應的訪問函式
.input buffer,是程式從socket中讀取資料然後寫入input buffer,客戶程式碼從input buffer讀取資料
.output buffer,客戶程式碼會把資料寫入output buffer中,然後再從output buffer中讀取資料並寫入socket中

buffer.h:

#ifndef BUFFER_H
#define BUFFER_H

#include"base/copyable.h"
#include"base/types.h"

#include"net/endian.h"

#include<algorithm>
#include<vector>
#include<assert.h>
#include<string.h>
//#include<unistd.h>    //ssize_t


namespace mymuduo {
namespace net {

///緩衝區類模仿 org.jboss.netty.buffer.ChannelBuffer
/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes |  readable bytes  |  writable bytes  |
/// |    預留的空間       | 資料(CONTENT)    | 可寫的空間         |
/// +-------------------+------------------+------------------+
/// |                   |                  |                  |
/// 0      <=      readerIndex   <=   writerIndex    <=     size
/// 開頭               可讀位置          可寫位置               結尾
/// @endcode
///
/// 注意新增資料時,如果剩餘空間可用,直接新增,否則若是剩餘空間+預留空間可用,
/// 把資料挪動到kCheapPrepend位置即可,否則就需要額外擴容了
///
///

class buffer:copyable
{
public:
    //總空間1024+8=1032
    static const size_t kCheapPrepend=8;    //一開始預留空間為8位元組
    static const size_t kInitialSize=1024;  //一開始初始空間1024

    //explicit 禁止隱式轉換
    //readIndex和writerIndex初始都在kCheaperPrepend(8)處,m_buffer大小(1032)
    explicit buffer(size_t initialSize=kInitialSize)
        :m_buffer(kCheapPrepend+initialSize),
          m_readerIndex(kCheapPrepend),
          m_writerIndex(kCheapPrepend)
    {
        assert(readableBytes()==0);
        assert(writableBytes()==initialSize);
        assert(prependableBytes()==kCheapPrepend);
    }

    //交換兩個buffer類,實際上完成m_buffer,m_readerIndex,m_writerbuffer的交換
    void swap(buffer& rhs)
    {
        m_buffer.swap(rhs.m_buffer);
        std::swap(m_readerIndex,rhs.m_readerIndex);
        std::swap(m_writerIndex,rhs.m_writerIndex);
    }

    //當前buffer可讀的位元組數,從可讀位置到可寫位置
    size_t readableBytes() const
    {
        return m_writerIndex-m_readerIndex;
    }

    //當前buffer可寫的位元組數,從可寫位置到尾處
    size_t writableBytes() const
    {
        return m_buffer.size()-m_writerIndex;
    }

    //當前預留的位元組數,從0到可讀位置
    size_t prependableBytes() const
    {
        return m_readerIndex;
    }

    //得到可讀位置的地址
    const char* peek() const
    {
        return begin() + m_readerIndex;
    }

    //peek()是可讀位置m_readerIndex的地址,beginWrite()是可寫位置m_writerIndex的地址
    //左閉右開區間[peek(),beginWrite()),在此區間中找,得到[kCRLF,kCRLF+2)的地址
    const char* findCRLF() const
    {
        //const char Buffer::kCRLF[] = "\r\n";
        const char* crlf=std::search(peek(),beginWrite(),kCRLF,kCRLF+2);
        return crlf==beginWrite()?NULL:crlf;
    }

    //在[start,beginWrite())中找到第一個"\r\n"的位置
    const char* findCRLF(const char* start) const
    {
        assert(peek()<=start);
        assert(start<=beginWrite());
        const char* crlf=std::search(start,beginWrite(),kCRLF,kCRLF+2);
        return crlf==begin()?NULL:crlf;

    }

    //找到換行符'\n'的地址
    const char* findEOL() const
    {
        //從peek()所指區域的前readbleBytes()個字元中找到'\n';
        const void* eol=memchr(peek(),'\n',readableBytes());
        return static_cast<const char*>(eol);
    }

    //從start位置開始,找到換行符'\n'的地址
    const char* findEOL(const char* start) const
    {
        assert(peek()<=start);
        assert(start<=beginWrite());
        const void* eol=memchr(start,'\n',beginWrite()-start);
        return static_cast<const char*>(eol);
    }

    //從peek()可讀位置開始,讓m_readerIndex移動len個位置
    void retrieve(size_t len)
    {
        assert(len<=readableBytes());
        if(len<readableBytes())
            m_readerIndex+=len;
        else
            retrieveAll();
    }

    //從peek()可讀位置開始,讓m_readerIndex移動知道end為止
    void retrieveUntil(const char* end)
    {
        assert(peek()<=end);
        assert(end<=beginWrite());
        retrieve(end-peek());
    }

    //讓m_readerIndex移動sizeof(int_*)個位置
    void retrieveInt64()
    {
        retrieve(sizeof(int64_t));
    }

    void retrieveInt32()
    {
        retrieve(sizeof(int32_t));
    }

    void retrieveInt16()
    {
        retrieve(sizeof(int16_t));
    }

    void retrieveInt8()
    {
        retrieve(sizeof(int8_t));
    }

    //讓讀位置和寫位置都回到kCheapPrepend
    void retrieveAll()
    {
        m_readerIndex=kCheapPrepend;
        m_writerIndex=kCheapPrepend;
    }

    //讀取len個位元組的資料並移動m_readerIndex
    string retrieveAsString(size_t len)
    {
        assert(len<=readableBytes());
        string result(peek(),len);
        retrieve(len);
        return result;
    }

    //讀取剩餘所有資料並移動m_readerIndex
    string retrieveAllAsString()
    {
        return retrieveAsString(readableBytes());
    }

    //本來是StringPiece toStringPiece() const,我放棄了StringPiece
    //讀取剩餘所有的資料並移動m_readerIndex
    string toString() const
    {
        return string(peek(),static_cast<int>(readableBytes()));
    }

    void append(const char* data,size_t len)
    {
        //擴容/挪動資料,保證剩餘空間足夠len位元組
        ensureWritableBytes(len);
        //新增資料
        std::copy(data,data+len,beginWrite());
        //更新m_writerIndex
        hasWritten(len);
    }

    //
    void append(const string& str)
    {
        append(str.data(),str.size());
    }

    void append(const void* data,size_t len)
    {
        append(static_cast<const char*>(data),len);
    }

    //剩餘空間不足len時,對m_buffer進行擴容或者挪動資料,保證能寫len位元組的資料,
    void ensureWritableBytes(size_t len)
    {
        //擴容/挪動資料
        if(writableBytes()<len)
            makeSpace(len);
        assert(writableBytes()>=len);
    }

    //返回可寫位置的地址
    char* beginWrite()
    {
        return begin()+m_writerIndex;
    }

    const char* beginWrite() const
    {
        return begin()+m_writerIndex;
    }

    //更新m_writerIndex + len
    void hasWritten(size_t len)
    {
        assert(len<=writableBytes());
        m_writerIndex+=len;
    }

    //m_writerIndex往前移動len個位元組
    void unwrite(size_t len)
    {
        assert(len<=readableBytes());
        m_writerIndex-=len;
    }

    //向m_buffer中寫網路位元組序int_*,改變m_writerIndex位置
    void appendInt64(int64_t x)
    {
        int64_t be64=sockets::hostToNetwork64(x);
        append(&be64,sizeof(be64));
    }

    void appendInt32(int32_t x)
    {
      int32_t be32 = sockets::hostToNetwork32(x);
      append(&be32, sizeof be32);
    }

    void appendInt16(int16_t x)
    {
      int16_t be16 = sockets::hostToNetwork16(x);
      append(&be16, sizeof be16);
    }

    void appendInt8(int8_t x)
    {
      append(&x, sizeof x);
    }

    //從m_buffer中可讀位置返回第一個in64_*的網路位元組序,不會改變m_readerIndex位置
    int64_t peekInt64() const
    {
      assert(readableBytes() >= sizeof(int64_t));
      int64_t be64 = 0;
      ::memcpy(&be64, peek(), sizeof be64);
      return sockets::networkToHost64(be64);
    }

    int32_t peekInt32() const
    {
      assert(readableBytes() >= sizeof(int32_t));
      int32_t be32 = 0;
      ::memcpy(&be32, peek(), sizeof be32);
      return sockets::networkToHost32(be32);
    }

    int16_t peekInt16() const
    {
      assert(readableBytes() >= sizeof(int16_t));
      int16_t be16 = 0;
      ::memcpy(&be16, peek(), sizeof be16);
      return sockets::networkToHost16(be16);
    }

    int8_t peekInt8() const
    {
      assert(readableBytes() >= sizeof(int8_t));
      int8_t x = *peek();
      return x;
    }

    //從m_buffer中讀取一個int64_t,返回主機位元組序,會移動m_readerIndex
    int64_t readInt64()
    {
        //獲取8個位元組的資料,返回主機位元組序
        int64_t result=peekInt64();
        retrieveInt64();    //移動m_readerIndex;
        return result;
    }

    int32_t readInt32()
    {
        //獲取4個位元組的資料,返回主機位元組序
        int32_t result=peekInt32();
        retrieveInt32();    //移動m_readerIndex;
        return result;
    }

    int16_t readInt16()
    {
        int16_t result=peekInt16();
        retrieveInt16();
        return result;
    }

    int8_t readInt8()
    {
        int8_t result=peekInt8();
        retrieveInt8();
        return result;
    }

    //向預留空間中寫入資料,從m_readerIndex向前也就是向左拷貝data
    void prepend(const void* data,size_t len)
    {
        assert(len<=prependableBytes());
        m_readerIndex-=len;
        const char* d=static_cast<const char*>(data);
        std::copy(d,d+len,begin()+m_readerIndex);
    }

    void prependInt64(int64_t x)
    {
        //獲取網路位元組序
        int64_t be64=sockets::hostToNetwork64(x);
        prepend(&be64,sizeof(be64));   //向預留空間中寫入8個位元組的資料
    }
    //同上,只是位元組數稍有變化
    void prependInt32(int32_t x)
    {
      int32_t be32 = sockets::hostToNetwork32(x);
      prepend(&be32, sizeof be32);
    }

    void prependInt16(int16_t x)
    {
      int16_t be16 = sockets::hostToNetwork16(x);
      prepend(&be16, sizeof be16);
    }

    void prependInt8(int8_t x)
    {
      prepend(&x, sizeof x);
    }

    //把資料讀到另一個m_buffer裡,並進行交換
    void shrink(size_t reserve)
    {
        buffer other;
        //保證可用空間足夠,擴容/挪動資料
        other.ensureWritableBytes(readableBytes()+reserve);
        other.append(toString());
        swap(other);
    }

    //得到m_buffer的已分配空間大小
    size_t internalCapacity() const
    {
        return m_buffer.capacity();
    }

    ssize_t readFd(int fd,int* savedErrno);

private:
    //初始位置的地址
    char* begin()
    {
        return &*m_buffer.begin();
    }

    //初始位置的地址
    const char* begin() const
    {
        return &*m_buffer.begin();
    }

    //向m_buffer中新增資料時,可能會引起m_buffer擴容/資料挪動操作
    void makeSpace(size_t len)
    {
        //預留空間+剩餘可寫空間  還是不夠 len+8,就擴容
        if(writableBytes()+prependableBytes()<len+kCheapPrepend)
            m_buffer.resize(m_writerIndex+len);
        else
        {
            //只需要整理空間即可,把資料往前挪動預留空間個位置
            assert(kCheapPrepend<m_readerIndex);
            size_t readable=readableBytes();
            //把peek()到beginWrite()之間的資料移動到begin()+kCheapPrepend位置處
            std::copy(begin()+m_readerIndex,
                      begin()+m_writerIndex,
                      begin()+kCheapPrepend);
            //更新可讀可寫下標
            m_readerIndex=kCheapPrepend;
            m_writerIndex=m_readerIndex+readable;
            assert(readable==readableBytes());
        }
    }

private:
    std::vector<char> m_buffer; //儲存緩衝區資料
    size_t m_readerIndex;       //讀下標
    size_t m_writerIndex;       //寫下標

    static const char kCRLF[];

};


}//namespace net

}//namespace mymuduo



#endif // BUFFER_H

buffer.cpp

#include "buffer.h"

#include"net/socketsops.h"
#include<errno.h>
#include<sys/uio.h>

namespace mymuduo {

namespace net{

const char buffer::kCRLF[]="\r\n";
const size_t buffer::kInitialSize;
const size_t buffer::kCheapPrepend;

//在m_buffer中獲取
ssize_t buffer::readFd(int fd, int *savedErrno)
{
    //棧空間,用於從套接字中讀資料時,m_buffer不夠用時暫存資料,等到m_buffer足夠時拷貝到m_buffer
    char extrabuf[65536];
    struct iovec vec[2];
    const size_t writable=writableBytes();
    vec[0].iov_base=begin()+m_writerIndex;
    vec[0].iov_len=writable;
    vec[1].iov_base=extrabuf;
    vec[1].iov_len=sizeof(extrabuf);

    //m_buffer不夠用時用extrabuf
    const int iovcnt=(writable<sizeof(extrabuf))?2:1;
    const ssize_t n= sockets::readv(fd,vec,iovcnt);
    if(n<0)
        *savedErrno=errno;
    else if(implicit_cast<size_t>(n)<=writable)
        m_writerIndex+=n;
    else
    {
        m_writerIndex=m_buffer.size();
        //把額外的extra部分新增到m_buffer中
        append(extrabuf,n-writable);
    }

    return n;

}

}//namespace net

}//namespace mymuduo

測試:

#include "net/buffer.h"

#include<iostream>

using namespace std;

using mymuduo::string;
using mymuduo::net::buffer;

/*
簡單測試一下buffer類對與字串和int*型別的讀寫操作把
*/

void test_string()
{
    //初始時,m_readerIndex為8(kCheapPrepend),m_writerIndex為8
    buffer buf;
    assert(buf.readableBytes()==0);
    assert(buf.writableBytes()==buffer::kInitialSize);
    assert(buf.prependableBytes()==buffer::kCheapPrepend);

    //寫200位元組進去,m_readerIndex為8,m_writerIndex為208
    const string str(200,'x');
    buf.append(str);
    assert(buf.readableBytes()==200);
    assert(buf.writableBytes()==1024-200);
    assert(buf.prependableBytes()==8);

    //讀50位元組出來,readIndex:8+50,writeIndex:208
    string str2=buf.retrieveAsString(50);
    assert(str2.size()==50);
    assert(buf.readableBytes()==150);
    assert(buf.writableBytes()==1024-200);
    assert(buf.prependableBytes()==8+50);

    //新增1000位元組進去,肯定超出空間大小,肯定會擴容m_buffer
    buf.append(string(1000,'y'));

    std::cout<<buf.readableBytes()<<" "<<buf.writableBytes()<<" "
            <<buf.prependableBytes()<<std::endl;

}

void test_int()
{
    //初始時,m_readerIndex為8(kCheapPrepend),m_writerIndex為8
    buffer buf;
    assert(buf.readableBytes()==0);
    assert(buf.writableBytes()==buffer::kInitialSize);
    assert(buf.prependableBytes()==buffer::kCheapPrepend);

    //在m_buffer內部int*用網路位元組序儲存
    buf.appendInt64(static_cast<int64_t>(123456));
    buf.appendInt32(static_cast<int32_t>(12345));
    buf.appendInt16(static_cast<int16_t>(1234));
    buf.appendInt8(static_cast<int8_t>(123));

    assert(buf.readableBytes()==8+4+2+1);

    //取出int*返回主機位元組序
    int64_t a=buf.readInt64();
    int64_t b=buf.readInt32();
    int64_t c=buf.readInt16();
    int64_t d=buf.readInt8();

    assert(buf.readableBytes()==0);

    cout<<a<<" "<<b<<" "<<c<<" "<<d<<endl;

}

int main()
{
    test_int();
}

測試結果:

123456 12345 1234 123

例子很簡單,但是buffer內部實現需要注意,int*型別再m_buffer內部儲存方式是網路位元組序(大端法),而讀出的int*是主機位元組序(小端法)

整個buffer圍繞著內部成員m_buffer進行操作,重要的成員就是兩個下標,讀下標和寫下標,只要搞明白了這兩個下標,基本上就是網m_buffer裡面拷貝/刪除資料的問題了。