Envoy 源碼分析--buffer
目錄
- Envoy 源碼分析--buffer
- BufferFragment
- RawSlice
- Slice
- OwnedSlice
- SliceDeque
- UnownedSlice
- OwnedImpl
- WatermarkBuffer
- WatermarkBufferFactory
- ZeroCopyInputStreamImpl
Envoy 源碼分析--buffer
申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。
Envoy
的 buffer
在 1.10.0 前是基於 libevent 的 evbuffer
進行封裝。在 1.10.0 開始為了提高性能,要使用 libev 或 libuv 來替代 libevent 重寫了個 buffer
來消除 evbuffer
的依賴。想要具體了解可看 issue#4952 和 issue#5441。下面我們先來看下 buffer
相關的類圖:
上面四個 slice
相關的類是重寫後的 buffer
。slice
封裝了 buffer 相關操作,OwnedSlice
是創建 slice
的類,SliceDeque
OwnedSlice
操作隊列,UnownedSlice
則是兼容 BufferFragment
的類。
LibEventInstance
繼承自 Instance
對 Instance
的接口進行擴充新增了兩個接口。OwnedImpl
對 LibEventInstance
接口的實現。WatermarkBuffer
則在 OwnedImpl
的基礎上增加水位的高低告警回調。WatermarkBufferFactory
繼承 WatermarkFactory
只是生成 WatermarkBuffer
的適配器。
BufferFragment
針對外部數據和大小創建一個適配器,這個類不拷貝內存,只是一個指針,外部需要自己確定數據是有效的。調用 done()
代碼詳情如下:
class BufferFragmentImpl : NonCopyable, public BufferFragment {
public:
BufferFragmentImpl(
const void* data, size_t size,
const std::function<void(const void*, size_t, const BufferFragmentImpl*)>& releasor)
: data_(data), size_(size), releasor_(releasor) {}
// Buffer::BufferFragment
const void* data() const override { return data_; }
size_t size() const override { return size_; }
void done() override {
if (releasor_) {
releasor_(data_, size_, this);
}
}
private:
const void* const data_;
const size_t size_;
const std::function<void(const void*, size_t, const BufferFragmentImpl*)> releasor_;
};
RawSlice
只是一個內存切片的數據結構體。
struct RawSlice {
void* mem_ = nullptr;
size_t len_ = 0;
bool operator==(const RawSlice& rhs) const { return mem_ == rhs.mem_ && len_ == rhs.len_; }
};
Slice
Slice
是一個連續的內存塊的管理。其塊的管理方式如下:
* |<- data_size() -->|<- reservable_size() ->|
* +-----------------+------------------+-----------------------+
* | Drained | Data | Reservable |
* | Unused space | Usable content | New content can be |
* | that formerly | | added here with |
* | was in the Data | | reserve()/commit() |
* | section | | |
* +-----------------+------------------+-----------------------+
* ^
* |
* data()
前面是個無用內存塊,中間這塊就是數據自身,後面是還可用的內存塊長度。增加數據支持 prepend
和 append
即可追加數據也可以前置增加。
Slice
的數據存儲方式如下:
/** 切片指針 */
uint8_t* base_{nullptr};
/** 存儲數據相對於切片的偏移量 */
uint64_t data_;
/** 可用空間相對於切片的偏移量 */
uint64_t reservable_;
/** 切片總長度 */
uint64_t capacity_;
/** 是否被`Limter`限制以延遲操作 */
bool reservation_outstanding_{false};
現在來看幾個主要的操作。
append
在後面追加數據。增加盡量多的數量,不保證所有數據都增加成功。這個在空間不夠時,沒有重新申請空間。先判斷數據是否被鎖住了,接著取可用長度和增加長度的最小值,可用空間偏移量後移,copy 數據到塊中。
uint64_t append(const void* data, uint64_t size) {
if (reservation_outstanding_) {
return 0;
}
uint64_t copy_size = std::min(size, reservableSize());
uint8_t* dest = base_ + reservable_;
reservable_ += copy_size;
memcpy(dest, data, copy_size);
return copy_size;
}
prepend
前置增加數據。同 append
一樣沒有申請空間,只是增加盡量多的數據,不保證所有數據都增加成功。先判斷數據是否被鎖住了,然後判斷切片內是否有數據。如果沒有數據直接增加到最後面,加在最後面的目的是為了後面繼續 prepend
時有空間可加(這裏沒想明白,為啥要在最後面加,這樣我下次用 append
不也一樣沒法增加數據)。後面取出前置可用長度和增加長度的最小值,data
的偏移量後移, copy 數據塊。
uint64_t prepend(const void* data, uint64_t size) {
if (reservation_outstanding_) {
return 0;
}
const uint8_t* src = static_cast<const uint8_t*>(data);
uint64_t copy_size;
if (dataSize() == 0) {
copy_size = std::min(size, reservableSize());
reservable_ = capacity_;
data_ = capacity_ - copy_size;
} else {
if (data_ == 0) {
return 0;
}
copy_size = std::min(size, data_);
data_ -= copy_size;
}
memcpy(base_ + data_, src + size - copy_size, copy_size);
return copy_size;
}
drain
刪除數據,這個只是偏移量後移。
void drain(uint64_t size) {
ASSERT(data_ + size <= reservable_);
data_ += size;
if (data_ == reservable_ && !reservation_outstanding_) {
data_ = reservable_ = 0;
}
}
reserve
和 commit
是合起來一起用的。reserve
返回可用內存塊同時將切片內存塊鎖住,不讓增加和移除,直到 commit
才恢復正常。reserve
返回的內存塊可由用戶自己填充。reserve
時如果增加進來的內存塊不是同個切片下的會增加失敗。
Reservation reserve(uint64_t size) {
if (reservation_outstanding_ || size == 0) {
return {nullptr, 0};
}
uint64_t available_size = capacity_ - reservable_;
if (available_size == 0) {
return {nullptr, 0};
}
uint64_t reservation_size = std::min(size, available_size);
void* reservation = &(base_[reservable_]);
reservation_outstanding_ = true;
return {reservation, reservation_size};
}
bool commit(const Reservation& reservation) {
if (static_cast<const uint8_t*>(reservation.mem_) != base_ + reservable_ ||
reservable_ + reservation.len_ > capacity_ || reservable_ >= capacity_) {
// The reservation is not from this OwnedSlice.
return false;
}
ASSERT(reservation_outstanding_);
reservable_ += reservation.len_;
reservation_outstanding_ = false;
return true;
}
OwnedSlice
只是創建和刪除 Slice
指針。
static SlicePtr create(uint64_t capacity) {
uint64_t slice_capacity = sliceSize(capacity);
return SlicePtr(new (slice_capacity) OwnedSlice(slice_capacity));
}
static void operator delete(void* address) { ::operator delete(address); }
SliceDeque
Slice
管理隊列。支持前後置增加和刪除。默認隊列長度 8,當長度不夠用時,會成倍自增長。
這代碼比較簡單。這裏只列出自增長函數。
void growRing() {
if (size_ < capacity_) {
return;
}
const size_t new_capacity = capacity_ * 2;
auto new_ring = std::make_unique<SlicePtr[]>(new_capacity);
for (size_t i = 0; i < new_capacity; i++) {
ASSERT(new_ring[i] == nullptr);
}
size_t src = start_;
size_t dst = 0;
for (size_t i = 0; i < size_; i++) {
new_ring[dst++] = std::move(ring_[src++]);
if (src == capacity_) {
src = 0;
}
}
for (size_t i = 0; i < capacity_; i++) {
ASSERT(ring_[i].get() == nullptr);
}
external_ring_.swap(new_ring);
ring_ = external_ring_.get();
start_ = 0;
capacity_ = new_capacity;
}
UnownedSlice
管理 BufferFragment
的特殊 Slice
。
class UnownedSlice : public Slice {
public:
UnownedSlice(BufferFragment& fragment)
: Slice(0, fragment.size(), fragment.size()), fragment_(fragment) {
base_ = static_cast<uint8_t*>(const_cast<void*>(fragment.data()));
}
~UnownedSlice() override { fragment_.done(); }
private:
BufferFragment& fragment_;
};
OwnedImpl
OwnedImpl
是 LibEventInstance
接口的實現,同時為了支持新 buffer
在數據操作時,會有一個 bool 值來判斷調用的是新 buffer
還是舊 buffer
。新的 buffer
在 1.10.0 默認情況下是關閉的,可通過配置 –use-libevent-buffers 0
打開。
OwnedImpl::OwnedImpl() : old_impl_(use_old_impl_) {
if (old_impl_) {
buffer_ = evbuffer_new();
}
}
現在我們在分析下幾個重要的操作:
add
增加數據。幾個 add
的操作最終都是調用 add(const void* data, uint64_t size)。
void OwnedImpl::add(const void* data, uint64_t size) {
//判斷是否為舊的buffer
if (old_impl_) {
//直接調用
evbuffer_add(buffer_.get(), data, size);
} else {
const char* src = static_cast<const char*>(data);
//檢查是否需要新加切片。如果切片為空,需要新加。
bool new_slice_needed = slices_.empty();
//循環加入。數據加入切片,一個切片並不一定能把數據全都加進去,所以需要循環加入。
while (size != 0) {
if (new_slice_needed) {
//創建切片並加入切片隊列
slices_.emplace_back(OwnedSlice::create(size));
}
//切片加入數據並返回加入的長度
uint64_t copy_size = slices_.back()->append(src, size);
//增加偏移量
src += copy_size;
//看是否還有數據未加入。size 大於 0 說明上個切片已滿,需要重新創建切片。
size -= copy_size;
length_ += copy_size;
new_slice_needed = true;
}
}
}
prepend
這個和 add
接口類似,需要的自行看源碼。
read
從 IoHandle
中讀取數據,並加入 buffer
。
Api::IoCallUint64Result OwnedImpl::read(Network::IoHandle& io_handle, uint64_t max_length) {
//長度判斷,一次讀取最大長度不能為0.
if (max_length == 0) {
return Api::ioCallUint64ResultNoError();
}
//申明一個RawSlice
constexpr uint64_t MaxSlices = 2;
RawSlice slices[MaxSlices];
//鎖住操作的內存塊
const uint64_t num_slices = reserve(max_length, slices, MaxSlices);
//readv讀取數據,數據存在slices
Api::IoCallUint64Result result = io_handle.readv(max_length, slices, num_slices);
//舊buffer
if (old_impl_) {
if (!result.ok()) {
//錯誤直接返回
return result;
}
uint64_t num_slices_to_commit = 0;
//讀取到的長度
uint64_t bytes_to_commit = result.rc_;
ASSERT(bytes_to_commit <= max_length);
//RawSlice檢測,檢查slices有幾個。
while (bytes_to_commit != 0) {
//空間判斷,看下一次能加多少數據長度。
slices[num_slices_to_commit].len_ =
std::min(slices[num_slices_to_commit].len_, static_cast<size_t>(bytes_to_commit));
ASSERT(bytes_to_commit >= slices[num_slices_to_commit].len_);
bytes_to_commit -= slices[num_slices_to_commit].len_;
num_slices_to_commit++;
}
//不能大於RawSlice本身數組長度,否則直接斷言。
ASSERT(num_slices_to_commit <= num_slices);
//提交數據到buffer
commit(slices, num_slices_to_commit);
//新buffer
} else {
uint64_t bytes_to_commit = result.ok() ? result.rc_ : 0;
ASSERT(bytes_to_commit <= max_length);
//新的buffer reserve操作返回的是slice的個數。循環判斷。決定 Slice的長度。
for (uint64_t i = 0; i < num_slices; i++) {
slices[i].len_ = std::min(slices[i].len_, static_cast<size_t>(bytes_to_commit));
bytes_to_commit -= slices[i].len_;
}
//提交數據到buffer
commit(slices, num_slices);
}
return result;
}
write
從 buffer
中讀數據。調用writev。
Api::IoCallUint64Result OwnedImpl::write(Network::IoHandle& io_handle) {
constexpr uint64_t MaxSlices = 16;
RawSlice slices[MaxSlices];
//獲取寫入的數據和大小。
const uint64_t num_slices = std::min(getRawSlices(slices, MaxSlices), MaxSlices);
//寫入數據
Api::IoCallUint64Result result = io_handle.writev(slices, num_slices);
if (result.ok() && result.rc_ > 0) {
//寫入成功,將數據從buffer中清除。
drain(static_cast<uint64_t>(result.rc_));
}search
return result;
}
WatermarkBuffer
WatermarkBuffer
是在 OwnedImpl
的基礎上增加了高低水位告警回調,在代碼還是調用了 OwnedImpl
。
std::function<void()> below_low_watermark_;
std::function<void()> above_high_watermark_;
// Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to
// setWatermarks() the watermark callbacks will be called as described above.
uint32_t high_watermark_{0};
uint32_t low_watermark_{0};
我們來看其中一個函數:
Api::IoCallUint64Result WatermarkBuffer::read(Network::IoHandle& io_handle, uint64_t max_length) {
//調用OwnedImpl的函數
Api::IoCallUint64Result result = OwnedImpl::read(io_handle, max_length);
//高水位檢測
checkHighWatermark();
return result;
}
WatermarkBufferFactory
WatermarkBuffer
的適配器,只是用來創建 WatermarkBuffer
的。
ZeroCopyInputStreamImpl
ZeroCopyInputStreamImpl
可以把它認為是 Buffer::InstancePtr
的一個叠代器。
//數據在buffer中的位移
uint64_t position_{0};
//結束標誌
bool finished_{false};
//NEXT過的所有字節數。
uint64_t byte_count_{0};
Envoy 源碼分析--buffer