1. 程式人生 > >libevent和基於libevent的網路程式設計

libevent和基於libevent的網路程式設計



1 libevent介紹和安裝

介紹

libevent是一個輕量級的基於事件驅動的高效能的開源網路庫,並且支援多個平臺,對多個平臺的I/O複用技術進行了封裝,當我們編譯庫的程式碼時,編譯的指令碼將會根據OS支援的處理事件機制,來編譯相應的程式碼,從而在libevent介面上保持一致。

在當前的伺服器上,面對的主要問題就是要能處理大量的連線。而通過libevent這個網路庫,我們就可以呼叫它的API來很好的解決上面的問題。首先,可以來回顧一下,對這個問題的傳統解決方法。

問題: 如何處理多個客戶端連線

解決方案1:I/O複用技術

這幾種方式都是同步I/O,即當讀寫事件就緒,他們自己需要負責進行讀寫,這個讀寫過程是阻塞的,而非同步I/O則不需要自己負責讀寫,只需要通知負責讀寫的程式就可以了。

  • 迴圈
    假設當前我伺服器有多個網路連線需要看管,那麼我就迴圈遍歷開啟的網路連線的列表,來判斷是否有要讀取的資料。這種方法的缺點很明顯,那就是 1.速度緩慢(必須遍歷所有的網路連線) 2.效率低 (處理一個連線時可能發生阻塞,妨礙其他網路連線的檢查和處理)

  • select方式
    select對應於核心中的sys_select呼叫,sys_select首先將第二三四個引數指向的fd_set拷貝到核心,然後對每個被SET的描述符呼叫進行poll,並記錄在臨時結果中(fdset),如果有事件發生,select會將臨時結果寫到使用者空間並返回;當輪詢一遍後沒有任何事件發生時,如果指定了超時時間,則select會睡眠到超時,睡眠結束後再進行一次輪詢,並將臨時結果寫到使用者空間,然後返回。
    select返回後,需要逐一檢查關注的描述符是否被SET(事件是否發生)。(select支援的檔案描述符數量太小了,預設是1024)。

  • poll方式
    poll與select不同,通過一個pollfd陣列向核心傳遞需要關注的事件,故沒有描述符個數的限制,pollfd中的events欄位和revents分別用於標示關注的事件和發生的事件,故pollfd陣列只需要被初始化一次。
    poll的實現機制與select類似,其對應核心中的sys_poll,只不過poll向核心傳遞pollfd陣列,然後對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。
    poll返回後,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。

  • epoll方式
    epoll通過epoll_create建立一個用於epoll輪詢的描述符,通過epoll_ctl新增/修改/刪除事件,通過epoll_wait檢查事件,epoll_wait的第二個引數用於存放結果。
    epoll與select、poll不同,首先,其不用每次呼叫都向核心拷貝事件描述資訊,在第一次呼叫後,事件資訊就會與對應的epoll描述符關聯起來。其次,epoll不是通過輪詢,而是通過在等待的描述符上註冊回撥函式,當事件發生時,回撥函式負責把發生的事件儲存在就緒事件連結串列中,最後寫到使用者空間。
    epoll返回後,該引數指向的緩衝區中即為發生的事件,對緩衝區中每個元素進行處理即可,而不需要像poll、select那樣進行輪詢檢查。

解決方案2:多執行緒技術或多程序技術

多執行緒技術和多程序技術也可以處理高併發的資料連線,因為在伺服器中可以產生大量的程序和執行緒和處理我們需要監視的連線。但是,這兩種方式也是有很大的侷限性的,比如多程序模型就不適合大量的短連線,因為程序的產生和關閉需要消耗較大的系統性能,同樣,還要程序程序間的通訊,在CPU效能不足的情況下不太適合。而多執行緒技術則不太適合處理長連線,因為當我們建立一個程序時,linux中會消耗8G的棧空間,如果我們的每個連線都杵著不斷開,那麼大量連線長連線後,導致的結果就是記憶體的大量消耗。

解決方案3:常用的上述二者複合使用
上述的兩種方法各具有優缺點,因此,我們可以將上述的方法結合起來,這也是目前使用較多的處理高併發的方法。多程序+I/O複用或者多執行緒+I/O複用。而在具體的實現上,又可以分為很多的方式。比如多執行緒+I/O複用技術,我們使用使用一個主執行緒負責監聽一個埠和接受的描述符是否有讀寫事件產生,如果有,則將事件分發給其他的工作程序去完成,這也是程序池的理念。

