1. 程式人生 > >Envoy 源碼分析--buffer

Envoy 源碼分析--buffer

lis tin done rri ont 數組 int bubuko ins

目錄

  • Envoy 源碼分析--buffer
    • BufferFragment
    • RawSlice
    • Slice
    • OwnedSlice
    • SliceDeque
    • UnownedSlice
    • OwnedImpl
    • WatermarkBuffer
    • WatermarkBufferFactory
    • ZeroCopyInputStreamImpl

Envoy 源碼分析--buffer

申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。

Envoybuffer 在 1.10.0 前是基於 libevent 的 evbuffer 進行封裝。在 1.10.0 開始為了提高性能,要使用 libev 或 libuv 來替代 libevent 重寫了個 buffer 來消除 evbuffer 的依賴。想要具體了解可看 issue#4952 和 issue#5441。下面我們先來看下 buffer 相關的類圖:

技術分享圖片

上面四個 slice 相關的類是重寫後的 bufferslice 封裝了 buffer 相關操作,OwnedSlice 是創建 slice 的類,SliceDeque

管理 OwnedSlice 操作隊列,UnownedSlice 則是兼容 BufferFragment 的類。

LibEventInstance 繼承自 InstanceInstance 的接口進行擴充新增了兩個接口。OwnedImplLibEventInstance 接口的實現。WatermarkBuffer 則在 OwnedImpl 的基礎上增加水位的高低告警回調。WatermarkBufferFactory 繼承 WatermarkFactory 只是生成 WatermarkBuffer 的適配器。

BufferFragment

針對外部數據和大小創建一個適配器,這個類不拷貝內存,只是一個指針,外部需要自己確定數據是有效的。調用 done()

會釋放數據。

代碼詳情如下:

class BufferFragmentImpl : NonCopyable, public BufferFragment {
public:
  BufferFragmentImpl(
      const void* data, size_t size,
      const std::function<void(const void*, size_t, const BufferFragmentImpl*)>& releasor)
      : data_(data), size_(size), releasor_(releasor) {}

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override {
    if (releasor_) {
      releasor_(data_, size_, this);
    }
  }

private:
  const void* const data_;
  const size_t size_;
  const std::function<void(const void*, size_t, const BufferFragmentImpl*)> releasor_;
};

RawSlice

只是一個內存切片的數據結構體。

struct RawSlice {
  void* mem_ = nullptr;
  size_t len_ = 0;

  bool operator==(const RawSlice& rhs) const { return mem_ == rhs.mem_ && len_ == rhs.len_; }
};

Slice

Slice 是一個連續的內存塊的管理。其塊的管理方式如下:

 *                   |<- data_size() -->|<- reservable_size() ->|
 * +-----------------+------------------+-----------------------+
 * | Drained         | Data             | Reservable            |
 * | Unused space    | Usable content   | New content can be    |
 * | that formerly   |                  | added here with       |
 * | was in the Data |                  | reserve()/commit()    |
 * | section         |                  |                       |
 * +-----------------+------------------+-----------------------+
 *                   ^
 *                   |
 *                   data()

前面是個無用內存塊,中間這塊就是數據自身,後面是還可用的內存塊長度。增加數據支持 prependappend 即可追加數據也可以前置增加。

Slice 的數據存儲方式如下:

  /** 切片指針 */
  uint8_t* base_{nullptr};

  /** 存儲數據相對於切片的偏移量 */
  uint64_t data_;

  /** 可用空間相對於切片的偏移量 */
  uint64_t reservable_;

  /** 切片總長度 */
  uint64_t capacity_;

  /** 是否被`Limter`限制以延遲操作 */
  bool reservation_outstanding_{false};

現在來看幾個主要的操作。

append 在後面追加數據。增加盡量多的數量,不保證所有數據都增加成功。這個在空間不夠時,沒有重新申請空間。先判斷數據是否被鎖住了,接著取可用長度和增加長度的最小值,可用空間偏移量後移,copy 數據到塊中。

  uint64_t append(const void* data, uint64_t size) {
    if (reservation_outstanding_) {
      return 0;
    }
    uint64_t copy_size = std::min(size, reservableSize());
    uint8_t* dest = base_ + reservable_;
    reservable_ += copy_size;
    memcpy(dest, data, copy_size);
    return copy_size;
  }

