1. 程式人生 > >Nginx 模組開發之日誌模組---實時記錄http請求資訊寫入flume

Nginx 模組開發之日誌模組---實時記錄http請求資訊寫入flume

一、整體架構部署圖, 如下:

                         

本圖只是一個大概的描述,真實的情況會有所差異,後臺部署採用二級負載均衡:一級lvs,二級nginx。日誌框架採用flume(兩種source:syslogtcp和avro),日誌分析採用hadoop。 nginx和web server上的服務程式都需要寫日誌,其中nginx部分並不支援遠端寫日誌功能,需要開發獨立的模組。這也就是本文的重點,其他部分不做具體闡述。

二、需求   

除了記錄一些常規的日誌資訊外,根據專案需求還要記錄每個http請求從發起到處理整個過程的時間資訊。這就要求給每一個http請求加上一個唯一標示,經過考慮決定在http的header部分加入“id:uuid“,同時為了保證http請求到達時間的準確性,將日誌模組放在nginx http 11個處理流程中的第一個階段(NGX_HTTP_POST_READ_HEAD_PHASE)。達這一步的實現也是在nginx中。此外,還有一些其他資訊:IP,url,reachtime等。

三、注意事項

nginx寫flume採用socket+json的方式,主要考慮以下幾個問題:

1、效率。採用有效長連線,也就是在nginx啟動的時候就建立好socket連結,並且設定非阻塞模式。

2、異常處理。主要異常都來自於socket鏈路。連線失敗,傳送失敗等等。

3、記憶體管理。nginx有自己的一套管理系統,自己寫的模組要能跟上nginx的流程自我建立和銷燬。

4、url的獲取。完整的url需要自己拼湊。

5、如何插入id。這個要分析原始碼了。

6、配置檔案。日誌模組的啟動以及flume syslogtcp的ip port都需要在配置檔案中設定。

需求和技術方案都已經確定,接下來就是寫程式碼了。

四、程式碼

1、變數。

考慮到使用的變數比較多,封裝成結構體形式

typedef struct {
    ngx_int_t                       on;                   /**<是否開啟日誌記錄 on/off*/

    ngx_fd_t                        file_fd;              /**<檔案fd,使用者debug狀態下同步寫入檔案*/
    ngx_fd_t                        flume_fd;             /**<flume server fd,用於傳送資料*/

    ngx_uint_t                      local_port;           /**<本地主機port*/
    char                           *local_ip;             /**<本地主機ip*/

    ngx_uint_t                      header_key_hash;      /**<header_key的hash值*/
    char                           *header_key;           /**<值為:"id"*/
    size_t                          header_key_len;       /**<"id"的長度,2*/
    char                           *header_value;         /**<header value,實際上是36位uuid*/
    size_t                          header_value_len;     /**<長度,37*/

    ngx_url_t                       flume_url;	          /**<flume socket ip:port*/

    ngx_int_t                       server_addr_index;    /**<*/
    ngx_int_t                       server_port_index;    /**<*/
    ngx_int_t                       schema_index;         /**<*/
    ngx_int_t                       request_uri_index;    /**<*/

    ngx_http_variable_value_t      *schema_value;         /**<http/https 用於拼湊url,下同*/
    ngx_http_variable_value_t      *server_addr_value;    /**<>*/
    ngx_http_variable_value_t      *request_uri_value;    /**<>*/
    ngx_http_variable_value_t      *server_port_value;    /**<>*/

}ngx_http_record_log_main_conf_t;
配置檔案中,日誌功能的開啟和flume socket 設定分別在http的全域性範圍內,使用命令:record_request_log和flume_server。
static ngx_command_t  ngx_http_record_log_commands[] = {

    { ngx_string("record_request_log"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_http_request_log,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    { ngx_string("flume_server"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_http_flume_server,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};



static ngx_http_module_t  ngx_http_record_log_module_ctx = {
    NULL,                                  /* preconfiguration */
    ngx_http_record_log_init,              /* postconfiguration */

    ngx_http_record_log_create_main_conf,  /* create main configuration */
    ngx_http_record_log_init_main_conf,    /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    NULL,       /* create location configuration */
    NULL         /* merge location configuration */
};


ngx_module_t  ngx_http_record_log_module = {
    NGX_MODULE_V1,
    &ngx_http_record_log_module_ctx,           /* module context */
    ngx_http_record_log_commands,              /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    ngx_http_record_log_exit_process,      /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

回撥函式ngx_http_record_log_create_main_conf和ngx_http_record_log_init_main_conf用於完成結構體

ngx_http_record_log_main_conf_t的建立和初始化。其中,使用的變數都是在nginx的記憶體池中建立的,

這樣記憶體就交給nginx統一處理,不用擔心出現記憶體洩露或者記憶體碎片。

static void *
ngx_http_record_log_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_record_log_main_conf_t  *rlmcf;

    rlmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_record_log_main_conf_t));
    if (rlmcf == NULL) {
        return NULL;
    }

    rlmcf->header_key = ngx_pcalloc(cf->pool,3);
    if(rlmcf->header_key ==  NULL)
    {
        return NULL;
    }

    rlmcf->local_ip = ngx_pcalloc(cf->pool,16);
    if(rlmcf->local_ip == NULL)
    {
        return NULL;
    }

    rlmcf->header_value = ngx_pcalloc(cf->pool,37);
    if(rlmcf->header_value == NULL)
    {
        return NULL;
    }

    return rlmcf;
}

static char *
ngx_http_record_log_init_main_conf(ngx_conf_t *cf, void *conf)
{

    ngx_http_record_log_main_conf_t  *rlmcf = conf;

    rlmcf->header_key_len = 2;

    strcpy(rlmcf->header_key,"id");

    rlmcf->header_key[rlmcf->header_key_len] = '\0';

    rlmcf->header_key_hash = ngx_hash(ngx_hash(0,'i'),'d');

    rlmcf->header_value_len = 36;

    return NGX_CONF_OK;
}

2、socket。

在nginx啟動讀取配置檔案的時候,建立socket連線,但是這時有可能flume尚未啟動,所以不能保證連結建立成功。如果不成功,在傳送日誌資訊的時候會再一次建立socket連線。

ngx_int_t
ngx_init_flume_log_fd(ngx_http_record_log_main_conf_t *rlmcf,ngx_log_t * log)
{
    //ngx_connection_t * c;
    //struct ifreq temp;
    ///定義sockfd
    rlmcf->flume_fd = ngx_socket(AF_INET,SOCK_STREAM, 0);

    //ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "socket %d", flume_fd);

    if (rlmcf->flume_fd == -1) {

        ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                      ngx_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd): 建立flume socket失敗!!!\n");
        return NGX_ERROR;
    }

    ///連線伺服器,成功返回0,錯誤返回-1
    if (connect(rlmcf->flume_fd, rlmcf->flume_url.addrs->sockaddr, rlmcf->flume_url.addrs->socklen) < 0)
    {

        ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                           "[RECORD_LOG]-(ngx_init_flume_log_fd):conncet flume server failed\n");

        if (ngx_close_socket(rlmcf->flume_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd):failed to close flume fd\n");
        }
        rlmcf->flume_fd = -1;  ///如果不是設定-1,flume_fd仍然是個有效的fd,在寫日誌的時候會提示錯誤88(非socket操作),導致不能重新連線(因為設定是錯如9或者32才重新建立連線的)
        ///這裡只是借用返回值 NGX_AGAIN,其並非NGX_AGAIN的本意.
        ///只是在ngx_write_flumelog()中表明連線失敗,防止陷入死迴圈
        return NGX_AGAIN;
    }
    else
    {
        ///set nonblock
        ngx_nonblocking(rlmcf->flume_fd);
    }

    return NGX_OK;
}