在說完上述的高併發的處理方法之後,我們可以來介紹一個libevent的主要特色了。

同樣,lievent也是採用的上述系統提供的select,poll和epoll方法來進行I/O複用,但是針對於多個系統平臺上的不同的I/O複用實現方式,libevent進行了重新的封裝,並提供了統一的API介面。libevent在實現上使用了事件驅動這種機制,其本質上是一種Reactor模式。

Reactor模式,是一種事件驅動機制。應用程式需要提供相應的介面並註冊到Reactor上,如果相應的事件發生,Reactor將主動呼叫應用程式註冊的介面,這些介面又稱為“回撥函式”。

在Libevent中也是一樣,向Libevent框架註冊相應的事件和回撥函式;當這些事件發生時,Libevent會呼叫這些回撥函式處理相應的事件。

lbevent的事件支援三種,分別是網路IO、定時器和訊號。定時器的資料結構使用最小堆(Min Heap),以提高效率。網路IO和訊號的資料結構採用了雙向連結串列(TAILQ)。

安裝

libevent的安裝很簡單,我是直接從github上clone下一個原始碼,然後進行編譯安裝的。

具體的命令是(假設你已經安裝了git):

  # git clone https://github.com/nmathewson/Libevent.git
  # cd Libevent
  # sh autogen.sh
  # ./configure && make
  # make install
  # make verify  //驗證安裝

2 Linux下libevent主要API介紹

現在的libevent版本已經到達libevent2了,其增加了多執行緒的支援,API函式也發生了一些微小的變化。

  • 建立事件集

    struct event_base *event_base_new(void)

  • 建立事件

    struct event event_new(struct event_base ,evutil_socket_t ,short ,event_callback_fn,void*)

    引數一:事件所在的事件集。
    引數二:socket的描述符。
    引數三:事件型別,其中EV_READ表示等待讀事件發生,EV_WRITE表示寫事件發生,或者它倆的組合,EV_SIGNAL表示需要等待事件的號碼,如 果不包含上述的標誌,就是超時事件或者手動啟用的事件。
    引數四:事件發生時需要呼叫的回撥函式。
    引數五:回撥函式的引數值。

  • 新增事件和刪除事件

    int event_add(struct event * ev,const struct timeval* timeout)

    引數一:需要新增的事件
    引數二:事件的最大等待事件,如果是NULL的話,就是永久等待

    int event_del(struct event *)
    引數一:需要刪除的事件

  • 分配監聽事件

    int event_base_dispatch(struct event_base * )

    引數一:需要監視的事件集

  • I/O buffer事件

    struct bufferevent* bufferevent_socket_new
    (struct event_base * base,evutil_socket_t fd,int options)

    引數一:需要新增到的時間集
    引數二:相關的檔案描述符
    引數三:0或者是相應的BEV_OPT_*可選標誌

    int bufferevent_enable(struct bufferevent * bev,short event)

    引數一:需要啟用的bufferevent
    引數二:any combination of EV|READ | EV_WRITE

    int bufferevent_disable(struct bufferevent * bev,short event)

    引數說明:同上

    size_t bufferevent_read(struct bufferevent bev,void data,size_t size)

    引數一:讀取的buffer_event事件
    引數二:儲存資料的指標
    引數三:資料buffer的大小

    返回值:讀取資料的位元組數

    int bufferevent_write(struct bufferevent bev,const void data,size_t size)

    引數一:讀取的buffer_event事件
    引數二:儲存資料的指標
    引數三:要寫入的資料的大小,位元組數

如果你想知道更多的API使用情況,請點選這裡

3.1 程式設計例項之聊天室伺服器

下面,就基於libevent2編寫一個聊天室伺服器。

設計思想:首先建立一個套接字,進而建立一個事件對此埠進行監聽,將所請求的使用者組成一個佇列,並監聽所有的使用者事件,當某個使用者說話了,產生了讀事件,就將該使用者的發言傳送給佇列中的其他使用者。

程式分析

需要包含的libevent函式頭:

#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>

建立一個client結構體,接受連線後存放資料:

struct client {
/* The clients socket. */
    int fd;

/* The bufferedevent for this client. */
struct bufferevent *buf_ev;
    struct bufferevent *buf_ev;
/*
 * This holds the pointers to the next and previous entries in
 * the tail queue.
 */
    TAILQ_ENTRY(client) entries;
};

先來看下mian函式的處理:

int
main(int argc, char **argv)
{
    int listen_fd;
    struct sockaddr_in listen_addr;
    struct event ev_accept;
    int reuseaddr_on;

    /* Initialize libevent. */
    evbase = event_base_new();

    /* Initialize the tailq. */
    TAILQ_INIT(&client_tailq_head);

    /* Create our listening socket. */
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0)
        err(1, "listen failed");
    memset(&listen_addr, 0, sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(SERVER_PORT);
    
    if (bind(listen_fd, (struct sockaddr *)&listen_addr,
    sizeof(listen_addr)) < 0)
    err(1, "bind failed");
    if (listen(listen_fd, 5) < 0)
    err(1, "listen failed");
    reuseaddr_on = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, 
    sizeof(reuseaddr_on));

    /* Set the socket to non-blocking, this is essential in event
     * based programming with libevent. */
    if (setnonblock(listen_fd) < 0)
    err(1, "failed to set server socket to non-blocking");

    /* We now have a listening socket, we create a read event to
    * be notified when a client connects. */
    event_assign(&ev_accept, evbase, listen_fd, EV_READ|EV_PERSIST, 
    on_accept, NULL);
    event_add(&ev_accept, NULL);

    /* Start the event loop. */
    event_base_dispatch(evbase);

    return 0;
}

首先,函式初始化了一個使用者佇列tailq,接著建立了一個socket套接字,並將套接字設定為非阻塞模式,接著對一個全域性的evbase事件集合,註冊了事件,事件源是listen_fd,回撥函式是on_accept,事件發生的情況是EV_READ,而且標誌EV_PESIST表明該事件一直存在,而後開啟事件掃描迴圈event_base_dispatch(evbase)

再看一下回調函式on_accpet實現:

void
on_accept(int fd, short ev, void *arg)
{
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    struct client *client;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (setnonblock(client_fd) < 0)
        warn("failed to set client socket non-blocking");

    /* We've accepted a new client, create a client object. */
    client = calloc(1, sizeof(*client));
    if (client == NULL)
        err(1, "malloc failed");
    client->fd = client_fd;

    client->buf_ev = bufferevent_socket_new(evbase, client_fd, 0);
    bufferevent_setcb(client->buf_ev, buffered_on_read, NULL,
        buffered_on_error, client);

    /* We have to enable it before our callbacks will be
    * called. */
    bufferevent_enable(client->buf_ev, EV_READ);

    /* Add the new client to the tailq. */
    TAILQ_INSERT_TAIL(&client_tailq_head, client, entries);

    printf("Accepted connection from %s\n", 
        inet_ntoa(client_addr.sin_addr));
}

這個回撥函式的作用很顯然,就是接受了一個客戶端的請求,並申請好了一個client資訊,將需要的內容填寫好,在填寫中需要注意的是,又向上述的事件集evbase中註冊了一個bufferevent事件client->buf_ev,並註冊了回撥函式buffered_on_read,buffered_on_error,這三個函式分別是當接受後的連線發生了讀或者錯誤事件後的執行函式。接著,將使用者的client結構放入了使用者的佇列tailq中去。

使用者的buffer可讀後的執行函式:

void
buffered_on_read(struct bufferevent *bev, void *arg)
{
    struct client *this_client = arg;
    struct client *client;
    uint8_t data[8192];
    size_t n;

    /* Read 8k at a time and send it to all connected clients. */
    for (;;) {
        n = bufferevent_read(bev, data, sizeof(data));
        if (n <= 0) {
            /* Done. */
            break;
        }
    
        /* Send data to all connected clients except for the
         * client that sent the data. */

        TAILQ_FOREACH(client, &client_tailq_head, entries) {
            if (client != this_client) {
                bufferevent_write(client->buf_ev, data,  n);
            }
        }

    }

}

執行函式的作用很明顯,將libevent管理中的buffer資料讀取出,存入本地的data陣列內,然後對佇列中的client進行檢索,如果不是發資料的client,則將資料寫入該client的buffer中,傳送給該使用者。這裡注意的是需要反覆讀取buffer中的資料,防止一個讀取並沒有讀取乾淨,直到讀取不到資料為止。