prepend 前置增加數據。同 append 一樣沒有申請空間,只是增加盡量多的數據,不保證所有數據都增加成功。先判斷數據是否被鎖住了,然後判斷切片內是否有數據。如果沒有數據直接增加到最後面,加在最後面的目的是為了後面繼續 prepend 時有空間可加(這裏沒想明白,為啥要在最後面加,這樣我下次用 append 不也一樣沒法增加數據)。後面取出前置可用長度和增加長度的最小值,data 的偏移量後移, copy 數據塊。

  uint64_t prepend(const void* data, uint64_t size) {
    if (reservation_outstanding_) {
      return 0;
    }
    const uint8_t* src = static_cast<const uint8_t*>(data);
    uint64_t copy_size;
    if (dataSize() == 0) {
      copy_size = std::min(size, reservableSize());
      reservable_ = capacity_;
      data_ = capacity_ - copy_size;
    } else {
      if (data_ == 0) {
        return 0;
      }
      copy_size = std::min(size, data_);
      data_ -= copy_size;
    }
    memcpy(base_ + data_, src + size - copy_size, copy_size);
    return copy_size;
  }

drain 刪除數據,這個只是偏移量後移。

  void drain(uint64_t size) {
    ASSERT(data_ + size <= reservable_);
    data_ += size;
    if (data_ == reservable_ && !reservation_outstanding_) {
      data_ = reservable_ = 0;
    }
  }

reservecommit 是合起來一起用的。reserve 返回可用內存塊同時將切片內存塊鎖住,不讓增加和移除,直到 commit 才恢復正常。reserve 返回的內存塊可由用戶自己填充。reserve 時如果增加進來的內存塊不是同個切片下的會增加失敗。

  Reservation reserve(uint64_t size) {
    if (reservation_outstanding_ || size == 0) {
      return {nullptr, 0};
    }
    uint64_t available_size = capacity_ - reservable_;
    if (available_size == 0) {
      return {nullptr, 0};
    }
    uint64_t reservation_size = std::min(size, available_size);
    void* reservation = &(base_[reservable_]);
    reservation_outstanding_ = true;
    return {reservation, reservation_size};
  }

  bool commit(const Reservation& reservation) {
    if (static_cast<const uint8_t*>(reservation.mem_) != base_ + reservable_ ||
        reservable_ + reservation.len_ > capacity_ || reservable_ >= capacity_) {
      // The reservation is not from this OwnedSlice.
      return false;
    }
    ASSERT(reservation_outstanding_);
    reservable_ += reservation.len_;
    reservation_outstanding_ = false;
    return true;
  }

OwnedSlice

只是創建和刪除 Slice 指針。

  static SlicePtr create(uint64_t capacity) {
    uint64_t slice_capacity = sliceSize(capacity);
    return SlicePtr(new (slice_capacity) OwnedSlice(slice_capacity));
  }

  static void operator delete(void* address) { ::operator delete(address); }

SliceDeque

Slice 管理隊列。支持前後置增加和刪除。默認隊列長度 8,當長度不夠用時,會成倍自增長。

這代碼比較簡單。這裏只列出自增長函數。

  void growRing() {
    if (size_ < capacity_) {
      return;
    }
    const size_t new_capacity = capacity_ * 2;
    auto new_ring = std::make_unique<SlicePtr[]>(new_capacity);
    for (size_t i = 0; i < new_capacity; i++) {
      ASSERT(new_ring[i] == nullptr);
    }
    size_t src = start_;
    size_t dst = 0;
    for (size_t i = 0; i < size_; i++) {
      new_ring[dst++] = std::move(ring_[src++]);
      if (src == capacity_) {
        src = 0;
      }
    }
    for (size_t i = 0; i < capacity_; i++) {
      ASSERT(ring_[i].get() == nullptr);
    }
    external_ring_.swap(new_ring);
    ring_ = external_ring_.get();
    start_ = 0;
    capacity_ = new_capacity;
  }

UnownedSlice

管理 BufferFragment 的特殊 Slice

class UnownedSlice : public Slice {
public:
  UnownedSlice(BufferFragment& fragment)
      : Slice(0, fragment.size(), fragment.size()), fragment_(fragment) {
    base_ = static_cast<uint8_t*>(const_cast<void*>(fragment.data()));
  }
  ~UnownedSlice() override { fragment_.done(); }
private:
  BufferFragment& fragment_;
};

OwnedImpl

OwnedImplLibEventInstance 接口的實現,同時為了支持新 buffer 在數據操作時,會有一個 bool 值來判斷調用的是新 buffer 還是舊 buffer。新的 buffer 在 1.10.0 默認情況下是關閉的,可通過配置 –use-libevent-buffers 0 打開。

OwnedImpl::OwnedImpl() : old_impl_(use_old_impl_) {
  if (old_impl_) {
    buffer_ = evbuffer_new();
  }
}

現在我們在分析下幾個重要的操作:

add 增加數據。幾個 add 的操作最終都是調用 add(const void* data, uint64_t size)。

