處理大併發之 libevent demo詳細分析(對比epoll)
libevent預設情況下是單執行緒,每個執行緒有且僅有一個event_base,對應一個struct event_base結構體,以及賦予其上的事件管理器,用來安排託管給它的一系列的事件。
當有一個事件發生的時候,event_base會在合適的時間去呼叫繫結在這個事件上的函式,直到這個函式執行完成,然後在返回安排其他事件。需要注意的是:合適的時間並不是立即。
例如:
- struct event_base *base;
- base = event_base_new();//初始化libevent
event_base_new對比epoll,可以理解為epoll裡的epoll_create。
event_base內部有一個迴圈,迴圈阻塞在epoll呼叫上,當有一個事件發生的時候,才會去處理這個事件。其中,這個事件是被繫結在event_base上面的,每一個事件就會對應一個struct event,可以是監聽的fd。
其中struct event 使用event_new 來建立和繫結,使用event_add來啟用,例如:
- struct event *listener_event;
-
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
引數說明:
base:event_base型別,event_base_new的返回值
listener:監聽的fd,listen的fd
EV_READ|EV_PERSIST:事件的型別及屬性
do_accept:繫結的回撥函式
(void*)base:給回撥函式的引數
event_add(listener_event, NULL);
對比epoll:
event_new相當於epoll中的epoll_wait,其中的epoll裡的while迴圈,在libevent裡使用event_base_dispatch。
event_add相當於epoll中的epoll_ctl,引數是EPOLL_CTL_ADD,新增事件。
注:libevent支援的事件及屬性包括(使用bitfield實現,所以要用 | 來讓它們合體)
EV_TIMEOUT: 超時
EV_READ: 只要網路緩衝中還有資料,回撥函式就會被觸發
EV_WRITE: 只要塞給網路緩衝的資料被寫完,回撥函式就會被觸發
EV_SIGNAL: POSIX訊號量
EV_PERSIST: 不指定這個屬性的話,回撥函式被觸發後事件會被刪除
EV_ET: Edge-Trigger邊緣觸發,相當於EPOLL的ET模式
事件建立新增之後,就可以處理髮生的事件了,相當於epoll裡的epoll_wait,在libevent裡使用event_base_dispatch啟動event_base迴圈,直到不再有需要關注的事件。
有了上面的分析,結合之前做的epoll服務端程式,對於一個伺服器程式,流程基本是這樣的:
1. 建立socket,bind,listen,設定為非阻塞模式
2. 建立一個event_base,即
- struct event_base * event_base_new(void)
3. 建立一個event,將該socket託管給event_base,指定要監聽的事件型別,並繫結上相應的回撥函式(及需要給它的引數)。即
- struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
4. 啟用該事件,即
- int event_add(struct event *ev, conststruct timeval *tv)
5. 進入事件迴圈,即
- int event_base_dispatch(struct event_base *event_base)
首先來翻譯下例子上面的一段話:
對於select函式來說,不同的作業系統有不同的代替函式,它包括:poll,epoll,kqueue,evport和/dev/poll。這些函式的效能都比select要好,其中epoll在IO中新增,刪除,通知socket準備好方面效能複雜度為O(1)。
不幸的是,沒有一個有效的介面是一個普遍存在的標準,linux下有epoll,BSDS有kqueue,Solaris 有evport和/dev/poll,等等。沒有任何一個作業系統有它們中所有的,所以如果你想做一個輕便的高效能的非同步應用程式,你就需要把這些介面抽象的封裝起來,並且無論哪一個系統使用它都是最高效的。
這對於你來說就是最低階的libevent API,它提供了統一的介面取代了select,當它在計算機上執行的時候,使用了最有效的版本。
這裡是ROT13伺服器的另外一個版本,這次,他使用了libevent代替了select。這意味著我們不再使用fd_sets,取而代之的使用event_base新增和刪除事件,它可能在select,poll,epoll,kqueue等中執行。
程式碼分析:
這是一個服務端的程式,可以處理客戶端大併發的連線,當收到客戶端的連線後,將收到的資料做了一個變換,如果是 ’a’-‘m’之間的字元,將其增加13,如果是 ’n’-‘z’之間的字元,將其減少13,其他字元不變,然後將轉換後的資料傳送給客戶端。
例如:客戶端傳送:Client 0 send Message!
服務端會回覆:Pyvrag 0 fraq Zrffntr!
在這個程式碼中沒有使用bufferevent這個強大的東西,在一個結構體中自己管理了一個緩衝區。結構體為:
- struct fd_state {
- char buffer[MAX_LINE];//緩衝區的大小
- size_t buffer_used;//接收到已經使用的buffer大小,每次將接收到的資料位元組數相加,當傳送的位元組數累計相加和buffer_used都相等時候,將它們都置為1
- size_t n_written;//傳送的累加位元組數
- size_t write_upto;//相當於一個臨時變數,當遇到換行符的時,將其收到的位元組數(換行符除外)賦給該值,當檢測到寫事件的時候,用已經發送的位元組數和該數值做比較,若收到的位元組總數小於該值,則傳送資料,等於該值,將結構體中3個位元組數統計變數都置為1,為什麼會置為1呢,因為有一個換行符吧。
- struct event *read_event;
- struct event *write_event;
- };
程式碼中自己管理了一個緩衝區,用於存放接收到的資料,傳送的資料將其轉換後也放入該緩衝區中,程式碼晦澀難懂,我也是經過打日誌分析後,才明白點,這個緩衝區自己還得控制好。但是libevent 2已經提供了一個神器bufferevent,我們在使用的過程中最好不要自己管理這個緩衝區,之所以分析這個程式碼,是為了熟悉libevent 做服務端程式的流程及原理。
下面是程式碼,加有部分註釋和日誌:
程式碼:lowlevel_libevent_server.c
- //說明,為了使我們的程式碼相容win32網路API,我們使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl
- /* For sockaddr_in */
- #include <netinet/in.h>
- /* For socket functions */
- #include <sys/socket.h>
- /* For fcntl */
- #include <fcntl.h>
- #include <event2/event.h>
- #include <assert.h>
- #include <unistd.h>
- #include <string.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <errno.h>
- #define MAX_LINE 80
- void do_read(evutil_socket_t fd, short events, void *arg);
- void do_write(evutil_socket_t fd, short events, void *arg);
- char rot13_char(char c)
- {
- /* We don't want to use isalpha here; setting the locale would change
- * which characters are considered alphabetical. */
- if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
- return c + 13;
- elseif ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
- return c - 13;
- else
- return c;
- }
- struct fd_state {
- char buffer[MAX_LINE];
- size_t buffer_used;
- size_t n_written;
- size_t write_upto;
- struct event *read_event;
- struct event *write_event;
- };
- struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd)
- {
- struct fd_state *state = malloc(sizeof(struct fd_state));
- if (!state)
- return NULL;
- state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
- if (!state->read_event)
- {
- free(state);
- return NULL;
- }
- state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
- if (!state->write_event)
- {
- event_free(state->read_event);
- free(state);
- return NULL;
- }
- state->buffer_used = state->n_written = state->write_upto = 0;
- assert(state->write_event);
- return state;
- }
- void fr