Memcached原始碼分析-網路模型(1)
1 網路模型
Memcached採用了單程序多執行緒的工作方式,同時採用了libevent事件驅動進行網路請求的處理。
2 工作原理
2.1 libevent介紹
2.2 網路請求流程
2.2.1 流程圖
2.2.2 主執行緒工作流程分析
主執行緒工作流程:
1 主執行緒主要在memcached.c的main()函式中。
2 socket_sdf = new_socket:建立一個新的socket。
3 setsockopt:配置socket資訊。
4 bind:繫結ip地址資訊。
5 listen:監聽埠。
6 向核心註冊事件
事件監聽物件:socket_sdf
事件監聽事件:read
事件處理函式:event_handle
7 阻塞等待核心事件觸發。這裡主要使用的是libevent庫中,網路I/O模型中的epoll模型,主要是呼叫epoll_wait()等待作業系統返回準備好的事件。每當socket_sdf 上面有read動作的時候,就會回撥event_handle。
8 事件準備好了之後,回撥函式event_handle工作流程:
呼叫drive_machine;接收連線請求con_accept_sfd = accept(...);呼叫dispatch_conn_new分發請求
9 dispatch_conn_new工作流程:
a) 選擇工作執行緒:tid = (上次分配執行緒id + 1) % 工作執行緒數量
b)將連線請求放到工作執行緒的連線佇列中
c) 向工作執行緒監聽的管道讀通道中寫入c,驅動工作執行緒事件觸發
10 進入事件迴圈。
2.2.3 工作執行緒工作流程分析
工作執行緒工作流程圖分析:
1 初始化自己監聽的管道pipe(fds)
2 初始化自己的連線佇列new_conn_queue
3 向核心註冊事件
事件監聽物件:pipe_receive
事件監聽事件:read
事件處理函式:thread_libevent_process
4 阻塞等待核心事件的發生
5 如果是管道事件觸發
a) 呼叫thread_libevent_process回撥處理函式:從佇列中取出一個連線請求cq_pop。呼叫conn_new函式
b)向核心註冊監聽事件,conn_accept_sfd是主執行緒分配的連線請求。 監聽物件:conn_accept_sfd。 監聽事件:read;監聽回撥函式:event_handler
6 如果是連線請求事件觸發
a)呼叫event_handler回撥處理函式:呼叫drive_machine
b) drive_machine:狀態機處理:read()資料; 解析命令
; 根據命令處理請求; write()資料; close()請求
7 進入事件迴圈。
3 原始碼分析
3.1 原始碼地址
3.2 主執行緒原始碼分析
1 memcached.c的main函式中。主要通過server_socket建立一個socket。
// Program entry point (abridged excerpt -- option parsing and the
// declarations of retval, main_base and settings were elided by the
// article author; see the full memcached.c for the complete function).
int main (int argc, char **argv)
{/*{{{*/
/* create the listening socket, bind it, and init */
/* settings.socketpath is only set for unix-domain sockets; when it is
 * NULL, memcached listens on TCP/UDP ports instead. */
if (settings.socketpath == NULL) {
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME" );
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;
/* If MEMCACHED_PORT_FILENAME is set, open "<name>.lck" for append so
 * the actually-bound port numbers can be written out later. A failed
 * fopen is reported but not fatal. */
if (portnumber_filename != NULL) {
snprintf(temp_portnumber_filename,
sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);
portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL) {
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}
errno = 0;
// Create the TCP listening socket(s); non-zero return means failure.
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
// Create the UDP socket(s).
errno = 0;
if (settings.udpport && server_sockets(settings.udpport, udp_transport,
portnumber_file)) {
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}
}
/* Enter the libevent dispatch loop: the main thread blocks here and
 * handles registered events until the loop terminates. */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
}/*}}}*/
2 server_socket主要是建立socket,bind,listen
/*
 * server_socket: for each address in the getaddrinfo() result list,
 * create a socket, set socket options, bind(), listen() (TCP only),
 * then hand the fd off -- UDP fds are dup()'d and dispatched to the
 * worker threads, TCP fds become listening conns on the main event base.
 *
 * NOTE(review): this excerpt is garbled -- the tail of the parameter
 * list and the setup code that defines `ai`, `next`, `sfd`, `flags`,
 * `ling`, `error`, `success` and `portnumber_file` were lost when the
 * article was extracted; compare against the real memcached.c.
 */