void OwnedImpl::add(const void* data, uint64_t size) {
  //判斷是否為舊的buffer
  if (old_impl_) {
    //直接調用
    evbuffer_add(buffer_.get(), data, size);
  } else {
    const char* src = static_cast<const char*>(data);
    //檢查是否需要新加切片。如果切片為空,需要新加。
    bool new_slice_needed = slices_.empty();
    //循環加入。數據加入切片,一個切片並不一定能把數據全都加進去,所以需要循環加入。
    while (size != 0) {
      if (new_slice_needed) {
        //創建切片並加入切片隊列
        slices_.emplace_back(OwnedSlice::create(size));
      }
      //切片加入數據並返回加入的長度
      uint64_t copy_size = slices_.back()->append(src, size);
      //增加偏移量
      src += copy_size;
      //看是否還有數據未加入。size 大於 0 說明上個切片已滿,需要重新創建切片。
      size -= copy_size;
      length_ += copy_size;
      new_slice_needed = true;
    }
  }
}

prepend 這個和 add 接口類似,需要的自行看源碼。

readIoHandle 中讀取數據,並加入 buffer

Api::IoCallUint64Result OwnedImpl::read(Network::IoHandle& io_handle, uint64_t max_length) {
  //長度判斷,一次讀取最大長度不能為0.
  if (max_length == 0) {
    return Api::ioCallUint64ResultNoError();
  }
  //申明一個RawSlice
  constexpr uint64_t MaxSlices = 2;
  RawSlice slices[MaxSlices];
  //鎖住操作的內存塊
  const uint64_t num_slices = reserve(max_length, slices, MaxSlices);
  //readv讀取數據,數據存在slices
  Api::IoCallUint64Result result = io_handle.readv(max_length, slices, num_slices);
  //舊buffer
  if (old_impl_) {
    if (!result.ok()) {
      //錯誤直接返回
      return result;
    }
    uint64_t num_slices_to_commit = 0;
    //讀取到的長度
    uint64_t bytes_to_commit = result.rc_;
    ASSERT(bytes_to_commit <= max_length);
    //RawSlice檢測,檢查slices有幾個。
    while (bytes_to_commit != 0) {
      //空間判斷,看下一次能加多少數據長度。
      slices[num_slices_to_commit].len_ =
          std::min(slices[num_slices_to_commit].len_, static_cast<size_t>(bytes_to_commit));  
      ASSERT(bytes_to_commit >= slices[num_slices_to_commit].len_);
      bytes_to_commit -= slices[num_slices_to_commit].len_;
      num_slices_to_commit++;
    }
    //不能大於RawSlice本身數組長度,否則直接斷言。
    ASSERT(num_slices_to_commit <= num_slices);
    //提交數據到buffer
    commit(slices, num_slices_to_commit);
  //新buffer
  } else {
    uint64_t bytes_to_commit = result.ok() ? result.rc_ : 0;
    ASSERT(bytes_to_commit <= max_length);
    //新的buffer reserve操作返回的是slice的個數。循環判斷。決定 Slice的長度。
    for (uint64_t i = 0; i < num_slices; i++) {
      slices[i].len_ = std::min(slices[i].len_, static_cast<size_t>(bytes_to_commit));
      bytes_to_commit -= slices[i].len_;
    }
    //提交數據到buffer
    commit(slices, num_slices);
  }
  return result;
}

writebuffer 中讀數據。調用writev。

Api::IoCallUint64Result OwnedImpl::write(Network::IoHandle& io_handle) {
  constexpr uint64_t MaxSlices = 16;
  RawSlice slices[MaxSlices];
  //獲取寫入的數據和大小。
  const uint64_t num_slices = std::min(getRawSlices(slices, MaxSlices), MaxSlices);
  //寫入數據
  Api::IoCallUint64Result result = io_handle.writev(slices, num_slices);
  if (result.ok() && result.rc_ > 0) {
    //寫入成功,將數據從buffer中清除。
    drain(static_cast<uint64_t>(result.rc_));
  }search
  return result;
}

WatermarkBuffer

WatermarkBuffer 是在 OwnedImpl 的基礎上增加了高低水位告警回調,在代碼還是調用了 OwnedImpl

  std::function<void()> below_low_watermark_;
  std::function<void()> above_high_watermark_;

  // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to
  // setWatermarks() the watermark callbacks will be called as described above.
  uint32_t high_watermark_{0};
  uint32_t low_watermark_{0};

我們來看其中一個函數:

Api::IoCallUint64Result WatermarkBuffer::read(Network::IoHandle& io_handle, uint64_t max_length) {
  //調用OwnedImpl的函數
  Api::IoCallUint64Result result = OwnedImpl::read(io_handle, max_length);
  //高水位檢測
  checkHighWatermark();
  return result;
}

WatermarkBufferFactory

WatermarkBuffer 的適配器,只是用來創建 WatermarkBuffer 的。

ZeroCopyInputStreamImpl

ZeroCopyInputStreamImpl 可以把它認為是 Buffer::InstancePtr 的一個叠代器。

  //數據在buffer中的位移
  uint64_t position_{0};
  //結束標誌
  bool finished_{false};
  //NEXT過的所有字節數。
  uint64_t byte_count_{0};

Envoy 源碼分析--buffer