Nginx程序分析(worker_process篇)
阿新 • • 發佈:2019-01-03
Linux下程序的建立使用fork系統呼叫,fork在哪裡呢?答案是在ngx_spawn_process函式中。我們先不忙著看ngx_spawn_process函式,先看呼叫ngx_spawn_process的ngx_start_worker_processes函式。該函式在主執行緒迴圈中需要建立工作程序的地方被呼叫。
void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i, s; ngx_channel_t ch; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes"); ch.command = NGX_CMD_OPEN_CHANNEL; for (i = 0; i < n; i++) { cpu_affinity = ngx_get_cpu_affinity(i); ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL, "worker process", type); ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; for (s = 0; s < ngx_last_process; s++) { if (s == ngx_process_slot || ngx_processes[s].pid == -1 || ngx_processes[s].channel[0] == -1) { continue; } ngx_log_debug6(NGX_LOG_DEBUG_CORE, cycle->log, 0, "pass channel s:%d pid:%P fd:%d to s:%i pid:%P fd:%d", ch.slot, ch.pid, ch.fd, s, ngx_processes[s].pid, ngx_processes[s].channel[0]); ngx_write_channel(ngx_processes[s].channel[0], &ch, sizeof(ngx_channel_t), cycle->log); } } }
ngx_start_worker_process函式還是很簡單的,基本就是呼叫ngx_spawn_process函式建立工作程序,然後通知所有程序。下面我們就具體看看主程序是如何建立工作程序的。
ngx_spawn_process函式首先需要為即將建立的工作程序分配一個程序資訊結構體。如果是重啟某個工作程序,直接使用respawn引數作為工作程序索引,否則需要從ngx_process陣列中找到一個空閒的元素。如果ngx_process陣列全部用光了,就返回錯誤。函式在fork之前會將新程序在ngx_process中的索引儲存在ngx_process_slot中。
如果希望新程序與主程序是分離的(NGX_PROCESS_DETACHED),就只需要將程序資訊中的管道描述符設定為-1即可。否則,就需要建立一對Unix域套接字描述符,並將其設定為非同步非阻塞。ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s; if (respawn >= 0) { s = respawn; } else { for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } } if (s == NGX_MAX_PROCESSES) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "no more than %d processes can be spawned", NGX_MAX_PROCESSES); return NGX_INVALID_PID; } } ... ngx_process_slot = s;
準備好channel後就可以呼叫fork建立程序了,這段程式碼Linux程式設計師肯定很熟悉了:if (respawn != NGX_PROCESS_DETACHED) { if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; } ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0, "channel %d:%d", ngx_processes[s].channel[0], ngx_processes[s].channel[1]); // 設定channel為非阻塞 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 設定channel[0]為非同步 on = 1; if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // <span style="font-family: Arial, Helvetica, sans-serif;">當channel[0]資料到來時,向主程序傳送SIGIO訊號</span> if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 希望程序呼叫exec執行外部程式的時候自動關閉channel if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } ngx_channel = ngx_processes[s].channel[1]; } else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; }
pid = fork();
switch (pid) {
case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
case 0:
ngx_pid = ngx_getpid();
proc(cycle, data);
break;
default:
break;
}
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);
ngx_processes[s].pid = pid;
ngx_processes[s].exited = 0;
if (respawn >= 0) {
return pid;
}
工作程序在這裡呼叫proc進入程序迴圈,而主程序則將新程序的pid儲存到程序資訊中,對於重新啟動的程序,到這裡就結束了。接下來,也是最後一部分是填充程序結構體中的其他一部分資訊,對於重啟的程序這些資訊是保留著的。這部分是由主程序負責完成的,並不是本文討論的重點。
ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;
switch (respawn) {
case NGX_PROCESS_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_respawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_respawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_DETACHED:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_respawn = 0;
ngx_processes[s].detached = 1;
break;
}
// 到這裡才算真正在ngx_processes陣列中增加了新程序對應的元素
if (s == ngx_last_process) {
ngx_last_process++;
}
return pid;
}
講述了工作程序建立的過程之後,就可以看看工作程序的迴圈函數了。Nginx可以支援多執行緒,但本文不關心多執行緒的情況,下面程式碼中的省略號部分為支援多執行緒的程式碼,可以忽略而不影響我們理解。
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_uint_t i;
ngx_connection_t *c;
...
ngx_worker_process_init(cycle, 1);
ngx_setproctitle("worker process");
...
for ( ;; ) {
if (ngx_exiting) {
c = cycle->connections;
for (i = 0; i < cycle->connection_n; i++) {
/* THREAD: lock */
if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}
if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
{
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
ngx_process_events_and_timers(cycle);
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
if (ngx_quit) {
ngx_quit = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
"gracefully shutting down");
ngx_setproctitle("worker process is shutting down");
if (!ngx_exiting) {
ngx_close_listening_sockets(cycle);
ngx_exiting = 1;
}
}
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
}
}
ngx_worker_process_cycle的主迴圈中最核心的是事件處理函式ngx_process_events_and_timers。
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// timer是事件等待的超時值,如果設定了定時器解析度,則定時器會週期觸發事件,
// 程序將不斷地被喚醒處理事件,因此不必設定超時值,也不需要設定NGX_TIMER_INFINITE。
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
// 以下語句塊處理“驚群”
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
// 工作程序處於過載,僅僅將ngx_accept_disabled減一,不請求鎖,從而避免過載時引發驚群效應
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}