【Servicemesh系列】【Envoy原始碼解析(二)】一個Http請求到響應的全鏈路(一)
目錄
1. http連線建立
當有新連線過來的時候,會呼叫上一章節所提及的被註冊到libevent裡面的回撥函式。我們回顧一下,上一章節提及了,會有多個worker註冊所有的listener,當有一個連線過來的時候,系統核心會排程一個執行緒出來交付這個連線。這樣,就可以併發地進行連線的快速建立。更詳細的內容可以參考envoy官方部落格關於執行緒模型的描述,此處不贅述。
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd ()));
回撥函式將呼叫Listener的OnAccept方法,並最終進行網路級別ConnectionImpl的建立,Connection的底層此處利用了libevent對連線的讀寫事件進行監聽,並註冊了讀寫事件的Filter,用來對監聽到的事件和資料進行處理。
void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) {
......
auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket();
// 建立ServerConnection
Network::ConnectionPtr new_connection =
parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket));
new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes());
// 建立真正的Read/Write Filter
const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
*new_connection, filter_chain->networkFilterFactories());
......
}
ConnectionImpl ::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
TransportSocketPtr&& transport_socket, bool connected)
: transport_socket_(std::move(transport_socket)), filter_manager_(*this, *this),
socket_(std::move(socket)), write_buffer_(dispatcher.getWatermarkFactory().create(
[this]() -> void { this->onLowWatermark(); },
[this]() -> void { this->onHighWatermark(); })),
dispatcher_(dispatcher), id_(next_global_id_++) {
// 底層基於event_assign和event_add
file_event_ = dispatcher_.createFileEvent(
fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Write);
}
至次,http連線建立完成。下面,就等著請求資料過來了。
2. 請求資料獲取
我們都知道,一個connfd會帶有read/write buffer區,當一個請求過來時,常規的互動方式即讓呼叫方依次進行send
和recv
操作,來發送並獲取資料。當傳送資料時,通過send
將資料最終傳遞給目標fd的read buffer區。此時採用ET觸發的epoll,感知到資料增多/從不可讀變為可讀的狀態,從而觸發EV_READ
事件,從而呼叫onFileEvent
方法,該方法中,我們目前暫時只關注對read事件的處理:
void ConnectionImpl::onFileEvent(uint32_t events) {
if (immediate_error_event_ != ConnectionEvent::Connected) {
......
}
if (events & Event::FileReadyType::Closed) {
......
}
if (events & Event::FileReadyType::Write) {
......
}
// 此處即為對read事件的處理,onReadReady最終會呼叫到onRead方法
if (fd() != -1 && (events & Event::FileReadyType::Read)) {
onReadReady();
}
}
void ConnectionImpl::onReadReady() {
......
// 核心讀取資料的地方。
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
// 更新連線監控的一些狀態,無實際意義。
updateReadBufferStats(result.bytes_pActiveListener::newConnectionrocessed_, new_buffer_size);
read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_) {
// 當遠端連線關閉或者有讀取到資料的時候啟動onRead,進行讀取到的資料的處理
onRead(new_buffer_size);
}
......
}
上面可以看到IoResult result = transport_socket_->doRead(read_buffer_);
是獲取資料的入口,我們看下資料是如何獲取的。
IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
bool end_stream = false;
do {
Api::SysCallResult result = buffer.read(callbacks_->fd(), 16384);
ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_);
if (result.rc_ == 0) {
end_stream = true;
break;
} else if (result.rc_ == -1) {
ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(), result.errno_);
if (result.errno_ != EAGAIN) {
action = PostIoAction::Close;
}
break;
} else {
bytes_read += result.rc_;
if (callbacks_->shouldDrainReadBuffer()) {
callbacks_->setReadBufferReady();
break;
}
}
} while (true);
return {action, bytes_read, end_stream};
}
可以看到,強制以16KB的分片大小去迴圈取read buffer區,這個讀資料的動作只有在四個情況下會退出:
- 遠端關閉連線,即連線被動關閉。
- 發生異常
- 讀到buffer區空了
- 讀夠limit大小的資料時
此處的的Buffer採用了OwnImpl
,底層會進行了readv的系統呼叫,此處不展開。針對以上程式碼,我們著重關注下整個讀邏輯。我們來捋一下:
讀資料的整體邏輯
- 監聽conn fd的讀buffer區,設定為epoll邊緣觸發,並且被設定為持久化監聽(libevent設定為EV_PERSIST)。
- 當讀buffer區第一次出現數據時候,回撥
onFileEvent
。開始進行處理 - 首先,從buffer區中讀出資料,當被動關閉連線、異常、讀夠1M資料(預設值)、讀到無資料可讀的時候退出這次讀處理。
- 如果是異常,則envoy也同步進行各種重置和關閉操作。然後退出。
- 如果是被動關閉,不考慮半關閉情況下,envoy會進行重置和關閉操作,但於此同時,會將被動關閉前讀出來的所有資料傳送到後續流程去處理。
- 如果是讀夠1M資料的場景,則傳送給後面流程去處理。但需要格外關注的是,由於envoy採用了邊緣觸發,所以如果沒有新資料進來,則無法將監聽到read事件,這樣可能導致資料無法被消費完。為了解決這個問題,所以會通過
callbacks_->setReadBufferReady();
重新觸發Read事件。 - 如果是讀buff區讀完的場景,則將讀取到的資料傳送到後面流程去處理。
- 後面流程執行完後(過一系列的filter、包括限流、路由等),一次讀事件處理完成。由於設定了持久化監聽,所以無需手動再進行EV_READ事件註冊。繼續等待下一次讀事件的到來。
(注:會有一些地方會顯示的觸發或者關閉事件監聽,此處不展開討論)
3. 請求資料處理流程拼裝
當從fd中拿到資料後,則會進行正式的處理。處理主要包括限流、熔斷、鏈路追蹤、資料採集、路由轉發、負載均衡等。FilterManager管理所有的Read/Write Filter,並拼裝成pipeline進行處理。
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
......
filter_manager_.onRead();
}
void FilterManagerImpl::onRead() {
ASSERT(!upstream_filters_.empty());
onContinueReading(nullptr);
}
// 拼裝邏輯
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
entry = upstream_filters_.begin();
} else {
entry = std::next(filter->entry());
}
for (; entry != upstream_filters_.end(); entry++) {
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
// 第一次訪問則呼叫onNewConnection
FilterStatus status = (*entry)->filter_->onNewConnection();
if (status == FilterStatus::StopIteration) {
return;
}
}
BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
// 後續訪問則呼叫onData
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
if (status == FilterStatus::StopIteration) {
return;
}
}
}
}
這裡的filter是怎麼發現的呢?通過RegsitryFactory
來實現的。首先,對應每個factory,比如RatelimiterFilter的factory,需要有個這樣的宣告來實現註冊
static Registry::RegisterFactory<RateLimitFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
宣告之後,則會預設呼叫RegisterFactory的建構函式進行註冊
template <class T, class Base> class RegisterFactory {
public:
RegisterFactory() { FactoryRegistry<Base>::registerFactory(instance_); }
private:
T instance_{};
};
之後即在初始化Lisnter的階段,會進行對應Filter工廠的例項化,在初始化連線階段,會獲取所需要的工廠例項,進行Filter例項的初始化。
下一章節,我們取出兩個Filter做說明來看整個處理流程。