Nginx upstream (一) 整體流程分析
Nginx訪問上游伺服器的流程大致分以下幾個階段:啟動upstream、連線上游伺服器、向上遊傳送請求、接收上游響應(包頭/包體)、結束請求。本篇主要從程式碼流程的角度,梳理一下upstream的整個的資料的處理流程。下面先看一下upstream相關的兩個重要資料結構ngx_http_upstream_t和ngx_http_upstream_conf_t:
相關資料結構
typedef struct ngx_http_upstream_s ngx_http_upstream_t; struct ngx_http_upstream_s { ngx_http_upstream_handler_pt read_event_handler; // 處理讀事件的回撥方法 ngx_http_upstream_handler_pt write_event_handler; // 處理寫事件的回撥方法 ngx_peer_connection_t peer; // 主動向上游發起的連線,稍後會詳細分析 ngx_event_pipe_t *pipe; // 當開啟快取配置,會用pipe來轉發響應,需要http模組在使用upstream機制前構造pipe結構體 ngx_chain_t *request_bufs; // 用連結串列將ngx_buf_t緩衝區連結起來,表示所有需要傳送到上游的請求內容, // create_request回撥在於構造request_buf連結串列 ngx_output_chain_ctx_t output; // 向下遊傳送響應的方式,稍後會詳細分析 ngx_chain_writer_ctx_t writer; // 向下遊傳送響應的方式,稍後會詳細分析 ngx_http_upstream_conf_t *conf; // upstream相關的配置資訊 #if (NGX_HTTP_CACHE) ngx_array_t *caches; // 快取陣列,稍後會單獨介紹快取相關內容 #endif ngx_http_upstream_headers_in_t headers_in; // 當直接轉發時,process_header將解析的頭部適配為http頭部,同時將包頭資訊放在headers_in中 ngx_http_upstream_resolved_t *resolved; // 用於解析主機域名,後面會詳細介紹 ngx_buf_t from_client; // ToDo.... ngx_buf_t buffer; // 接收上游伺服器響應包頭的快取區,當不需要直接響應或buffering為0時,也作為轉發包體緩衝區 off_t length; // 來自上游伺服器的響應包體的長度 ngx_chain_t *out_bufs; // 使用時再具體介紹,不同場景下有不同意義 ngx_chain_t *busy_bufs; // 當buffering為0時,表示上一次向下遊轉發響應時沒有傳送完成的內容 ngx_chain_t *free_bufs; // 當buffering為0時,用於回收out_bufs中已經發送給下游的ngx_buf_t結構體 ngx_int_t (*input_filter_init)(void *data); // 處理包體前的初始化方法,其中data用於傳遞使用者資料結構,即下方的input_filter_ctx ngx_int_t (*input_filter)(void *data, ssize_t bytes)// 處理包體的方法,bytes表示本次接收到的包體長度,data同上 void *input_filter_ctx; // 傳遞http模組的自定義的資料結構 #if (NGX_HTTP_CACHE) ngx_int_t (*create_key)(ngx_http_request_t *r); // cache部分,後面再分析 #endif ngx_int_t (*create_request)(ngx_http_request_t *r); // 用於構造發往上游伺服器的請求 ngx_int_t (*reinit_request)(ngx_http_request_t *r); // 與上游通訊失敗,需要重新發起連線時,用該方法重新初始化請求資訊 ngx_int_t (*process_header)(ngx_http_request_t *r); // 解析上游伺服器返回響應的包頭,NGX_AGAIN接收不完整,NGX_OK解析到完整包頭 void (*abort_request)(ngx_http_request_t *r); // 暫時沒有用到 void (*finalize_request)(ngx_http_request_t *r, // 請求結束時會呼叫,目前沒有實際作用 ngx_int_t rc); ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r,// 上游返回響應中含Location或Refresh時,process_header會呼叫http模組實現的該方法 ngx_table_elt_t *h, size_t prefix); ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, // 同上,當響應中含Set-Cookie時,會呼叫http模組實現的該方法 ngx_table_elt_t *h); ngx_msec_t timeout; // 暫時沒有用到 ngx_http_upstream_state_t *state; // 用於表示上游響應的錯誤碼、包體長度等資訊 ngx_str_t method; // 用於檔案快取,稍後再進行分析 ngx_str_t schema; // 記錄日誌時使用 ngx_str_t uri; // 記錄日誌時使用 ngx_http_cleanup_pt *cleanup; // 用於標識是否需要清理資源,相當於一個標誌位,實際不會呼叫該方法 unsigned store:1; // 是否指定檔案快取路徑的標誌位 unsigned cacheable:1; // 是否啟用檔案快取 unsigned accel:1; // 目前沒有用到 unsigned ssl:1; // 是否基於SSL協議訪問上游伺服器 unsigned buffering:1; // 向下遊轉發響應包體時,是否開啟更大記憶體及臨時磁碟檔案用於快取來不及傳送到下游的響應包體 unsigned keepalive:1; // 標識與後端是否開啟keepalive ? unsigned upgrade:1; // 是否存在upgrade header unsigned request_sent:1; // 是否向上遊伺服器傳送了請求 unsigned header_sent:1; // 為1時,表示包頭已經轉發給客戶端了 }
ngx_http_upstream_conf_t:指定了upstream的執行方式,必須在啟動upstream之前設定
點選(此處)摺疊或開啟
typedef struct { ngx_http_upstream_srv_conf_t *upstream; // 當上面沒有實現resolved成員時,用該結構體定義上游伺服器的配置 ngx_msec_t connect_timeout; // 建立tcp連線的超時時間,即寫事件新增到定時器中設定的超時時間 ngx_msec_t send_timeout; // 傳送請求的超時時間,即寫事件新增到定時器中設定的超時時間 ngx_msec_t read_timeout; // 接收響應的超時時間,即讀事件新增到定時器中設定的超時時間 ngx_msec_t timeout; // 暫時沒有使用 ngx_msec_t next_upstream_timeout; // size_t send_lowat; // 傳送快取區的下限,即TCP的SO_SNOLOWAT選項 size_t buffer_size; // 指定接收頭部緩衝區分配的記憶體大小,當buffering為0時,由於上述buffer同時用於接收包體,也表示接收包體緩衝區大小 size_t limit_rate; // size_t busy_buffers_size; // 當buffering為1,且向下遊轉發響應時生效,會設定到ngx_event_pipe_t結構體的busy_size中 size_t max_temp_file_size; // 指定臨時檔案的大小,限制ngx_event_pipe_t中的temp_file size_t temp_file_write_size; // 將緩衝區的響應寫入臨時檔案時,一次寫入字元流的最大長度 ...... ngx_bufs_t bufs; // 以快取響應的方式轉發上游伺服器的包體時所使用的記憶體大小 ngx_uint_t ignore_headers; // 以點陣圖的形式標識在轉發時需要忽略的headers ngx_uint_t next_upstream; // 以點陣圖的方式表示一些錯誤碼,當處理上游響應時發現該錯誤碼,選擇下一個上游伺服器重發請求 ngx_uint_t store_access; // 表示建立的臨時目錄和檔案的許可權 ngx_uint_t next_upstream_tries; // ngx_flag_t buffering; // 為1時表示開啟快取,儘量在記憶體和磁碟中快取來自上游的響應,為0時則開闢固定大小記憶體塊作為快取來轉發響應 ...... ngx_flag_t ignore_client_abort; // 為1時,表示與上游伺服器互動時不檢查nginx與下游伺服器是否斷開,即使下游主動關閉連線,也不會中斷與上游互動 ngx_flag_t intercept_errors; // 詳見ngx_http_upstream_intercept_errors ngx_flag_t cyclic_temp_file; // 為1時,會嘗試複用臨時檔案中已經使用過的空間 ...... ngx_path_t *temp_path; // buffering為1的情況下轉發響應時,存放臨時檔案的路徑 ngx_hash_t hide_headers_hash; // 不轉發的頭部,根據hide_headers和pass_headers動態陣列構造出的需要隱藏的http頭部散列表 ngx_array_t *hide_headers; // 當轉發上游頭部給下游時,如果不希望將某些頭部轉發給下游,則設定到該陣列中 ngx_array_t *pass_headers; // 轉發頭部時upstream機制預設不會轉發某些頭部,當確定需要轉發時,需要設定到該陣列中 ngx_http_upstream_local_t *local; // 連線上游伺服器時,需要使用的本機地址 ngx_array_t *store_lengths; // 當需要將上游響應快取到檔案中時,表示存放路徑的長度 ngx_array_t *store_values; // 當需要將上游響應快取到檔案中時,表示存放路徑 ...... signed store:2; // 同ngx_http_upstream_t中的store unsigned intercept_404:1; // 如果該值設為1,當上遊返回404時直接轉發該錯誤碼給下游,而不會去與error_page進行比較 unsigned change_buffering:1; // 當為1時,根據上游伺服器返回的響應頭部,動態決定是以上游網速優先,還是下游網速優先 ...... ngx_str_t module; // 使用upstream的模組名稱,僅用於記錄日誌 } ngx_http_upstream_conf_t
啟動upstream
當收到請求後,http的代理模組是ngx_http_proxy_module,其NGX_HTTP_CONTENT_PHASE階段的處理函式為ngx_http_proxy_handler
點選(此處)摺疊或開啟
static ngx_int_t ngx_http_proxy_handler(ngx_http_request_t *r) { // 建立ngx_http_upstream_t結構,並賦值給r->upstream if (ngx_http_upstream_create(r) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } ..... plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module); ..... u = r->upstream; ..... // 給upstream的conf成員賦值,記錄相關的配置資訊 u->conf = &plcf->upstream; // 設定相關的回撥資訊 u->create_request = ngx_http_proxy_create_request; u->reinit_request = ngx_http_proxy_reinit_request; u->process_header = ngx_http_proxy_process_status_line; u->abort_request = ngx_http_proxy_abort_request; u->finalize_request = ngx_http_proxy_finalize_request; ...... u->buffering = plcf->upstream.buffering; ..... // 呼叫ngx_http_upstream_init函式 rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); ..... return NGX_DONE; }
首先建立upstream的結構並進行設定,然後設定ngx_http_upstream_conf_t配置結構體給upstream->conf。ngx_http_upstream_init函式會根據
ngx_http_upstream_conf_t配置的資訊初始化upstream,同時開始連線上游伺服器,由此展開整個upstream的處理流程。
點選(此處)摺疊或開啟
void ngx_http_upstream_init(ngx_http_request_t *r)
{
ngx_connection_t *c;
// 客戶端的連線
c = r->connection;
......
// 當啟用upstream時,需要將客戶端對應的讀事件從定時器中刪除,此時主要關注上游的連線相關的事件
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
......
ngx_http_upstream_init_request(r);
}
繼續看ngx_http_upstream_init_request函式
點選(此處)摺疊或開啟
static void ngx_http_upstream_init_request(ngx_http_request_t *r)
{
u = r->upstream;
u->store = u->conf->store;
......
// 設定Nginx與下游客戶端之間TCP連線的檢查方法,注意幾個條件,ignore來自之前配置屬性,是否忽略客戶端的連線狀態
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
......
// 呼叫http模組實現的create_request方法,即前面註冊的ngx_http_proxy_create_request函式,用於構造發到上游伺服器的請求
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
......
// 向當前請求的main成員指向的原始請求中的cleanup連結串列末尾新增一個新成員
cln = ngx_http_cleanup_add(r, 0);
// 將handler的回撥方法設定為ngx_http_upstream_cleanup
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
......
// 呼叫ngx_http_upstream_connect向上遊伺服器發起連線
ngx_http_upstream_connect(r, u);
}
與上游伺服器建立連線
upstream機制與上游伺服器之間通過tcp建立連線,為了保證三次握手的過程中不阻塞程序,Nginx採用了無阻塞的套接字來連線上游伺服器。
ngx_http_upstream_connect負責發起建連動作,如果沒有立即返回成功,需要在epoll中監控該套接字,當出現可寫事件時,則說明連線已經建立成功。
點選(此處)摺疊或開啟
static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
// 建連的動作主要由下面函式進行.....
rc = ngx_event_connect_peer(&u->peer);
....
點選(此處)摺疊或開啟
ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
// 建立tcp socket套接字
s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0);
......
// 獲取空閒的ngx_connection_t結構來承載連線,從ngx_cycle_t的free_connections指向的空閒連線池中獲取
c = ngx_get_connection(s, pc->log);
......
// 設定連線為非阻塞的模式
if (ngx_nonblocking(s) == -1) {
......
// 繫結地址和埠
if (pc->local) {
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
......
// 設定連線收發相關的回撥函式
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
// 啟用sendfile的支援
c->sendfile = 1;
......
rev = c->read;
wev = c->write;
......
pc->connection = c;
// 呼叫ngx_event_actions.add_conn將tcp套接字以期待可讀、可寫的方式新增到事件蒐集器中,這裡是把套接字加到epoll中
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
// 向上遊伺服器發起連線,由於非阻塞,呼叫會立即返回
rc = connect(s, pc->sockaddr, pc->socklen);
......
回到ngx_http_upstream_connect繼續分析
點選(此處)摺疊或開啟
static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 上面已經分析了,該函式主要進行上游伺服器的連線
rc = ngx_event_connect_peer(&u->peer);
......
c = u->peer.connection;
c->data = r;
// 將上游connection上讀寫事件的回撥,都設定為ngx_http_upstream_handler
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
// 設定upstream機制的write_event_handler和read_event_handler,具體使用見後續的ngx_upstream_handler函式
// ngx_http_upstream_send_request_handler用於向上遊傳送請求
u->write_event_handler = ngx_http_upstream_send_request_handler;
// ngx_http_upstream_process_header接收和解析上游伺服器的響應
u->read_event_handler = ngx_http_upstream_process_header;
......
if (rc == NGX_AGAIN) {
// 當連線沒有建立成功時,套接字已經在epoll中了,將寫事件新增到定時器中,超時時間是ngx_http_upstream_conf_t中的connect_timeout成員
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
......
// 當成功建立連線時,向上遊伺服器傳送請求,注意:此處的函式與上面設定的定時器回撥的函式有所不同,下文會進行說明
ngx_http_upstream_send_request(r, u);
}
下面先簡單看一下connection的讀寫回調函式——ngx_http_upstream_handler
點選(此處)摺疊或開啟
static void
ngx_http_upstream_handler(ngx_event_t *ev)
{
......
// 由事件的data成員取得ngx_connection_t連線,該連線是nginx與上游伺服器之間的連線
c = ev->data;
// 由連線的data取得ngx_http_request_t結構體
r = c->data;
// 由請求的upstream成員取的表示upstream機制的ngx_http_upstream_t結構體
u = r->upstream;
// 此處ngx_http_request_t結構中的connection成員代表的是客戶端與nginx之間連線
c = r->connection;
......
if (ev->write) {
// nginx與上游伺服器間的tcp連線的可寫事件被觸發時,該方法被呼叫
u->write_event_handler(r, u);
} else {
// nginx與上游伺服器間的tcp連線的可讀事件被觸發時,該方法被呼叫
u->read_event_handler(r, u);
}
// 與nginx_http_request_handler相同,最後一步執行post請求
ngx_http_run_posted_requests(c);
}
傳送請求到上游伺服器
前面在介紹ngx_http_upstream_connect函式時,我們看到將ngx_http_upstream_t中的write_event_handler設定為了ngx_http_upstream_send_request_handler,而ngx_http_upstream_connect的最後直接呼叫了ngx_http_upstream_send_request傳送請求。
下面先來看一下兩者的區別
點選(此處)摺疊或開啟
static void ngx_http_upstream_send_request_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_connection_t *c;
// 獲取與上游伺服器間表示連線的ngx_connection_t結構體
c = u->peer.connection;
// 當寫事件的timeout被設定為1時,則代表向上遊傳送http請求已經超時
if (c->write->timedout) {
// 將超時錯誤傳給next方法,next方法根據允許的重傳策略決定:重新發起連線執行upstream請求,還是結束upstream請求
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
......
// header_sent為1時,表示上游伺服器的響應需要直接轉發給客戶端,而且此時響應包頭已經轉給客戶端了
if (u->header_sent) {
// 由於此時已經收到了上游伺服器的完整包頭,此時不需要再向上游傳送請求,因此將write回撥設定為空函式(只記錄日誌)
u->write_event_handler = ngx_http_upstream_dummy_handler;
// 將寫事件新增到epoll中
(void) ngx_handle_write_event(c->write, 0);
return;
}
// 呼叫下面函式向上遊傳送http請求
ngx_http_upstream_send_request(r, u);
}
通過上面的分析,現在很容易看出兩者的區別,ngx_http_upstream_send_request_handler更多的是在檢測請求的狀態,而實際的傳送函式是
ngx_http_upstream_send_request,下面繼續看一下該函式。
點選(此處)摺疊或開啟
static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 傳送u->request_bufs連結串列上的請求內容,該函式會把未一次傳送完的連結串列緩衝區儲存下來,再次呼叫時不需要request_bufs引數
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
// 標識已經向上遊傳送了請求,實際上是為了標識是否呼叫過ngx_output_chain,除了第一次,其他時候不需要再傳送request_bufs,直接設定為NULL
u->request_sent = 1;
......
// 當寫事件仍在定時器中時,先將寫事件從定時器中移出,由ngx_output_chain的返回值決定是否需要向定時器中增加寫事件
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
// 當ngx_output_chain返回NGX_AGAIN時,說明請求還沒有發完,此時需要設定寫事件定時器
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
// 將寫事件新增到epoll中
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 結束ngx_http_upstream_send_request的執行,等待epoll事件觸發
return;
}
/* rc == NGX_OK */
// 當ngx_output_chain返回NGX_OK時,表示向上遊伺服器傳送完了所有的請求,將寫事件的回撥設定為空函式
......
u->write_event_handler = ngx_http_upstream_dummy_handler;
// 重新新增到epoll中
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 傳送完請求後,需要開始讀上游返回的響應,設定讀事件的超時時間
ngx_add_timer(c->read, u->conf->read_timeout);
// 當ready已經設定時,說明應答已經到位,呼叫process_header開始處理來自上游的響應
if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
}
接收上游伺服器的響應
Nginx的upstream機制支援三種響應包體的處理方式:不轉發響應、轉發響應時以下游網速優先、轉發響應時以上游網速優先。當ngx_http_request_t結構體的
subrequest_in_memory標誌位為1時,即不轉發響應;當subrequest_in_memory為0時,則轉發響應;而ngx_http_upstream_conf_t配置結構中的buffering
為0時,則以下游網速優先,即使用固定大小的記憶體作為快取;當buffering為1時,則以上游網速優先,即採用更多的記憶體、硬碟檔案作為快取。
下面看一下用於接收、解析響應頭部的ngx_http_upstream_process_header方法
點選(此處)摺疊或開啟
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 獲取到上游伺服器的連線資訊
c = u->peer.connection;
......
// 檢查是否發生了讀事件超時,如果發生了超時,則呼叫ngx_http_upstream_next函式決定下一步動作
if (c->read->timedout) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
// request_sent為1則代表已經向上遊發過請求;為0則代表還沒有傳送請求,沒有傳送請求卻收到上游的響應時,則不符合邏輯,進行下一步動作
// ngx_http_upstream_next會根據配置資訊決定是否直接結束請求,還是尋找下一個上游伺服器
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
// 檢查用於接收上游響應的buffer,當start為NULL時,代表該緩衝區尚未進行分配,此時會按照配置指定的buffer_size進行緩衝區的分配
if (u->buffer.start == NULL) {
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 針對新申請的緩衝區進行初始化,省略
.......
}
for ( ;; ) {
// 讀取響應的內容儲存在buffer中,每次讀取的最大不超過buffer_size,即當前緩衝區的剩餘空間大小
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
// NGX_AGAIN代表響應還沒有讀完,設定讀事件到epoll中,等待下一次讀取
if (n == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
// 讀取出錯或者連線已關閉,則呼叫next函式決定是終止連線,還是重新選擇上游伺服器
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
// n 大於0時,代表讀取到的資料,此時last遊標需要往後移動n個位元組,last初始化時與start相同,指向buffer起始地址
u->buffer.last += n;
// 開始處理讀取到的響應頭部資訊
rc = u->process_header(r);
// 檢查process_header的返回值,當返回NGX_AGAIN時,需要判斷一下是否當前的緩衝區已經被用盡,如果被用盡說明一個buffer_size無法容納整個響應頭部
if (rc == NGX_AGAIN) {
// 當buffer無法容納整個響應頭部時,呼叫next決定是終止連線還是選擇下一個上游伺服器
if (u->buffer.last == u->buffer.end) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
......
}
// 當process_header處理的是完整的響應頭部時,會進一步判斷其返回值,檢測到無效的響應頭部時,進行next的進一步決策處理
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
// 當process_header返回ERROR時,直接終止當前的請求
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// 走到目前位置,當前的process_header至少是執行成功了,完整的解析了響應的頭部資訊
/* rc == NGX_OK */
.......
// 處理已經解析出的頭部,該函式會把已經解析出的頭部,設定到ngx_http_request_t結構體的headers_out成員中
// 當呼叫ngx_http_send_header時,可以將設定到headers_out中的響應頭部發送給客戶端
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
// subrequest_in_memory欄位為0時,表示需要轉發響應到客戶端;為1時,表示不需要轉發響應到客戶端
if (!r->subrequest_in_memory) {
// 傳送響應給客戶端
ngx_http_upstream_send_response(r, u);
return;
}
// 以下的邏輯是不需要轉發響應給客戶端,即subrequest_in_memory為1的情況
/* subrequest content in memory */
// 檢查一下input_filter是否為NULL,input_filter用於處理響應的包體,當沒有定義自己的實現方法時,使用預設的處理方法
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
// 呼叫init方法為即將進行的包體處理做一些初始化的工作,預設的init函式是空的,什麼也沒做
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// pos與last之間的內容是已經讀取但尚未處理的資料
n = u->buffer.last - u->buffer.pos;
// 當process_header處理完後,如果還有尚未處理的資料,那說明除了讀到了包頭之外,還讀到部分包體資訊
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
// 呼叫input_filter處理已經讀到的包體資訊
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
......
// 設定處理上游響應包體的回撥函式
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
// 開始處理包體的資訊
ngx_http_upstream_process_body_in_memory(r, u);
}
下面繼續分析一下,不用upstream直接轉發響應時的具體處理流程,主要是上面subrequest_memory為1的場景,此時該請求屬於一個子請求。
我們看一下上面分析時提到的預設的input_filter的處理方法,在上面的分析中,如果讀取包頭時同時讀到了包體資訊,會呼叫input_filter方法處理:
點選(此處)摺疊或開啟
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
{
// data指向了請求的ngx_http_request_t結構,前面函式中當沒有定義input_filter時,對input_filter_ctx進行了重新初始化,指向了ngx_http_request_t
ngx_http_request_t *r = data;
......
u = r->upstream;
// 遍歷out_bufs使ll指向最後一個緩衝區->next的地址
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
}
// 申請新的緩衝區
cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}
// 將新申請的緩衝區掛在out_bufs的連結串列末尾
*ll = cl;
......
// buffer為接收上游響應包體的緩衝區
b = &u->buffer;
// b->last在呼叫該函式時,已經指向了接收到的包體的首地址,cl->buf->pos指向首地址後,將b->last和cl->buf->last設定為儲存包體尾部
cl->buf->pos = b->last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
// 如果沒有設定包體長度,則到此可以結束了
if (u->length == -1) {
return NGX_OK;
}
// 計算還需要接收的包體的長度
u->length -= bytes;
return NGX_OK;
}
繼續向下分析,process_header呼叫input_filter處理完包體後,最後呼叫的函式時ngx_http_upstream_process_body_in_memory,
該函式實際上會接收上游伺服器的包體內容,下面看一下具體實現。
點選(此處)摺疊或開啟
static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 獲取到上游的連線資訊
c = u->peer.connection;
// 獲取該連線的讀事件,判斷是否發生了讀事件的超時,如果超時,則直接結束連線
rev = c->read;
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
// buffer為儲存上游響應包體的緩衝區
b = &u->buffer;
for ( ;; ) {
// 計算剩餘空閒緩衝區的大小
size = b->end - b->last;
......
// 如果還有空閒的空間,呼叫recv方法繼續讀取響應
n = c->recv(c, b->last, size);
// 此處NGX_AGAIN代表需要等待下一次的讀事件
if (n == NGX_AGAIN) {
break;
}
// 如果上游主動關閉連線,或者讀取出現錯誤,則直接關閉連線
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
// 更新讀到的響應包體的長度
u->state->response_length += n;
// 處理讀到的包體內容
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (!rev->ready) {
break;
}
}
// 如果包體長度沒有設定,則可以直接結束請求了
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
// 將讀事件增加到Epoll中
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 將讀事件同時新增到定時器中,超時時間為配置的read_timeout,避免長時間等待
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
上面流程很容易看出一個問題,那就是讀取響應頭的Buffer的空間可能不足,導致處理出現問題。使用時關鍵還在於Input_filter方法中對buffer的管理。
分析完不轉發響應的過程後,繼續看一下轉發響應的兩種實現方式,下游網速優先和上游網速優先的實現。由於上游網速優先的方式,實現較為複雜,下面先看一下下游網速優先的方式,即採用固定的記憶體大小,作為響應的緩衝區。程式碼上也刪減不必要的邏輯。
下游網速優先
點選(此處)摺疊或開啟
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
......
// 向下遊的客戶端傳送響應頭部,前面process_header處理時先將響應頭部設定到了headers_in中,然後upstream_process_headers將headers_in中的
// 頭部設定到headers_out中,ngx_http_send_header就是將headers_out中的http包頭髮送給客戶端
rc = ngx_http_send_header(r);
......
// 設定頭部已傳送的標誌
u->header_sent = 1;
......
// 如果早期的請求攜帶了包體資訊,且用到了臨時檔案,則先清理臨時檔案,因為已經收到響應了,請求的臨時檔案肯定用不到了
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
......
// buffering為1代表上游網速優先,為0代表下游網速優先
if (!u->buffering) {
// 看一下使用者有沒有設定input_filter,沒有的話使用預設的input_filter函式
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
// 設定接收上游響應的回撥函式
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
// 設定向下遊客戶端傳送報文的回撥函式
r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
// 為input_filter處理包體做初始化的準備函式,預設實現是空的
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
......
// 看一下解析完包頭後,是否還有未解析的包體資訊,如果存在包體,則先處理一次包體,和前面分析不轉發響應的邏輯是一樣的
n = u->buffer.last - u->buffer.pos;
if (n) {
// last指向代處理的包體的起始地址,更新response_length,呼叫input_filter處理當前的包體
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 呼叫downstream將本次的包體轉發給客戶端
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
// 清空buff,實際上是將pos和last指標復位
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
// Todo....
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
// 如果連線上的讀事件已經準備好或者響應頭部沒有指定包體長度時,直接呼叫downstream方法處理響應
if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
// 將控制權轉交給Nginx框架
return;
}
......
}
ngx_http_upstream_process_non_buffered_downstream函式,用於處理上游伺服器響應的讀事件
點選(此處)摺疊或開啟
static void ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{
// 獲取與上游伺服器的連線資訊,和寫事件資訊
c = r->connection;
u = r->upstream;
wev = c->write;
......
// 如果出現寫事件超時,則設定超時標籤,同時終止連線
if (wev->timedout) {
c->timedout = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
return;
}
// non_buffered即固定記憶體,用固定記憶體處理轉發響應,其中第二個引數是個標籤,為1時代表向下遊傳送響應,為0時代表讀取上游的響應
ngx_http_upstream_process_non_buffered_request(r, 1);
}
下面繼續分析一下ngx_http_upstream_process_non_buffered_request
點選(此處)摺疊或開啟
static void
ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, ngx_uint_t do_write)
{
// 獲取上游和現有的連線資訊,記錄為downstream和upstream
u = r->upstream;
downstream = r->connection;
upstream = u->peer.connection;
b = &u->buffer;
// 判斷是否向下遊寫,do_write是呼叫方設定的,而u->length表示還需要接收的上游響應的長度,為0則代表不需要繼續接收
do_write = do_write || u->length == 0;
for ( ;; ) {
// 判斷是否需要向客戶端寫資料
if (do_write) {
// out_bufs中記錄的是需要向下遊寫的資料,而busy_bufs用於記錄當out_bufs無法一次發完時指向out_bufs,從而將out_bufs置空
if (u->out_bufs || u->busy_bufs) {
// 向下遊傳送out_bufs指向的內容,busy_bufs中記錄的是上一次的out_bufs的內容,現在已經合併當前的out_bufs中了
rc = ngx_http_output_filter(r, u->out_bufs);
// 回收out_bufs上已經發送的buf,將未傳送完的buf設定到busy_buf上,清空out_bufs
ngx_chain_update_chains(r->pool, &u->free_bufs, &u->busy_bufs, &u->out_bufs, u->output.tag);
}
// 當busy_bufs為空時,說明當前沒有需要傳送到客戶端的內容了
if (u->busy_bufs == NULL) {
......
// 將pos和last重新置位
b->pos = b->start;
b->last = b->start;
}
}
// 計算一下buffer的可用空間
size = b->end - b->last;
// 繼續讀取響應
if (size && upstream->read->ready) {
n = upstream->recv(upstream, b->last, size);
// NGX_AGAIN代表需要等待下一次讀事件
if (n == NGX_AGAIN) {
break;
}
// n > 0表示讀到的n位元組的正常響應包體
if (n > 0) {
&nb