static int server_socket(const char *interface,
int port,
enum network_transport transport,
/* (lines missing from the excerpt here) */
for (next= ai; next; next= next->ai_next)
{/*{{{*/
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
/* getaddrinfo can return "junk" addresses,
* we make sure at least one works before erroring.
*/
if (errno == EMFILE) {
/* ...unless we're out of fds */
perror("server_socket");
exit(EX_OSERR);
}
continue;
}
#ifdef IPV6_V6ONLY
/* Restrict IPv6 sockets to IPv6 traffic only, so a separate addrinfo
 * entry can bind the corresponding IPv4 address. */
if (next->ai_family == AF_INET6) {
error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
if (error != 0) {
perror("setsockopt");
close(sfd);
continue;
}
}
#endif
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
if (IS_UDP(transport)) {
/* UDP: just grow the send buffer as large as the OS allows. */
maximize_sndbuf(sfd);
} else {
/* TCP: keepalive, linger and no-delay; option failures are logged
 * but non-fatal. */
error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");
error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
if (error != 0)
perror("setsockopt");
error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");
}
// Bind this address; EADDRINUSE just skips to the next addrinfo entry.
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
if (errno != EADDRINUSE) {
perror("bind()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
close(sfd);
continue;
} else {
success++;
// Start listening (TCP only; UDP sockets never call listen()).
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
perror("listen()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
/* If a port file was requested, record the actually-bound port
 * (getsockname) -- useful when an ephemeral port was assigned. */
if (portnumber_file != NULL &&
(next->ai_addr->sa_family == AF_INET ||
next->ai_addr->sa_family == AF_INET6)) {
union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} my_sockaddr;
socklen_t len = sizeof(my_sockaddr);
if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
if (next->ai_addr->sa_family == AF_INET) {
fprintf(portnumber_file, "%s INET: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in.sin_port));
} else {
fprintf(portnumber_file, "%s INET6: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in6.sin6_port));
}
}
}
}
if (IS_UDP(transport))
{/*{{{*/
/* UDP: dup() the fd once per UDP worker thread and dispatch each
 * copy directly to a worker via dispatch_conn_new(). */
int c;
for (c = 0; c < settings.num_threads_per_udp; c++) {
int per_thread_fd = c ? dup(sfd) : sfd;
dispatch_conn_new(per_thread_fd, conn_read,
EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
}
}
else
{
/* TCP: wrap the listening fd in a conn registered on the main
 * thread's event base; event_handler fires on incoming connects. */
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
// Push the new listener onto the global listen_conn list.
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
}/*}}}*/
freeaddrinfo(ai);
/* Return zero iff we detected no errors in starting up connections */
return success == 0;
}
3 conn_new主要是建立一個連線結構體conn,並且註冊監聽事件。
/*
 * conn_new: allocate (or reuse, via the per-fd conns[] cache) a conn
 * structure for socket `sfd`, initialize its buffers and state machine,
 * and register a libevent event on `base` whose callback is
 * event_handler().
 *
 * Returns the conn pointer, or NULL on allocation or event_add()
 * failure.  Note: on event_add() failure the conn is NOT freed here --
 * it stays cached in conns[sfd].
 */
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;
assert(sfd >= 0 && sfd < max_fds);
/* One conn per fd is cached in conns[]; a reused fd skips the whole
 * allocation path below. */
c = conns[sfd];
if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
fprintf(stderr, "Failed to allocate connection object\n");
return NULL;
}
MEMCACHED_CONN_CREATE(c);
/* Zero every buffer pointer first so conn_free() is safe even if
 * one of the mallocs below fails. */
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
/* Initial capacities for the read/write/item/suffix/iov/msg buffers. */
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->suffixsize = SUFFIX_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
c->rbuf = (char *)malloc((size_t)c->rsize);
c->wbuf = (char *)malloc((size_t)c->wsize);
c->ilist = (item **)malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
c->msglist == 0 || c->suffixlist == 0) {
conn_free(c);
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
fprintf(stderr, "Failed to allocate buffers for connection\n");
return NULL;
}
STATS_LOCK();
stats.conn_structs++;
STATS_UNLOCK();
c->sfd = sfd;
conns[sfd] = c;
}
c->transport = transport;
c->protocol = settings.binding_protocol;
/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
if (!settings.socketpath) {
c->request_addr_size = sizeof(c->request_addr);
} else {
c->request_addr_size = 0;
}
/* Record the peer address for new TCP client connections; failure is
 * non-fatal (the address is just zeroed). */
if (transport == tcp_transport && init_state == conn_new_cmd) {
if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
&c->request_addr_size)) {
perror("getpeername");
memset(&c->request_addr, 0, sizeof(c->request_addr));
}
}
// Verbose logging of what kind of connection this is.
if (settings.verbose > 1) {
if (init_state == conn_listening) {
fprintf(stderr, "<%d server listening (%s)\n", sfd,
prot_text(c->protocol));
} else if (IS_UDP(transport)) {
fprintf(stderr, "<%d server listening (udp)\n", sfd);
} else if (c->protocol == negotiating_prot) {
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
} else if (c->protocol == ascii_prot) {
fprintf(stderr, "<%d new ascii client connection.\n", sfd);
} else if (c->protocol == binary_prot) {
fprintf(stderr, "<%d new binary client connection.\n", sfd);
} else {
fprintf(stderr, "<%d new unknown (%d) client connection\n",
sfd, c->protocol);
assert(false);
}
}
/* Reinitialize the per-connection state machine and buffer cursors --
 * the conn may be a reused one from conns[], so every field is reset. */
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
c->authenticated = false;
c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;
c->noreply = false;
// Set up the libevent event; the callback is event_handler().
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
// Register the event with the event base (and thus the kernel backend).
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
STATS_LOCK();
stats.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd);
return c;
}
3.3 工作執行緒原始碼分析
1 主要函式中初始化工作執行緒。memcached.c中main()函式中呼叫thread_init初始化工作執行緒。thread_init函式主要在thread.c函式中。
/*
* Initializes the thread subsystem, creating various worker threads.
* 初始化執行緒系統
*
* nthreads Number of worker event handler threads to spawn 工作執行緒的數量
* main_base Event base for main thread 主執行緒的事件基礎(event base)
*/
void thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;
pthread_mutex_init(&cache_lock, NULL); //初始化cache鎖
pthread_mutex_init(&stats_lock, NULL); //初始化全域性統鎖
pthread_mutex_init(&init_lock, NULL); //初始化init鎖
pthread_cond_init(&init_cond, NULL); //該函式按引數attr指定的屬性建立一個條件變數。呼叫成功返回,
pthread_mutex_init(&cqi_freelist_lock, NULL); //初始化cqi_freelist鎖
cqi_freelist = NULL;
/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (