Nginx (Part 2): The Worker Process Handling Framework
Once nginx is up, several processes are running: 1. the master process receives user commands and reacts to them; 2. the worker processes handle the network events while also taking coordination commands from the master.
The master is mostly about control commands, which we will cover later. The workers carry nginx's core workload: request forwarding, reverse proxying, load balancing, and so on. So let's start by gnawing on the hard bone that is the worker!
0. The worker main loop
The worker is started by the master. As a fork()ed child it begins with the same in-memory data as the master, but its scope of activity is much narrower, so it never takes over the master's role.
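Before diving into the real source, a quick refresher on what fork() gives the child, as a tiny standalone sketch (my own toy, not nginx code): the child starts with a copy of the parent's memory, and writes in the child are invisible to the parent.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int counter = 100;  /* inherited by the child as a private copy */

int main(void)
{
    pid_t pid = fork();

    if (pid == 0) {
        /* child: sees the parent's value, but its writes stay local */
        counter += 1;
        printf("child:  counter = %d\n", counter);   /* prints 101 */
        _exit(0);
    }

    waitpid(pid, NULL, 0);
    printf("parent: counter = %d\n", counter);       /* still 100 */
    return 0;
}
```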
```c
// unix/ngx_process_cycle.c

void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
    char              *title;
    u_char            *p;
    size_t             size;
    ngx_int_t          i;
    ngx_uint_t         sigio;
    sigset_t           set;
    struct itimerval   itv;
    ngx_uint_t         live;
    ngx_msec_t         delay;
    ngx_core_conf_t   *ccf;

    sigemptyset(&set);
    sigaddset(&set, SIGCHLD);
    sigaddset(&set, SIGALRM);
    sigaddset(&set, SIGIO);
    sigaddset(&set, SIGINT);
    sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
    sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));

    if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "sigprocmask() failed");
    }

    sigemptyset(&set);

    size = sizeof(master_process);

    for (i = 0; i < ngx_argc; i++) {
        size += ngx_strlen(ngx_argv[i]) + 1;
    }

    title = ngx_pnalloc(cycle->pool, size);
    if (title == NULL) {
        /* fatal */
        exit(2);
    }

    p = ngx_cpymem(title, master_process, sizeof(master_process) - 1);
    for (i = 0; i < ngx_argc; i++) {
        *p++ = ' ';
        p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size);
    }

    ngx_setproctitle(title);

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

    // right after startup the master launches the worker processes
    ngx_start_worker_processes(cycle, ccf->worker_processes,
                               NGX_PROCESS_RESPAWN);
    ngx_start_cache_manager_processes(cycle, 0);

    ngx_new_binary = 0;
    delay = 0;
    sigio = 0;
    live = 1;

    for ( ;; ) {
        if (delay) {
            if (ngx_sigalrm) {
                sigio = 0;
                delay *= 2;
                ngx_sigalrm = 0;
            }

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "termination cycle: %M", delay);

            itv.it_interval.tv_sec = 0;
            itv.it_interval.tv_usec = 0;
            itv.it_value.tv_sec = delay / 1000;
            itv.it_value.tv_usec = (delay % 1000 ) * 1000;

            if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                              "setitimer() failed");
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend");

        sigsuspend(&set);

        ngx_time_update();

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "wake up, sigio %i", sigio);

        if (ngx_reap) {
            ngx_reap = 0;
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "reap children");

            live = ngx_reap_children(cycle);
        }

        if (!live && (ngx_terminate || ngx_quit)) {
            ngx_master_process_exit(cycle);
        }

        if (ngx_terminate) {
            if (delay == 0) {
                delay = 50;
            }

            if (sigio) {
                sigio--;
                continue;
            }

            sigio = ccf->worker_processes + 2 /* cache processes */;

            if (delay > 1000) {
                ngx_signal_worker_processes(cycle, SIGKILL);
            } else {
                ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_TERMINATE_SIGNAL));
            }

            continue;
        }

        if (ngx_quit) {
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
            ngx_close_listening_sockets(cycle);

            continue;
        }

        if (ngx_reconfigure) {
            ngx_reconfigure = 0;

            if (ngx_new_binary) {
                ngx_start_worker_processes(cycle, ccf->worker_processes,
                                           NGX_PROCESS_RESPAWN);
                ngx_start_cache_manager_processes(cycle, 0);
                ngx_noaccepting = 0;

                continue;
            }

            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

            cycle = ngx_init_cycle(cycle);
            if (cycle == NULL) {
                cycle = (ngx_cycle_t *) ngx_cycle;
                continue;
            }

            ngx_cycle = cycle;
            ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                   ngx_core_module);
            // on a reconfigure command, spawn a fresh set of workers
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_JUST_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 1);

            /* allow new processes to start */
            ngx_msleep(100);

            live = 1;
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
        }

        if (ngx_restart) {
            ngx_restart = 0;
            // on a restart command, start new workers
            ngx_start_worker_processes(cycle, ccf->worker_processes,
                                       NGX_PROCESS_RESPAWN);
            ngx_start_cache_manager_processes(cycle, 0);
            live = 1;
        }

        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, ccf->user);
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_REOPEN_SIGNAL));
        }

        if (ngx_change_binary) {
            ngx_change_binary = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary");
            ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv);
        }

        if (ngx_noaccept) {
            ngx_noaccept = 0;
            ngx_noaccepting = 1;
            ngx_signal_worker_processes(cycle,
                                        ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
        }
    }
}


static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t      i;
    ngx_channel_t  ch;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    ngx_memzero(&ch, sizeof(ngx_channel_t));

    ch.command = NGX_CMD_OPEN_CHANNEL;

    // n is the number of workers, configured in nginx.conf
    for (i = 0; i < n; i++) {

        // start the workers one by one; under the hood each is fork()ed
        ngx_spawn_process(cycle, ngx_worker_process_cycle,
                          (void *) (intptr_t) i, "worker process", type);

        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
        ch.fd = ngx_processes[ngx_process_slot].channel[0];

        ngx_pass_open_channel(cycle, &ch);
    }
}


ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
    char *name, ngx_int_t respawn)
{
    u_long     on;
    ngx_pid_t  pid;
    ngx_int_t  s;

    if (respawn >= 0) {
        s = respawn;

    } else {
        for (s = 0; s < ngx_last_process; s++) {
            if (ngx_processes[s].pid == -1) {
                break;
            }
        }

        if (s == NGX_MAX_PROCESSES) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "no more than %d processes can be spawned",
                          NGX_MAX_PROCESSES);
            return NGX_INVALID_PID;
        }
    }

    if (respawn != NGX_PROCESS_DETACHED) {

        /* Solaris 9 still has no AF_LOCAL */

        if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel)
            == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "socketpair() failed while spawning \"%s\"", name);
            return NGX_INVALID_PID;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                       "channel %d:%d",
                       ngx_processes[s].channel[0],
                       ngx_processes[s].channel[1]);

        if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        on = 1;
        if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "ioctl(FIOASYNC) failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(F_SETOWN) failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        ngx_channel = ngx_processes[s].channel[1];

    } else {
        ngx_processes[s].channel[0] = -1;
        ngx_processes[s].channel[1] = -1;
    }

    ngx_process_slot = s;

    // fork the child process
    pid = fork();

    switch (pid) {

    case -1:
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "fork() failed while spawning \"%s\"", name);
        ngx_close_channel(ngx_processes[s].channel, cycle->log);
        return NGX_INVALID_PID;

    case 0:
        ngx_parent = ngx_pid;
        ngx_pid = ngx_getpid();
        // the child runs the handler it was given; a worker enters its
        // event-processing loop here, i.e. ngx_worker_process_cycle
        proc(cycle, data);
        break;

    default:
        break;
    }

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

    ngx_processes[s].pid = pid;
    ngx_processes[s].exited = 0;

    if (respawn >= 0) {
        return pid;
    }

    ngx_processes[s].proc = proc;
    ngx_processes[s].data = data;
    ngx_processes[s].name = name;
    ngx_processes[s].exiting = 0;

    switch (respawn) {

    case NGX_PROCESS_NORESPAWN:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 0;
        break;

    case NGX_PROCESS_JUST_SPAWN:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 1;
        ngx_processes[s].detached = 0;
        break;

    case NGX_PROCESS_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 0;
        break;

    case NGX_PROCESS_JUST_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_spawn = 1;
        ngx_processes[s].detached = 0;
        break;

    case NGX_PROCESS_DETACHED:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_spawn = 0;
        ngx_processes[s].detached = 1;
        break;
    }

    if (s == ngx_last_process) {
        ngx_last_process++;
    }

    return pid;
}


// os/unix/ngx_process_cycle.c
// the worker's main service loop
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ngx_int_t worker = (intptr_t) data;

    ngx_process = NGX_PROCESS_WORKER;
    ngx_worker = worker;

    ngx_worker_process_init(cycle, worker);

    // process title: "worker process"
    ngx_setproctitle("worker process");

    // endless loop servicing worker business
    for ( ;; ) {

        // much of what follows reacts to commands handed down by the master
        if (ngx_exiting) {
            if (ngx_event_no_timers_left() == NGX_OK) {
                ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
                ngx_worker_process_exit(cycle);
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

        // the core task: detect events and process them
        ngx_process_events_and_timers(cycle);

        if (ngx_terminate) {
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
            ngx_worker_process_exit(cycle);
        }

        // quit (graceful shutdown) command
        if (ngx_quit) {
            ngx_quit = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                          "gracefully shutting down");
            ngx_setproctitle("worker process is shutting down");

            if (!ngx_exiting) {
                ngx_exiting = 1;
                ngx_set_shutdown_timer(cycle);
                ngx_close_listening_sockets(cycle);
                ngx_close_idle_connections(cycle);
            }
        }

        // reopen-logs command
        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, -1);
        }
    }
}
```
The above is the essence of the nginx worker: one endless loop providing service. Much of it is logic that responds to master commands; let's set that aside and look at the business core: ngx_process_events_and_timers.
```c
// event/ngx_event.c
// the worker's handling of io events and the timer queue
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        // find the nearest timer
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }

    // listening for tcp connections is guarded by a lock;
    // the lock is built on shm, memory shared across the processes
    if (ngx_use_accept_mutex) {

        // ngx_accept_disabled throttles lock contention: the worker skips
        // competing for the lock until the counter drops below zero
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            // try the process-wide lock in shm; on failure just return.
            // after taking the accept lock the worker registers read-event
            // listeners on the listening sockets
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            // got the lock: set flags so events are queued, not run inline
            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    // if the posted-next queue is not empty, move its events up first
    if (!ngx_queue_empty(&ngx_posted_next_events)) {
        ngx_event_move_posted_next(cycle);
        timer = 0;
    }

    delta = ngx_current_msec;

    // process events via ngx_event_actions.process_events, which blocks
    // waiting for io; which implementation backs ngx_event_actions is
    // platform-dependent, e.g. on linux with epoll it is the one from
    // event/modules/ngx_epoll_module.c:
    //     ngx_event_actions = ngx_epoll_module_ctx.actions;
    // other platforms pick one of:
    //     ngx_devpoll_module_ctx.actions;
    //     ngx_epoll_module_ctx.actions;
    //     ngx_eventport_module_ctx.actions;
    //     ngx_iocp_module_ctx.actions;
    //     ngx_kqueue_module_ctx.actions;
    //     ngx_select_module_ctx.actions;
    //     ngx_poll_module_ctx.actions;
    //
    // a sample definition looks like this:
    //
    //     static ngx_event_module_t  ngx_select_module_ctx = {
    //         &select_name,
    //         NULL,                 /* create configuration */
    //         ngx_select_init_conf, /* init configuration */
    //
    //         {
    //             ngx_select_add_event,      /* add an event */
    //             ngx_select_del_event,      /* delete an event */
    //             ngx_select_add_event,      /* enable an event */
    //             ngx_select_del_event,      /* disable an event */
    //             NULL,                      /* add an connection */
    //             NULL,                      /* delete an connection */
    //             NULL,                      /* trigger a notify */
    //             ngx_select_process_events, /* process the events */
    //             ngx_select_init,           /* init the events */
    //             ngx_select_done            /* done the events */
    //         }
    //     };
    (void) ngx_process_events(cycle, timer, flags);

    // measure how long event processing took
    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    // process posted accept events, queued in ngx_posted_accept_events
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    // once the accept events are handled, release the lock
    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    // handle the tasks whose timers expired
    if (delta) {
        ngx_event_expire_timers();
    }

    // ordinary read/write events were queued in ngx_posted_events
    ngx_event_process_posted(cycle, &ngx_posted_events);
}
```
That is the main functional skeleton of the nginx worker:
1. First take the tcp listen (accept) lock via shm, avoiding the socket thundering herd;
2. The worker that wins the lock registers the read (accept) events on the listening sockets;
3. If the ngx_posted_next_events queue is non-empty, those queued events are processed first;
4. The network io module chosen for the platform (select in our walkthrough; epoll, kqueue, etc. on other systems) is invoked to collect io events;
5. After the accept events are taken in, the (shm-based) accept lock is released;
6. The expired-timer queue is processed;
7. Read/write events of the ordinary, already-accepted sockets are processed;
A single pass usually handles only part of the work: it may, say, only perform the accept, with the read handled one or n iterations later. That is the asynchronous, non-blocking style showing through. The toy skeleton below recaps one iteration.
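Here is a compilable toy skeleton of a single iteration; all helpers are empty stubs with illustrative names, not the real nginx API.

```c
#include <stdio.h>

static int  try_accept_mutex(void)        { return 1; }
static void register_accept_events(void)  { }
static void run_posted_next_queue(void)   { }
static void wait_for_io(void)             { puts("waiting for io"); }
static void run_posted_accept_queue(void) { }
static void release_accept_mutex(void)    { }
static void expire_timers(void)           { }
static void run_posted_event_queue(void)  { }

int main(void)
{
    for (int i = 0; i < 3; i++) {       /* the real loop never exits */
        if (try_accept_mutex()) {       /* steps 1-2 */
            register_accept_events();
        }
        run_posted_next_queue();        /* step 3 */
        wait_for_io();                  /* step 4: select/epoll/kqueue */
        run_posted_accept_queue();      /* step 5a: accept new connections */
        release_accept_mutex();         /* step 5b: let other workers in */
        expire_timers();                /* step 6 */
        run_posted_event_queue();       /* step 7: plain read/write events */
    }
    return 0;
}
```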
1. Worker sequence diagram
First, here is a sequence diagram of the entire worker workflow, to build an overall picture.
Next, let's take a few of these points in turn and see how nginx handles the details.
2. Taking the accept lock and registering the accept event
Since nginx implements concurrency with multiple processes, all of them necessarily listen on the same ports. Without a lock, an incoming event would wake every process at once, the so-called thundering herd. So nginx provides a lock such that at any moment only one process is listening on a given port, avoiding the contention. It is implemented on top of shared memory (shm). (A multi-threaded design would make this simpler.) The toy below previews the mechanism.
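To make the mechanism concrete before reading the real code, here is a minimal standalone sketch (a toy of my own, not nginx's implementation) of a try-lock living in fork()-shared memory, taken with an atomic compare-and-swap just like ngx_shmtx_trylock below:

```c
#include <stdatomic.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
    /* one atomic word in MAP_SHARED memory, visible to all forked children */
    atomic_long *lock = mmap(NULL, sizeof(atomic_long),
                             PROT_READ | PROT_WRITE,
                             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (lock == MAP_FAILED) {
        return 1;
    }
    atomic_init(lock, 0);

    if (fork() == 0) {                  /* a "worker" process */
        long unlocked = 0;
        /* try-lock: store our pid, but only if the slot is still free */
        if (atomic_compare_exchange_strong(lock, &unlocked,
                                           (long) getpid())) {
            printf("worker %ld holds the accept lock\n", (long) getpid());
            atomic_store(lock, 0);      /* unlock */
        } else {
            printf("worker %ld lost the race\n", (long) getpid());
        }
        _exit(0);
    }

    wait(NULL);
    return 0;
}
```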
```c
// event/ngx_event_accept.c

ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    // first try the shm lock; shm is what shares the data across processes
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        // if this worker already held the lock last round, the accept
        // listeners are still in place and we can return right away;
        // otherwise they must be (re)registered
        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        // register the accept events
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
        // lost the lock: cancel the previously registered accept events
        // so this worker does not join the thundering herd
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    // with or without the lock the caller carries on, since besides
    // accept there are still read/write events to handle
    return NGX_OK;
}


// core/ngx_shmtx.c: take the lock by storing the current process id in it
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}


// register the accept event listeners
// event/ngx_event_accept.c
ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || c->read->active) {
            continue;
        }

        // register the accept event as a READ event;
        // delegated to ngx_event_actions.add, whose implementation is
        // platform-dependent, e.g. ngx_select_add_event
        if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}


// event/modules/ngx_select_module.c
// register an io event listener in the fd_set
static ngx_int_t
ngx_select_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    ngx_connection_t  *c;

    c = ev->data;

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "select add event fd:%d ev:%i", c->fd, event);

    if (ev->index != NGX_INVALID_INDEX) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
                      "select event fd:%d ev:%i is already set",
                      c->fd, event);
        return NGX_OK;
    }

    if ((event == NGX_READ_EVENT && ev->write)
        || (event == NGX_WRITE_EVENT && !ev->write))
    {
        ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
                      "invalid select %s event fd:%d ev:%i",
                      ev->write ? "write" : "read", c->fd, event);
        return NGX_ERROR;
    }

    if (event == NGX_READ_EVENT) {
        FD_SET(c->fd, &master_read_fd_set);

    } else if (event == NGX_WRITE_EVENT) {
        FD_SET(c->fd, &master_write_fd_set);
    }

    if (max_fd != -1 && max_fd < c->fd) {
        max_fd = c->fd;
    }

    ev->active = 1;

    event_index[nevents] = ev;
    ev->index = nevents;
    nevents++;

    return NGX_OK;
}
```
In short: shm for the lock, fd_set bookkeeping for the events.
3. The generic event queue implementation
In ngx_process_events_and_timers we saw that after the io wait returns, queues are processed several times, differing only in which queue is processed. So how is this processing implemented?
We'll look at this in two parts: 1. the queue data structure; 2. running the queued tasks. So... that's all there is to it.
```c
// 1. the queue data structure
// well, two circularly-linked pointers are the entire structure
typedef struct ngx_queue_s  ngx_queue_t;

struct ngx_queue_s {
    ngx_queue_t  *prev;
    ngx_queue_t  *next;
};

// in practice a queue node is cast back to the enclosing ngx_event_t
typedef struct ngx_event_s  ngx_event_t;

struct ngx_event_s {
    void            *data;

    unsigned         write:1;

    unsigned         accept:1;

    /* used to detect the stale events in kqueue and epoll */
    unsigned         instance:1;

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     */
    unsigned         active:1;

    unsigned         disabled:1;

    /* the ready event; in aio mode 0 means that no operation can be posted */
    unsigned         ready:1;

    unsigned         oneshot:1;

    /* aio operation is complete */
    unsigned         complete:1;

    unsigned         eof:1;
    unsigned         error:1;

    unsigned         timedout:1;
    unsigned         timer_set:1;

    unsigned         delayed:1;

    unsigned         deferred_accept:1;

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    unsigned         pending_eof:1;

    unsigned         posted:1;

    unsigned         closed:1;

    /* to test on worker exit */
    unsigned         channel:1;
    unsigned         resolver:1;

    unsigned         cancelable:1;

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     *   read:       bytes to read when event is ready, -1 if not known
     */

    int              available;

    // this handler matters: it decides how this event is processed
    ngx_event_handler_pt  handler;

#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t ovlp;
#endif

    ngx_uint_t       index;

    ngx_log_t       *log;

    ngx_rbtree_node_t   timer;

    // queue links this event into whichever posted queue holds it
    /* the posted queue */
    ngx_queue_t      queue;

#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */
    void            *thr_ctx;

#if (NGX_EVENT_T_PADDING)

    /* event should not cross cache line in SMP */

    uint32_t         padding[NGX_EVENT_T_PADDING];
#endif
#endif
};


// 2. with that structure in place, processing a queue is simply a walk
// event/ngx_event_posted.c
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_queue_t  *q;
    ngx_event_t  *ev;

    while (!ngx_queue_empty(posted)) {

        q = ngx_queue_head(posted);
        ev = ngx_queue_data(q, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "posted event %p", ev);

        // remove the event first, then run it; perfectly safe in a
        // single-process, single-threaded worker
        ngx_delete_posted_event(ev);

        // dispatch to the event's handler;
        // so the crux is how that handler was set
        ev->handler(ev);
    }
}
```
Although this is written in a procedural language, with struct support it effectively expresses an object-oriented idea.
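The one trick worth isolating is ngx_queue_data: the queue node is embedded inside ngx_event_t, and the container is recovered from a node pointer by subtracting the field offset. A minimal standalone illustration with toy types (the macro mirrors ngx_queue_data; the rest is made up):

```c
#include <stddef.h>
#include <stdio.h>

typedef struct node_s { struct node_s *prev, *next; } node_t;

typedef struct {
    int    id;
    node_t queue;          /* the node is embedded in the element */
} event_t;

/* same idea as ngx_queue_data(q, ngx_event_t, queue) */
#define queue_data(q, type, link) \
    ((type *) ((char *) (q) - offsetof(type, link)))

int main(void)
{
    event_t ev = { 42, { NULL, NULL } };
    node_t *q = &ev.queue;              /* what the queue hands us   */
    event_t *back = queue_data(q, event_t, queue);
    printf("id = %d\n", back->id);      /* 42: container recovered   */
    return 0;
}
```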
4. How io events are monitored
For a web server or reverse proxy, the core is necessarily network io event handling. nginx chooses a different io model depending on what the operating system supports, squeezing the most out of the platform; that is arguably one of its winning moves. Which backend is used is determined at build time from system capabilities. (See the code comments earlier for a slightly longer list.)
Let's look at the details through the select implementation:
```c
// event/modules/ngx_select_module.c
// io event monitoring
static ngx_int_t
ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
    ngx_uint_t flags)
{
    int                ready, nready;
    ngx_err_t          err;
    ngx_uint_t         i, found;
    ngx_event_t       *ev;
    ngx_queue_t       *queue;
    struct timeval     tv, *tp;
    ngx_connection_t  *c;

    // recompute max_fd if necessary; select() needs it as its first argument
    if (max_fd == -1) {
        for (i = 0; i < nevents; i++) {
            c = event_index[i]->data;
            if (max_fd < c->fd) {
                max_fd = c->fd;
            }
        }

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "change max_fd: %i", max_fd);
    }

#if (NGX_DEBUG)
    if (cycle->log->log_level & NGX_LOG_DEBUG_ALL) {
        for (i = 0; i < nevents; i++) {
            ev = event_index[i];
            c = ev->data;
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "select event: fd:%d wr:%d", c->fd, ev->write);
        }

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "max_fd: %i", max_fd);
    }
#endif

    if (timer == NGX_TIMER_INFINITE) {
        tp = NULL;

    } else {
        tv.tv_sec = (long) (timer / 1000);
        tv.tv_usec = (long) ((timer % 1000) * 1000);
        tp = &tv;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "select timer: %M", timer);

    work_read_fd_set = master_read_fd_set;
    work_write_fd_set = master_write_fd_set;

    // hand off to the kernel here: select() returns once at least one
    // event has arrived, or when the tp timeout (if any) expires
    ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set,
                   NULL, tp);

    err = (ready == -1) ? ngx_errno : 0;

    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        // after waking up, refresh the cached time information first
        ngx_time_update();
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "select ready %d", ready);

    if (err) {
        ngx_uint_t  level;

        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "select() failed");

        if (err == NGX_EBADF) {
            ngx_select_repair_fd_sets(cycle);
        }

        return NGX_ERROR;
    }

    if (ready == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "select() returned no events without timeout");
        return NGX_ERROR;
    }

    nready = 0;

    // walk all registered events
    for (i = 0; i < nevents; i++) {
        ev = event_index[i];
        c = ev->data;
        found = 0;

        // write events
        if (ev->write) {
            if (FD_ISSET(c->fd, &work_write_fd_set)) {
                found = 1;
                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "select write %d", c->fd);
            }

        } else {
            // read or accept events
            if (FD_ISSET(c->fd, &work_read_fd_set)) {
                found = 1;
                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "select read %d", c->fd);
            }
        }

        // found == 1 means the event is ready for reading or writing
        if (found) {
            ev->ready = 1;
            ev->available = -1;

            // accept events go to the ngx_posted_accept_events queue,
            // everything else to ngx_posted_events
            queue = ev->accept ? &ngx_posted_accept_events
                               : &ngx_posted_events;

            // append the event to the tail of the chosen queue
            ngx_post_event(ev, queue);

            // one more effectively-ready event
            nready++;
        }
    }

    // if the two counters disagree, the fd sets need repairing
    if (ready != nready) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "select ready != events: %d:%d", ready, nready);

        ngx_select_repair_fd_sets(cycle);
    }

    return NGX_OK;
}
```
That is the io-event collection step. Since this is the select backend, calling the system select() is all it takes to receive network events; which events can show up was already decided by the earlier registration work, so this function is merely the executor. How efficient it is depends on the operating system's io model. If you're curious, look at the epoll implementation; a bare sketch of the same idea follows.
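For comparison, here is a minimal sketch of the same wait-then-classify step using raw epoll system calls on Linux (a bare-bones illustration, not nginx's ngx_epoll_module; listen_fd is assumed to be a bound, listening, non-blocking socket):

```c
#include <stdio.h>
#include <sys/epoll.h>

/* wait once for readiness on listen_fd, then classify, epoll-style */
void wait_and_classify(int listen_fd)
{
    struct epoll_event ev, events[64];
    int ep, n, i;

    ep = epoll_create1(0);

    ev.events = EPOLLIN;                /* readable: a connection to accept */
    ev.data.fd = listen_fd;
    epoll_ctl(ep, EPOLL_CTL_ADD, listen_fd, &ev);

    /* corresponds to the select() call above; 500 ms timeout */
    n = epoll_wait(ep, events, 64, 500);

    for (i = 0; i < n; i++) {
        if (events[i].events & EPOLLIN) {
            /* here nginx would post the event to ngx_posted_accept_events */
            printf("fd %d is readable\n", events[i].data.fd);
        }
    }
}
```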
5. Handling the accept event
When the system sees a new incoming network connection, it generates an accept event for the application. nginx puts the accept event into ngx_posted_accept_events and runs the queue with the generic queue processor described above. The handler here is ngx_event_accept; its core job is establishing the new socket connection for subsequent reads and writes.
```c
// event/ngx_event_accept.c
// entry point for accept events
void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_sockaddr_t     sa;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;
#if (NGX_HAVE_ACCEPT4)
    static ngx_uint_t  use_accept4 = 1;
#endif

    if (ev->timedout) {
        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
            return;
        }

        ev->timedout = 0;
    }

    // fetch the event configuration
    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
        ev->available = ecf->multi_accept;
    }

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "accept on %V, ready: %d", &ls->addr_text, ev->available);

    // keep accepting sockets in a loop
    do {
        socklen = sizeof(ngx_sockaddr_t);

#if (NGX_HAVE_ACCEPT4)
        if (use_accept4) {
            // call accept4()/accept() to take in the socket connection
            s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
        } else {
            s = accept(lc->fd, &sa.sockaddr, &socklen);
        }
#else
        s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif

        if (s == (ngx_socket_t) -1) {
            err = ngx_socket_errno;

            if (err == NGX_EAGAIN) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
                               "accept() not ready");
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;

            } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
                level = NGX_LOG_CRIT;
            }

#if (NGX_HAVE_ACCEPT4)
            ngx_log_error(level, ev->log, err,
                          use_accept4 ? "accept4() failed"
                                      : "accept() failed");

            if (use_accept4 && err == NGX_ENOSYS) {
                use_accept4 = 0;
                ngx_inherited_nonblocking = 0;
                continue;
            }
#else
            ngx_log_error(level, ev->log, err, "accept() failed");
#endif

            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }

                if (ev->available) {
                    continue;
                }
            }

            if (err == NGX_EMFILE || err == NGX_ENFILE) {
                if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1)
                    != NGX_OK)
                {
                    return;
                }

                if (ngx_use_accept_mutex) {
                    if (ngx_accept_mutex_held) {
                        ngx_shmtx_unlock(&ngx_accept_mutex);
                        ngx_accept_mutex_held = 0;
                    }

                    ngx_accept_disabled = 1;

                } else {
                    ngx_add_timer(ev, ecf->accept_mutex_delay);
                }
            }

            return;
        }

#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
#endif

        ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

        // get a connection object wrapping the socket for reading/writing
        c = ngx_get_connection(s, ev->log);

        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }

            return;
        }

        c->type = SOCK_STREAM;

#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

        // create the connection's memory pool
        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
            socklen = sizeof(ngx_sockaddr_t);
        }

        c->sockaddr = ngx_palloc(c->pool, socklen);
        if (c->sockaddr == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        ngx_memcpy(c->sockaddr, &sa, socklen);

        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
        if (log == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        /* set a blocking mode for iocp and non-blocking mode for others */

        if (ngx_inherited_nonblocking) {
            if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
                if (ngx_blocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_blocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }

        } else {
            if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
                if (ngx_nonblocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_nonblocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        }

        *log = ls->log;

        // wire up the io context handed to the new socket connection
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        c->log = log;
        c->pool->log = log;

        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;

#if (NGX_HAVE_UNIX_DOMAIN)
        if (c->sockaddr->sa_family == AF_UNIX) {
            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
            /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
            c->sendfile = 0;
#endif
        }
#endif

        rev = c->read;
        wev = c->write;

        wev->ready = 1;

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            rev->ready = 1;
        }

        if (ev->deferred_accept) {
            rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
            rev->available = 1;
#endif
        }

        rev->log = log;
        wev->log = log;

        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
#endif

        if (ls->addr_ntop) {
            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
            if (c->addr_text.data == NULL) {
                ngx_close_accepted_connection(c);
                return;
            }

            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                             c->addr_text.data,
                                             ls->addr_text_max_len, 0);
            if (c->addr_text.len == 0) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

#if (NGX_DEBUG)
        {
        ngx_str_t  addr;
        u_char     text[NGX_SOCKADDR_STRLEN];

        ngx_debug_accepted_connection(ecf, c);

        if (log->log_level & NGX_LOG_DEBUG_EVENT) {
            addr.data = text;
            addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
                                     NGX_SOCKADDR_STRLEN, 1);

            ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
                           "*%uA accept: %V fd:%d", c->number, &addr, s);
        }

        }
#endif

        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        log->data = NULL;
        log->handler = NULL;

        // hand the ready connection over; for http listeners this enters
        // the http module via ngx_http_init_connection
        ls->handler(c);

        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
            ev->available--;
        }

    } while (ev->available);
}


// http/ngx_http_request.c
// initialize the socket connection and hand it to the http module
void
ngx_http_init_connection(ngx_connection_t *c)
{
    ngx_uint_t              i;
    ngx_event_t            *rev;
    struct sockaddr_in     *sin;
    ngx_http_port_t        *port;
    ngx_http_in_addr_t     *addr;
    ngx_http_log_ctx_t     *ctx;
    ngx_http_connection_t  *hc;
#if (NGX_HAVE_INET6)
    struct sockaddr_in6    *sin6;
    ngx_http_in6_addr_t    *addr6;
#endif

    // allocate the http connection data
    hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t));
    if (hc == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    c->data = hc;

    /* find the server configuration for the address:port */

    port = c->listening->servers;

    if (port->naddrs > 1) {

        /*
         * there are several addresses on this port and one of them
         * is an "*:port" wildcard so getsockname() in ngx_http_server_addr()
         * is required to determine a server address
         */

        if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        // branch on the address family
        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            sin6 = (struct sockaddr_in6 *) c->local_sockaddr;

            addr6 = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
                    break;
                }
            }

            hc->addr_conf = &addr6[i].conf;

            break;
#endif

        default: /* AF_INET */
            sin = (struct sockaddr_in *) c->local_sockaddr;

            addr = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (addr[i].addr == sin->sin_addr.s_addr) {
                    break;
                }
            }

            hc->addr_conf = &addr[i].conf;

            break;
        }

    } else {

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            addr6 = port->addrs;
            hc->addr_conf = &addr6[0].conf;
            break;
#endif

        default: /* AF_INET */
            addr = port->addrs;
            hc->addr_conf = &addr[0].conf;
            break;
        }
    }

    /* the default server configuration for the address:port */
    hc->conf_ctx = hc->addr_conf->default_server->ctx;

    ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t));
    if (ctx == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    ctx->connection = c;
    ctx->request = NULL;
    ctx->current_request = NULL;

    c->log->connection = c->number;
    // each http server controls its own error logging
    c->log->handler = ngx_http_log_error;
    c->log->data = ctx;
    c->log->action = "waiting for request";

    c->log_error = NGX_ERROR_INFO;

    rev = c->read;
    // set the read handler to ngx_http_wait_request_handler
    rev->handler = ngx_http_wait_request_handler;
    c->write->handler = ngx_http_empty_handler;

#if (NGX_HTTP_V2)
    if (hc->addr_conf->http2) {
        rev->handler = ngx_http_v2_init;
    }
#endif

#if (NGX_HTTP_SSL)
    {
    ngx_http_ssl_srv_conf_t  *sscf;

    sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);

    if (sscf->enable || hc->addr_conf->ssl) {
        hc->ssl = 1;
        c->log->action = "SSL handshaking";
        rev->handler = ngx_http_ssl_handshake;
    }
    }
#endif

    if (hc->addr_conf->proxy_protocol) {
        hc->proxy_protocol = 1;
        c->log->action = "reading PROXY protocol";
    }

    if (rev->ready) {
        /* the deferred accept(), iocp */

        if (ngx_use_accept_mutex) {
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }

        rev->handler(rev);
        return;
    }

    // put rev into ngx_event_timer_rbtree, a red-black tree of timers
    ngx_add_timer(rev, c->listening->post_accept_timeout);
    // mark the connection reusable
    ngx_reusable_connection(c, 1);

    // handle read readiness: register the read listener
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }
}


// event/ngx_event.c
// generic helper: read-event registration logic
ngx_int_t
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags)
{
    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        /* kqueue, epoll */

        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }
        }

        return NGX_OK;

    } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {

        /* select, poll, /dev/poll */

        if (!rev->active && !rev->ready) {
            // ngx_event_actions.add, i.e. ngx_select_add_event here:
            // register the read event
            if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

        if (rev->active && (rev->ready || (flags & NGX_CLOSE_EVENT))) {
            if (ngx_del_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT | flags)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

    } else if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {

        /* event ports */

        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

        if (rev->oneshot && !rev->ready) {
            if (ngx_del_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }

            return NGX_OK;
        }
    }

    /* iocp */

    return NGX_OK;
}
```
In outline: first call the kernel's accept() to take the socket in, then call the http module's init handler to register a read event so the data can be read later. Exactly when that read actually happens is not guaranteed. The sketch below strips the accept loop down to its essence.
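The essence of the loop above is "accept until EAGAIN on a non-blocking listener". A stripped-down standalone sketch with plain POSIX calls (error handling reduced to the cases discussed; nginx of course wraps the fd in a ngx_connection_t instead of closing it):

```c
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

/* drain a non-blocking listening socket, like nginx's do/while loop */
void accept_all(int listen_fd)
{
    for (;;) {
        int s = accept(listen_fd, NULL, NULL);

        if (s == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return;                 /* nothing left to accept       */
            }
            if (errno == ECONNABORTED) {
                continue;               /* client gave up; try the next */
            }
            perror("accept");           /* EMFILE/ENFILE etc.: back off */
            return;
        }

        printf("accepted fd %d\n", s);  /* nginx would build the        */
        close(s);                       /* connection context here      */
    }
}
```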
6. Handling the read event
After the accept stage above, nginx has registered a read event whose handler is ngx_http_wait_request_handler. Once data is ready, control passes from the generic posted-event queue into the http module, which handles the io event.
```c
// http/ngx_http_request.c
// handle the socket read event
static void
ngx_http_wait_request_handler(ngx_event_t *rev)
{
    u_char                    *p;
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_connection_t          *c;
    ngx_http_connection_t     *hc;
    ngx_http_core_srv_conf_t  *cscf;

    c = rev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http wait request handler");

    if (rev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
                      "client timed out");
        ngx_http_close_connection(c);
        return;
    }

    if (c->close) {
        ngx_http_close_connection(c);
        return;
    }

    hc = c->data;
    cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module);

    // buffer size, 1024 bytes by default (client_header_buffer_size)
    size = cscf->client_header_buffer_size;

    b = c->buffer;

    // on first entry, create the initial buffer
    if (b == NULL) {
        // the buffer receives the data the http client sends over
        b = ngx_create_temp_buf(c->pool, size);
        if (b == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        c->buffer = b;

    } else if (b->start == NULL) {

        // the buffer memory was released while the connection was idle
        // (see the NGX_AGAIN branch below); allocate it again
        b->start = ngx_palloc(c->pool, size);
        if (b->start == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = b->start;
        b->last = b->start;
        b->end = b->last + size;
    }

    // receive the data
    n = c->recv(c, b->last, size);

    if (n == NGX_AGAIN) {

        if (!rev->timer_set) {
            ngx_add_timer(rev, c->listening->post_accept_timeout);
            ngx_reusable_connection(c, 1);
        }

        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }

        /*
         * We are trying to not hold c->buffer's memory for an idle
         * connection.
         */

        // while waiting for more data, release the buffer's memory
        if (ngx_pfree(c->pool, b->start) == NGX_OK) {
            b->start = NULL;
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_http_close_connection(c);
        return;
    }

    if (n == 0) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                      "client closed connection");
        ngx_http_close_connection(c);
        return;
    }

    b->last += n;

    // if the listener expects the PROXY protocol header, parse it first
    // (this is the "listen ... proxy_protocol" feature, not proxy_pass)
    if (hc->proxy_protocol) {
        hc->proxy_protocol = 0;

        p = ngx_proxy_protocol_read(c, b->pos, b->last);

        if (p == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        b->pos = p;

        if (b->pos == b->last) {
            c->log->action = "waiting for request";
            b->pos = b->start;
            b->last = b->start;
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }
    }

    c->log->action = "reading client request line";

    // mark the connection non-reusable
    ngx_reusable_connection(c, 0);

    // create the http request: allocate memory, set up context, etc.
    c->data = ngx_http_create_request(c);
    if (c->data == NULL) {
        ngx_http_close_connection(c);
        return;
    }

    // set the read handler to ngx_http_process_request_line for next time
    rev->handler = ngx_http_process_request_line;
    ngx_http_process_request_line(rev);
}


// http/ngx_http_request.c
// read and parse the request line, then move on to the headers
static void
ngx_http_process_request_line(ngx_event_t *rev)
{
    ssize_t              n;
    ngx_int_t            rc, rv;
    ngx_str_t            host;
    ngx_connection_t    *c;
    ngx_http_request_t  *r;

    c = rev->data;
    r = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "http process request line");

    if (rev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
                      "client timed out");
        c->timedout = 1;
        ngx_http_close_request(r, NGX_HTTP_REQUEST_TIME_OUT);
        return;
    }

    rc = NGX_AGAIN;

    for ( ;; ) {

        if (rc == NGX_AGAIN) {
            // read more of the request header
            n = ngx_http_read_request_header(r);

            if (n == NGX_AGAIN || n == NGX_ERROR) {
                break;
            }
        }

        // parse the request line per the http protocol (a very long function)
        rc = ngx_http_parse_request_line(r, r->header_in);

        if (rc == NGX_OK) {

            /* the request line has been parsed successfully */

            r->request_line.len = r->request_end - r->request_start;
            r->request_line.data = r->request_start;
            r->request_length = r->header_in->pos - r->request_start;

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "http request line: \"%V\"", &r->request_line);

            r->method_name.len = r->method_end - r->request_start + 1;
            r->method_name.data = r->request_line.data;

            if (r->http_protocol.data) {
                r->http_protocol.len = r->request_end
                                       - r->http_protocol.data;
            }

            // process the uri, resolving the path
            if (ngx_http_process_request_uri(r) != NGX_OK) {
                break;
            }

            if (r->schema_end) {
                r->schema.len = r->schema_end - r->schema_start;
                r->schema.data = r->schema_start;
            }

            if (r->host_end) {

                host.len = r->host_end - r->host_start;
                host.data = r->host_start;

                rc = ngx_http_validate_host(&host, r->pool, 0);

                if (rc == NGX_DECLINED) {
                    ngx_log_error(NGX_LOG_INFO, c->log, 0,
                                  "client sent invalid host "
                                  "in request line");
                    ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);
                    break;
                }

                if (rc == NGX_ERROR) {
                    ngx_http_close_request(r,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
                    break;
                }

                if (ngx_http_set_virtual_server(r, &host) == NGX_ERROR) {
                    break;
                }

                r->headers_in.server = host;
            }

            if (r->http_version < NGX_HTTP_VERSION_10) {

                if (r->headers_in.server.len == 0
                    && ngx_http_set_virtual_server(r, &r->headers_in.server)
                       == NGX_ERROR)
                {
                    break;
                }

                ngx_http_process_request(r);
                break;
            }

            if (ngx_list_init(&r->headers_in.headers, r->pool, 20,
                              sizeof(ngx_table_elt_t))
                != NGX_OK)
            {
                ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                break;
            }

            c->log->action = "reading client request headers";

            rev->handler = ngx_http_process_request_headers;
            ngx_http_process_request_headers(rev);

            break;
        }

        if (rc != NGX_AGAIN) {

            /* there was error while a request line parsing */

            ngx_log_error(NGX_LOG_INFO, c->log, 0,
                          ngx_http_client_errors[rc - NGX_HTTP_CLIENT_ERROR]);

            if (rc == NGX_HTTP_PARSE_INVALID_VERSION) {
                ngx_http_finalize_request(r,
                                          NGX_HTTP_VERSION_NOT_SUPPORTED);

            } else {
                ngx_http_finalize_request(r, NGX_HTTP_BAD_REQUEST);
            }

            break;
        }

        /* NGX_AGAIN: a request line parsing is still incomplete */

        if (r->header_in->pos == r->header_in->end) {

            rv = ngx_http_alloc_large_header_buffer(r, 1);

            if (rv == NGX_ERROR) {
                ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                break;
            }

            if (rv == NGX_DECLINED) {
                r->request_line.len = r->header_in->end - r->request_start;
                r->request_line.data = r->request_start;

                ngx_log_error(NGX_LOG_INFO, c->log, 0,
                              "client sent too long URI");
                ngx_http_finalize_request(r,
                                          NGX_HTTP_REQUEST_URI_TOO_LARGE);
                break;
            }
        }
    }

    // run the requests that are ready, responding to the client
    ngx_http_run_posted_requests(c);
}


// http/ngx_http_request.c
// run (sub)requests that have been posted as ready
void
ngx_http_run_posted_requests(ngx_connection_t *c)
{
    ngx_http_request_t         *r;
    ngx_http_posted_request_t  *pr;

    // loop until the posted list is drained
    for ( ;; ) {

        if (c->destroyed) {
            return;
        }

        r = c->data;
        pr = r->main->posted_requests;

        if (pr == NULL) {
            return;
        }

        r->main->posted_requests = pr->next;

        r = pr->request;

        ngx_http_set_log_request(c->log, r);

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http posted request: \"%V?%V\"", &r->uri, &r->args);

        // write out to the client
        r->write_event_handler(r);
    }
}
```
That is a simplified view of how an http request is handled. From it we can see that nginx's logic is not far from what one would imagine: read the request line and url, check whether any special forwarding is configured, read the body data, and if nothing special applies, locate the corresponding file and respond to the client directly. (How exactly the response is produced, we will cover later.)
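One pattern worth highlighting from all of the above: each ngx_event_t carries a handler pointer, and each parsing stage advances the state machine simply by swapping in the next stage's function (wait_request → process_request_line → process_request_headers). A toy rendition with my own stub names:

```c
#include <stdio.h>

typedef struct event_s event_t;
typedef void (*handler_pt)(event_t *);

struct event_s { handler_pt handler; };

static void process_headers(event_t *ev)
{
    (void) ev;
    puts("reading headers");
}

static void process_request_line(event_t *ev)
{
    puts("reading request line");
    ev->handler = process_headers;      /* advance the state machine */
}

static void wait_request(event_t *ev)
{
    puts("first bytes arrived");
    ev->handler = process_request_line; /* next readiness goes there */
    ev->handler(ev);                    /* nginx calls it right away */
}

int main(void)
{
    event_t ev = { wait_request };
    ev.handler(&ev);    /* the first read event */
    ev.handler(&ev);    /* a later read event lands in process_headers */
    return 0;
}
```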
This article took a global view of how nginx handles requests, hoping it helps build an overall understanding of nginx. Plenty of details remain to be untangled; stay tuned.