1. 程式人生 > >【Servicemesh系列】【Envoy原始碼解析(二)】一個Http請求到響應的全鏈路(一)

【Servicemesh系列】【Envoy原始碼解析(二)】一個Http請求到響應的全鏈路(一)

目錄

1. http連線建立

當有新連線過來的時候,會呼叫上一章節所提及的被註冊到libevent裡面的回撥函式。我們回顧一下,上一章節提及了,會有多個worker註冊所有的listener,當有一個連線過來的時候,系統核心會排程一個執行緒出來交付這個連線。這樣,就可以併發地進行連線的快速建立。更詳細的內容可以參考envoy官方部落格關於執行緒模型的描述,此處不贅述。

 listener_.reset(
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd
()));

回撥函式將呼叫Listener的OnAccept方法,並最終進行網路級別ConnectionImpl的建立,Connection的底層此處利用了libevent對連線的讀寫事件進行監聽,並註冊了讀寫事件的Filter,用來對監聽到的事件和資料進行處理。

void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) {
  ......
  auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket();
  //
建立ServerConnection Network::ConnectionPtr new_connection = parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)); new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes()); // 建立真正的Read/Write Filter const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain( *new_connection, filter_chain->networkFilterFactories()); ...... } ConnectionImpl
::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) : transport_socket_(std::move(transport_socket)), filter_manager_(*this, *this), socket_(std::move(socket)), write_buffer_(dispatcher.getWatermarkFactory().create( [this]() -> void { this->onLowWatermark(); }, [this]() -> void { this->onHighWatermark(); })), dispatcher_(dispatcher), id_(next_global_id_++) { // 底層基於event_assign和event_add file_event_ = dispatcher_.createFileEvent( fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); }

至次,http連線建立完成。下面,就等著請求資料過來了。

2. 請求資料獲取

我們都知道,一個connfd會帶有read/write buffer區,當一個請求過來時,常規的互動方式即讓呼叫方依次進行sendrecv操作,來發送並獲取資料。當傳送資料時,通過send將資料最終傳遞給目標fd的read buffer區。此時採用ET觸發的epoll,感知到資料增多/從不可讀變為可讀的狀態,從而觸發EV_READ事件,從而呼叫onFileEvent方法,該方法中,我們目前暫時只關注對read事件的處理:

void ConnectionImpl::onFileEvent(uint32_t events) {

  if (immediate_error_event_ != ConnectionEvent::Connected) {
    ......
  }

  if (events & Event::FileReadyType::Closed) {
    ......    
  }

  if (events & Event::FileReadyType::Write) {
    ......
  }
  // 此處即為對read事件的處理,onReadReady最終會呼叫到onRead方法
  if (fd() != -1 && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}

void ConnectionImpl::onReadReady() {
  ......
  // 核心讀取資料的地方。
  IoResult result = transport_socket_->doRead(read_buffer_);
  uint64_t new_buffer_size = read_buffer_.length();

 // 更新連線監控的一些狀態,無實際意義。
 updateReadBufferStats(result.bytes_pActiveListener::newConnectionrocessed_, new_buffer_size);

  read_end_stream_ |= result.end_stream_read_;
  if (result.bytes_processed_ != 0 || result.end_stream_read_) { 
    // 當遠端連線關閉或者有讀取到資料的時候啟動onRead,進行讀取到的資料的處理
    onRead(new_buffer_size);
  }
  ...... 
}

上面可以看到IoResult result = transport_socket_->doRead(read_buffer_);是獲取資料的入口,我們看下資料是如何獲取的。

IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
  PostIoAction action = PostIoAction::KeepOpen;
  uint64_t bytes_read = 0;
  bool end_stream = false;
  do {
    Api::SysCallResult result = buffer.read(callbacks_->fd(), 16384);
    ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_);

    if (result.rc_ == 0) {
      end_stream = true;
      break;
    } else if (result.rc_ == -1) {
      ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(), result.errno_);
      if (result.errno_ != EAGAIN) {
        action = PostIoAction::Close;
      }
      break;
    } else {
      bytes_read += result.rc_;
      if (callbacks_->shouldDrainReadBuffer()) {
        callbacks_->setReadBufferReady();
        break;
      }
    }
  } while (true);

  return {action, bytes_read, end_stream};
}

可以看到,強制以16KB的分片大小去迴圈取read buffer區,這個讀資料的動作只有在四個情況下會退出:

  1. 遠端關閉連線,即連線被動關閉。
  2. 發生異常
  3. 讀到buffer區空了
  4. 讀夠limit大小的資料時

此處的的Buffer採用了OwnImpl,底層會進行了readv的系統呼叫,此處不展開。針對以上程式碼,我們著重關注下整個讀邏輯。我們來捋一下:

讀資料的整體邏輯

  1. 監聽conn fd的讀buffer區,設定為epoll邊緣觸發,並且被設定為持久化監聽(libevent設定為EV_PERSIST)。
  2. 當讀buffer區第一次出現數據時候,回撥onFileEvent。開始進行處理
  3. 首先,從buffer區中讀出資料,當被動關閉連線、異常、讀夠1M資料(預設值)、讀到無資料可讀的時候退出這次讀處理。
    1. 如果是異常,則envoy也同步進行各種重置和關閉操作。然後退出。
    2. 如果是被動關閉,不考慮半關閉情況下,envoy會進行重置和關閉操作,但於此同時,會將被動關閉前讀出來的所有資料傳送到後續流程去處理。
    3. 如果是讀夠1M資料的場景,則傳送給後面流程去處理。但需要格外關注的是,由於envoy採用了邊緣觸發,所以如果沒有新資料進來,則無法將監聽到read事件,這樣可能導致資料無法被消費完。為了解決這個問題,所以會通過callbacks_->setReadBufferReady();重新觸發Read事件。
    4. 如果是讀buff區讀完的場景,則將讀取到的資料傳送到後面流程去處理。
  4. 後面流程執行完後(過一系列的filter、包括限流、路由等),一次讀事件處理完成。由於設定了持久化監聽,所以無需手動再進行EV_READ事件註冊。繼續等待下一次讀事件的到來。

(注:會有一些地方會顯示的觸發或者關閉事件監聽,此處不展開討論)

3. 請求資料處理流程拼裝

當從fd中拿到資料後,則會進行正式的處理。處理主要包括限流、熔斷、鏈路追蹤、資料採集、路由轉發、負載均衡等。FilterManager管理所有的Read/Write Filter,並拼裝成pipeline進行處理。

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  ......
  filter_manager_.onRead();
}

void FilterManagerImpl::onRead() {
  ASSERT(!upstream_filters_.empty());
  onContinueReading(nullptr);
}

// 拼裝邏輯
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      // 第一次訪問則呼叫onNewConnection
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }

    BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      // 後續訪問則呼叫onData
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }
  }
}

這裡的filter是怎麼發現的呢?通過RegsitryFactory來實現的。首先,對應每個factory,比如RatelimiterFilter的factory,需要有個這樣的宣告來實現註冊

static Registry::RegisterFactory<RateLimitFilterConfig,
                                 Server::Configuration::NamedHttpFilterConfigFactory>
    register_;

宣告之後,則會預設呼叫RegisterFactory的建構函式進行註冊

template <class T, class Base> class RegisterFactory {
public:
  RegisterFactory() { FactoryRegistry<Base>::registerFactory(instance_); }

private:
  T instance_{};
};

之後即在初始化Lisnter的階段,會進行對應Filter工廠的例項化,在初始化連線階段,會獲取所需要的工廠例項,進行Filter例項的初始化。

下一章節,我們取出兩個Filter做說明來看整個處理流程。