buffer出錯處理函式和上述函式差不多,功能就是出錯後,結束掉儲存的client結構,詳細就不說了。

編譯的時候記得修改Makefile中Libevent資料夾的位置

3.2 程式設計例項之回顯伺服器(純非同步IO)

設計思想:所謂回顯伺服器就是將客戶端發過來的資料再發回去,這裡主要也就是說明libevent的純IO複用實現。實現方法和上面的差不多,甚至可以說更加簡單。

程式和上面的聊天伺服器差不多,只是在buffer可讀的事件函式中,不是將使用者的資料傳送給其他使用者,而是直接傳送給使用者本身。

3.3 程式設計例項之回顯伺服器(多執行緒--per connection per thread)

設計思想:上面的方法單純使用libevent的簡單函式來實現服務,但是這裡,我們假設我們需要處理的客戶端很少,於是我們可以使用對於每個連線我們分配一個執行緒這樣的方式來實現對使用者的服務。這種方式簡單有效,一對一服務,就算業務邏輯出現阻塞也不怕。

程式分析

首先定義了一些資料結構,worker資料結構定義的是一個工作者,它包含有一個工作執行緒,和結束標誌,需要獲取的工作佇列,和建立連結串列需要的指標。job資料結構定義的是操作一個job的方法和物件,這回到程式中,實際上就是指的是事件發生後,封裝好的client結構體和處理這個結構體的方法。workqueue資料結構指的是當前的工作佇列中的工作者,以及工作佇列中的待完成的工作,以及互斥鎖和條件變數(因為多個工作程序需要訪問這些資源)。

具體的流程就是,用一個主執行緒監聽一個套接字,並將套接字接受到的連線accept,並建立一個client資料結構儲存該連線的資訊,在這個client結構中註冊一個bufferevent事件,註冊到client->evbase上(這時候這是向client中的evbase註冊了一個事件還沒有進行迴圈這個事件集)。

接著,當監聽到某個client有bufferevent事件發生,主執行緒就把該client結構體和需要進行的工作方法包裝成一個job結構,然後把這個job扔到workqueue上去,並通知各個工作者。而後,各個工作者開著的執行緒就被激活了,瘋狂地去workqueue上去搶工作做,某個worker拿到工作後,就可以解包job,根據job的工作說明書(job_function)操作工作物件(client)了。這裡,job的工作說明有是迴圈client中的client->evbase,於是這樣執行緒就會一直去監視這個連線的狀態,如果有資料就這會呼叫回撥函式進行處理。同時,這個執行緒也就是阻塞在這裡,這對這一個連線負責。

建立workqueue需要的結構體和函式有:

typedef struct worker {
    pthread_t thread;
    int terminate;
    struct workqueue *workqueue;
    struct worker *prev;
    struct worker *next;
} worker_t;

typedef struct job {
    void (*job_function)(struct job *job);
    void *user_data;
    struct job *prev;
    struct job *next;
} job_t;

typedef struct workqueue {
    struct worker *workers;
    struct job *waiting_jobs;
    pthread_mutex_t jobs_mutex;
    pthread_cond_t jobs_cond;
} workqueue_t;

int workqueue_init(workqueue_t *workqueue, int numWorkers);

void workqueue_shutdown(workqueue_t *workqueue);

void workqueue_add_job(workqueue_t *workqueue, job_t *job);

主執行緒的on_accept函式為:

void on_accept(evutil_socket_t fd, short ev, void *arg) {
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    workqueue_t *workqueue = (workqueue_t *)arg;
    client_t *client;
    job_t *job;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (evutil_make_socket_nonblocking(client_fd) < 0) 
    {
        warn("failed to set client socket to non-blocking");
        close(client_fd);
        return;
    }

    /* Create a client object. */
    if ((client = malloc(sizeof(*client))) == NULL) 
    {
        warn("failed to allocate memory for client state");
        close(client_fd);
        return;
    }
    memset(client, 0, sizeof(*client));
    client->fd = client_fd;

    /* Add any custom code anywhere from here to the end of this function
    * to initialize your application-specific attributes in the client struct.
    */

    if ((client->output_buffer = evbuffer_new()) == NULL) 
    {
        warn("client output buffer allocation failed");
        closeAndFreeClient(client);
        return;
    }

    if ((client->evbase = event_base_new()) == NULL) 
    {
        warn("client event_base creation failed");
        closeAndFreeClient(client);
        return;
    }

    client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, BEV_OPT_CLOSE_ON_FREE);
    if ((client->buf_ev) == NULL) {
        warn("client bufferevent creation failed");
        closeAndFreeClient(client);
        return;
    }
    bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
                  buffered_on_error, client);

    /* We have to enable it before our callbacks will be
     * called. */
    bufferevent_enable(client->buf_ev, EV_READ);

    /* Create a job object and add it to the work queue. */
    if ((job = malloc(sizeof(*job))) == NULL) {
        warn("failed to allocate memory for job state");
        closeAndFreeClient(client);
        return;
    }
    job->job_function = server_job_function;
    job->user_data = client;

    workqueue_add_job(workqueue, job);
}

job中的工作指南為:

static void server_job_function(struct job *job) {
    client_t *client = (client_t *)job->user_data;
    //do my job
    event_base_dispatch(client->evbase);
    
    closeAndFreeClient(client);
    free(job);
}

3.4 程式設計例項之回顯伺服器(多執行緒--執行緒池+非同步IO)

設計思想:假設我們的使用者很多,高併發,長連線,那麼我們還是來用I/O複用和執行緒池實現吧,用一個控制執行緒通過I/O複用負責監聽和分發事件,用一組執行緒池來進行處理事件,這樣就可以靈活地將控制邏輯和業務邏輯分開了,見下述講解。

程式分析
具體的流程和上面的差不多,用一個主執行緒監聽一個套接字,並將套接字接受到的連線accept,並建立一個client資料結構儲存該連線的資訊,在這個client結構中註冊一個bufferevent事件,但是這裡,將事件註冊到accept_evbase中,仍然用主執行緒進行監聽。

而面對監聽後出現的事件,將client和操作client的方法打包成一個job,放到上述的workqueue中去,讓工作程序來完成。這樣的操作和上述的差別在於上述方法將bufferevent註冊到client中的evbase中,用工作執行緒監聽,而本方法用主執行緒監聽,工作執行緒負責處理監聽產生的事件。

這要的差別在於兩個函式 on_accept函式:

void on_accept(evutil_socket_t fd, short ev, void *arg) {
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    client_t *client;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (evutil_make_socket_nonblocking(client_fd) < 0) {
        warn("failed to set client socket to non-blocking");
        close(client_fd);
        return;
    }

    /* Create a client object. */
    if ((client = malloc(sizeof(*client))) == NULL) {
        warn("failed to allocate memory for client state");
        close(client_fd);
        return;
    }
    memset(client, 0, sizeof(*client));
    client->fd = client_fd;

    /* Add any custom code anywhere from here to the end of this function
     * to initialize your application-specific attributes in the client struct.
    */

    if ((client->output_buffer = evbuffer_new()) == NULL) {
        warn("client output buffer allocation failed");
        closeAndFreeClient(client);
        return;
    }
    //需要注意的是,這裡註冊到evbase_accept
    client->buf_ev = bufferevent_socket_new(evbase_accept, client_fd,BEV_OPT_CLOSE_ON_FREE);
    if ((client->buf_ev) == NULL) {
        warn("client bufferevent creation failed");
        closeAndFreeClient(client);
        return;
    }
    bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
                  buffered_on_error, client);

    /* We have to enable it before our callbacks will be
    * called. */
    bufferevent_enable(client->buf_ev, EV_READ);
}

在buffered_on_read中,提交job。

void buffered_on_read(struct bufferevent *bev, void *arg) 
{
    client_t *client = (client_t *)arg;
    job_t *job;

    /* Create a job object and add it to the work queue. */
    if ((job = malloc(sizeof(*job))) == NULL) {
        warn("failed to allocate memory for job state");
        closeAndFreeClient(client);
        return;
    }
    job->job_function = server_job_function;
    job->user_data = client;

    workqueue_add_job(&workqueue, job);
}

在job工作指南server_job_function中就可以做你工作該做的事兒了,根據發來的資訊進行資料庫處理,http返回等等。

4 參考文章