memcached原始碼閱讀----使用libevent和多執行緒模型
本篇文章主要是我今天閱讀memcached原始碼關於程序啟動,在網路這塊做了哪些事情。
一、libevent的使用
首先我們知道,memcached是使用了iblievet作為網路框架的,而iblievet又是單執行緒模型的基於linux下epoll事件的非同步模型。因此,其基本的思想就是 對可讀,可寫,超時,出錯等事件進行繫結函式,等有其事件發生,對其繫結函式回撥。
可以減掉了解一下 libevent基本api呼叫
struct event_base *base;
base = event_base_new();//初始化libevent
event_base_new對比epoll,可以理解為epoll裡的epoll_create。
event_base內部有一個迴圈,迴圈阻塞在epoll呼叫上,當有一個事件發生的時候,才會去處理這個事件。其中,這個事件是被繫結在event_base上面的,每一個事件就會對應一個struct event,可以是監聽的fd。
其中struct event 使用event_new 來建立和繫結,使用event_add來啟用,例如:
struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
引數說明:
base:event_base型別,event_base_new的返回值
listener:監聽的fd,listen的fd
EV_READ|EV_PERSIST:事件的型別及屬性
do_accept:繫結的回撥函式
(void*)base:給回撥函式的引數
event_add(listener_event, NULL);
對比epoll:
event_new相當於epoll中的epoll_wait,其中的epoll裡的while迴圈,在libevent裡使用event_base_dispatch。
event_add相當於epoll中的epoll_ctl,引數是EPOLL_CTL_ADD,新增事件。
注:libevent支援的事件及屬性包括(使用bitfield實現,所以要用 | 來讓它們合體)
EV_TIMEOUT: 超時
EV_READ: 只要網路緩衝中還有資料,回撥函式就會被觸發
EV_WRITE: 只要塞給網路緩衝的資料被寫完,回撥函式就會被觸發
EV_SIGNAL: POSIX訊號量
EV_PERSIST: 不指定這個屬性的話,回撥函式被觸發後事件會被刪除
EV_ET: Edge-Trigger邊緣觸發,相當於EPOLL的ET模式
事件建立新增之後,就可以處理髮生的事件了,相當於epoll裡的epoll_wait,在libevent裡使用event_base_dispatch啟動event_base迴圈,直到不再有需要關注的事件。
有了上面的分析,結合之前做的epoll服務端程式,對於一個伺服器程式,流程基本是這樣的:
1. 建立socket,bind,listen,設定為非阻塞模式
2. 建立一個event_base,即
- struct event_base * event_base_new(void)
3. 建立一個event,將該socket託管給event_base,指定要監聽的事件型別,並繫結上相應的回撥函式(及需要給它的引數)。即
- struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
4. 啟用該事件,即
- int event_add(struct event *ev, conststruct timeval *tv)
5. 進入事件迴圈,即
- int event_base_dispatch(struct event_base *event_base)
有了上邊的基礎東西,可以進入memcached的閱讀了。 二、memcached原始碼分析 main函式啟動,首先會初始化很多資料,這裡我們只涉及大網路這塊,其他以後分析,先忽略。 1.首先初始化 主工作執行緒的的iblievet物件
/* initialize main thread libevent instance */
main_base = event_init();
最後會呼叫
/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
在該物件內部迴圈。不退出。 2.初始化連線的物件
static void conn_init(void) {
freetotal = 200;
freecurr = 0;
if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
fprintf(stderr, "Failed to allocate connection structures\n");
}
return;
}
這裡是先預先分配200個conn*的記憶體。等有連線上來,會從freeconns 取。 如下程式碼:
/*
* Returns a connection from the freelist, if any.
*/
conn *conn_from_freelist() {
conn *c;
pthread_mutex_lock(&conn_lock);
if (freecurr > 0) {
c = freeconns[--freecurr];
} else {
c = NULL;
}
pthread_mutex_unlock(&conn_lock);
return c;
}
3.那麼conn的結構體內部長什麼樣子呢?
typedef struct conn conn;
struct conn {
int sfd;
sasl_conn_t *sasl_conn;
enum conn_states state;
enum bin_substates substate;
struct event event;
short ev_flags;
short which; /** which events were just triggered */
char *rbuf; /** buffer to read commands into */
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
char *ritem; /** when we read in an item's value, it goes here */
int rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
enum protocol protocol; /* which protocol this con<pre name="code" class="cpp"> if (sigignore(SIGPIPE) == -1) {
perror("failed to ignore SIGPIPE; sigaction");
exit(EX_OSERR);
}
nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr request_addr; /* Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */}; 這裡的所有欄位就是在處理資料需要用到的。這裡不詳細描述。以後會慢慢分解。 因為是memcached是多執行緒模型,因此在從freeconn取出一個物件的時候,是要加解鎖使用。 忽略SIGIPIE訊號,防止rst時的程式退出
if (sigignore(SIGPIPE) == -1) {
perror("failed to ignore SIGPIPE; sigaction");
exit(EX_OSERR);
}
初始化多執行緒模型,並且每個執行緒一個iblievent的事件模型就是呼叫event_init函式。
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
內部實現不詳細。主要是呼叫pthread_create函式。 4、然後開始通過埠號啟動網路監聽事件 程式碼如下:
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
然後呼叫下面的函式:
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file)
因為,一個主機可能會有多個網絡卡,比如雙線機房,聯通或者電信,因此內部實現會出現以下程式碼:
for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
/* getaddrinfo can return "junk" addresses,
* we make sure at least one works before erroring.
*/
if (errno == EMFILE) {
/* ...unless we're out of fds */
perror("server_socket");
exit(EX_OSERR);
}
continue;
}
而
static int new_socket(struct addrinfo *ai)
該函式就是呼叫socket函式,設定為非阻塞。 5、然後生成一個監聽的conn物件 程式碼如下
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
static conn *listen_conn = NULL;
作為全域性的靜態的變數。無頭結點的單鏈表
我們繼續深入conn_new 函式內部
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();
該函式主要是做了哪些動作呢? 第一,從剛才的free_cnn_list取出一個conn* 來,然互分配記憶體,根據相關配置資訊,進行相關的欄位初始化工作。 第二,加入到iblievent事件庫中
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
if (conn_add_to_freelist(c)) {
conn_free(c);
}
perror("event_add");
return NULL;
}
這一步就是,講sfd上的事件繫結event_handler 函式,就是當有該連線上來的時候有資料進行可讀的時候繫結,回撥。 7、狀態機的解讀
<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">最終event_handler函式會呼叫</span>
static void drive_machine(conn *c)
函式。那麼這個函式做了哪些工作呢?
當然是等待連線了,那就是accept函數了。
因此,入股市conn_listening狀態,
while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)
當然同樣是 講sfd設定成非阻塞的。 這個時候是有資料上來了。 因此就要設定讀命令狀態了,呼叫以下函式:
<pre name="code" class="cpp">/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
通過註釋可以知道,該函式是講一個新連線分配各其他執行緒, 通過程式碼我們可以看出 首先,分配一個item塊,講連線的socket的fd 賦值給item,同時有當前狀態,標誌位,讀buff大小等,然後分配一個執行緒,講item推送到該thread的處理佇列裡了。 然互,通過往管道里寫入C字元,通知到管道的另一端,進行處理該操作符的事件。因此,完成了對對該連線的 分配工作。 那麼我接下來看一看 執行緒是如果處理的。 在初始化執行緒的時候,已經把管道的兩個操作符放入到了iblievent裡了。如下程式碼:
/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
綁定了回撥函式:
static void thread_libevent_process(int fd, short which, void *arg)
當讀到字元'c'的時候,就從其中佇列中取出一個item*,掉用一下函式
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base)
同樣,呼叫
conn *c = conn_from_freelist();
取出一個conn* ,然後進行初始化,這個時候和上文講到的一樣了,知識狀態不同了, 因此這裡使用了一個狀態機的模式了。 有如下狀態:
enum conn_states {
conn_listening, /**< the socket which listens for connections */
conn_new_cmd, /**< Prepare connection for next command */
conn_waiting, /**< waiting for a readable socket */
conn_read, /**< reading in a command line */
conn_parse_cmd, /**< try to parse a command from the input buffer */
conn_write, /**< writing out a simple response */
conn_nread, /**< reading in a fixed number of bytes */
conn_swallow, /**< swallowing unnecessary bytes w/o storing */
conn_closing, /**< closing this connection */
conn_mwrite, /**< writing out many items sequentially */
conn_max_state /**< Max state value (used for assertion) */
};
也就是
static void drive_machine(conn *c)
的核心邏輯了。通過設定狀態,然後呼叫不同的程式碼,
因此在一個狀態結束之後,總是會看大如下程式碼呼叫:
/*
* Sets a connection's current state in the state machine. Any special
* processing that needs to happen on certain state transitions can
* happen here.
*/
static void conn_set_state(conn *c, enum conn_states state) {
assert(c != NULL);
assert(state >= conn_listening && state < conn_max_state);
if (state != c->state) {
if (settings.verbose > 2) {
fprintf(stderr, "%d: going from %s to %s\n",
c->sfd, state_text(c->state),
state_text(state));
}
if (state == conn_write || state == conn_mwrite) {
MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
}
c->state = state;
}
}
到此,網路框架部分已經基本處理完成。起始這個框架是非常簡單而且實用的。 redis也是基本的思想模型,只不過是單執行緒的,而memcached是多執行緒的模型。在開發模式上可以有效的借鑑。 該文章為原創文章,更多文章,歡迎訪問 http://blog.csdn.net/wallwind