當nginx結束退出時,需要close socket連線。
static void
ngx_http_record_log_exit_process(ngx_cycle_t *cycle)
{
    ngx_http_record_log_main_conf_t *rlmc;

    rlmc = ngx_http_cycle_get_module_main_conf(ngx_cycle,

                                                ngx_http_record_log_module);
    if (ngx_close_socket(rlmc->flume_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 關閉flume fd失敗 %V failed\n",strerror(errno));
    }
    else
        ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):關閉flume fd\n");
    #ifdef NGX_DEBUG


    if (ngx_close_socket(rlmc->file_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 關閉file fd失敗 %V failed",strerror(errno));
    }
    else
        ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):關閉 file fd\n");
    #endif
}

3、插入id。

ngx_int_t
ngx_insert_id_into_headers(ngx_http_record_log_main_conf_t *rlmcf,ngx_http_request_t *r)
{
    ngx_table_elt_t             *h;

    h = ngx_list_push(&r->headers_in.headers);
    if (h == NULL) {
        ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
            "[RECORD_LOG_TIME]-(ngx_insert_id_into_headers):失敗!!!在request headers中分配空間失敗,無法插入id\n");
        return NGX_ERROR;
    }


    h->hash = rlmcf->header_key_hash;

    r->header_hash = h->hash;

    h->key.len = rlmcf->header_key_len;
    h->key.data = (u_char*)rlmcf->header_key;
    h->key.data[h->key.len] = '\0';

    h->value.len = rlmcf->header_value_len;
    h->value.data = (u_char*)rlmcf->header_value;
    h->value.data[h->value.len] = '\0';

    h->lowcase_key = (u_char*)rlmcf->header_key;

    return NGX_OK;
}

4、設定http post_read 階段的處理handler

static ngx_int_t
ngx_http_record_log_init(ngx_conf_t *cf)
{
    ngx_http_handler_pt        *h;
    ngx_http_core_main_conf_t  *cmcf;

    cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);

    h = ngx_array_push(&cmcf->phases[NGX_HTTP_POST_READ_PHASE].handlers);
    if (h == NULL) {
        return NGX_ERROR;
    }

    *h = ngx_http_record_log_handler;

    return NGX_OK;
}

5、拼湊url

url=$schema+$server_addr+$request_uri
目前是採用這種方式,並且沒有加port,因為都是80埠的,也沒有加引數,目前用不到,等以後用到的時候可以再加。

...

tmp = ngx_snprintf(tmp,rlmcf->schema_value->len+9,",\"url\":\"%s:",rlmcf->schema_value->data);

    tmp = ngx_snprintf(tmp,rlmcf->server_addr_value->len+2,"//%s",rlmcf->server_addr_value->data);

    tmp = ngx_snprintf(tmp,rlmcf->request_uri_value->len, "%s",rlmcf->request_uri_value->data);


...

6、傳送日誌資訊

...

 do{
        len = send(rlmcf->flume_fd,msg,msg_len,0);

        if(len <= 0)
        {
            ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
                "[RECORD_LOG_TIME]-ngx_write_flumelog:失敗!!!傳送的到flume server的資料長度小於0,錯誤:%s\n",
                    strerror(errno));
            ///對方關閉了連線或者是無效的連線
            if(errno == 32 || errno == 9)
            {
                close(rlmcf->flume_fd);

                t_errno = errno;

                if(NGX_OK != ngx_init_flume_log_fd(rlmcf,r->connection->log))
                {
                    ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
                        "[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立連線失敗!!,錯誤:%s\n",strerror(errno));

                    return NGX_ERROR;
                }

                ngx_log_error(NGX_LOG_EMERG,r->connection->log,ngx_errno,
                "[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立連線!!\n");
            }
            else
                return NGX_ERROR;
        }
        else
        {
            break;
        }

    }while(t_errno == 9||t_errno == 32);

...

總結:

日誌模組的功能比較簡單,但是若想寫出清晰規範的程式碼,需要對nginx的處理流程和記憶體結構有一定的瞭解。筆者也是看了很久的原始碼才磕磕碰碰寫出來的。若有寫的不到位的地方還望指出,不勝感激。其實,考慮到效率問題,作為負載均衡來說還是能不加攔截模組就不加。