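memcached Source Code Analysis: the Half-Sync/Half-Async Pattern
memcached is one of the most widely used caching servers today, and its threading model follows the half-sync/half-async pattern.
Half-sync/half-async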
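Infrastructure of the half-sync/half-async model: the main thread creates several child threads, also called worker threads. Every thread runs its own event loop. In memcached this is one libevent event_base per thread, which on Linux is backed by that thread's own epoll instance, so every thread ends up blocking in epoll_wait while it waits for events. Each worker thread is connected to the main thread by its own pipe, and each worker listens on the read end of that pipe. When the main thread wants to talk to a particular worker, it simply writes to that worker's pipe.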
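To make the pipe mechanism concrete, here is a minimal sketch using plain POSIX pipes and pthreads rather than memcached's libevent code. The names worker_t, worker_start, worker_main and notify_worker are hypothetical, and the worker blocks in read() instead of running an epoll-based event loop, purely to keep the example short. The 'c' byte mirrors the command byte memcached writes; the 'q' quit byte is an invention of the sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical per-worker state: one pipe per worker, as described above. */
typedef struct {
    int notify_receive_fd;   /* read end, watched by the worker */
    int notify_send_fd;      /* write end, written by the main thread */
    int wakeups;             /* 'c' notifications the worker has consumed */
} worker_t;

/* Worker side. A real worker registers notify_receive_fd with its own event
   loop (libevent/epoll); a blocking read() keeps this sketch short. */
static void *worker_main(void *arg) {
    worker_t *w = arg;
    char buf[1];
    while (read(w->notify_receive_fd, buf, 1) == 1) {
        if (buf[0] == 'c')
            w->wakeups++;        /* "a new connection is waiting for you" */
        else if (buf[0] == 'q')
            break;               /* hypothetical quit command for the sketch */
    }
    return NULL;
}

/* Create the pipe, then start the worker thread on it. */
static int worker_start(worker_t *w, pthread_t *tid) {
    int fds[2];
    if (pipe(fds) != 0)
        return -1;
    w->notify_receive_fd = fds[0];
    w->notify_send_fd = fds[1];
    w->wakeups = 0;
    if (pthread_create(tid, NULL, worker_main, w) != 0) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    return 0;
}

/* Main-thread side: one byte on the pipe is the whole message. */
static int notify_worker(worker_t *w, char cmd) {
    assert(w != NULL);
    return write(w->notify_send_fd, &cmd, 1) == 1 ? 0 : -1;
}
```

Because a pipe is a FIFO byte stream, the worker sees the bytes in the order the main thread wrote them. One byte per event is therefore enough, as long as the actual payload travels some other way (in memcached, through the CQ described next).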
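Workflow of the half-sync/half-async model: the main thread listens on the process's TCP listening port. When a client connects to the process, the main thread accepts the connection request. It then picks one of the worker threads and hands the client fd over to it via that worker's pipe (strictly speaking, the fd travels through the worker's connection queue and the pipe only carries a wake-up byte, as the code below shows). From then on, the worker thread owns the client fd and handles all communication with that client.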
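The accepted fd should be non-blocking, otherwise one slow client could stall a worker's whole event loop. drive_machine below uses accept4 with SOCK_NONBLOCK when HAVE_ACCEPT4 is defined and plain accept otherwise. The sketch below shows that idea in isolation. accept_nonblocking and listen_loopback are hypothetical helpers; the accept() + fcntl(O_NONBLOCK) path and the ENOSYS fallback are this sketch's own assumptions about what the elided branches do, and accept4 is only attempted on Linux.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE              /* glibc only declares accept4() with this */
#endif
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Accept one client and return a non-blocking fd, or -1 on error. */
static int accept_nonblocking(int listen_fd) {
    int sfd;
#ifdef __linux__
    static int use_accept4 = 1;
    if (use_accept4) {
        sfd = accept4(listen_fd, NULL, NULL, SOCK_NONBLOCK);
        if (sfd >= 0 || errno != ENOSYS)
            return sfd;          /* success, or an ordinary accept error */
        use_accept4 = 0;         /* kernel has no accept4: stop trying it */
    }
#endif
    sfd = accept(listen_fd, NULL, NULL);
    if (sfd < 0)
        return -1;
    int flags = fcntl(sfd, F_GETFL);
    if (flags < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(sfd);
        return -1;
    }
    return sfd;
}

/* Hypothetical helper: listening TCP socket on 127.0.0.1 with a
   kernel-chosen port; *addr receives the bound address. */
static int listen_loopback(struct sockaddr_in *addr) {
    socklen_t len = sizeof(*addr);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (bind(fd, (struct sockaddr *)addr, len) != 0 || listen(fd, 16) != 0 ||
        getsockname(fd, (struct sockaddr *)addr, &len) != 0) {
        close(fd);
        return -1;
    }
    assert(addr->sin_port != 0);
    return fd;
}
```

The function-local static flag plays the same role as use_accept4 in drive_machine: the decision about which call to use is made once and remembered for later connections.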
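The half-sync/half-async design needs three components: a producer, a consumer, and a notification queue. Here the producer is the main thread, the consumers are the worker threads, and the queue is CQ (each worker's new-connection queue, new_conn_queue in the code). When a client connects, the main thread's epoll_wait returns and libevent invokes the callback event_handler.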
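The CQ is a classic producer/consumer queue: the main thread pushes at one end and a worker pops at the other, so access has to be serialized with a mutex. The sketch below is a minimal mutex-protected FIFO in that spirit. item_t, queue_t, queue_init, queue_push and queue_pop are hypothetical names, not memcached's CQ_ITEM, cq_push and cq_pop, and the item carries only the socket fd.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical queue item: what the worker needs to build a connection. */
typedef struct item {
    int sfd;                 /* accepted client socket */
    struct item *next;
} item_t;

typedef struct {
    item_t *head;
    item_t *tail;
    pthread_mutex_t lock;    /* shared by producer (main) and consumer (worker) */
} queue_t;

static void queue_init(queue_t *q) {
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->lock, NULL);
}

/* Producer side: append at the tail. */
static void queue_push(queue_t *q, item_t *it) {
    assert(it != NULL);
    it->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail == NULL)
        q->head = it;
    else
        q->tail->next = it;
    q->tail = it;
    pthread_mutex_unlock(&q->lock);
}

/* Consumer side: remove from the head; NULL means the queue is empty. */
static item_t *queue_pop(queue_t *q) {
    pthread_mutex_lock(&q->lock);
    item_t *it = q->head;
    if (it != NULL) {
        q->head = it->next;
        if (q->head == NULL)
            q->tail = NULL;
    }
    pthread_mutex_unlock(&q->lock);
    return it;
}
```

No condition variable is needed: the worker never waits on the queue itself. It is woken by the pipe byte and then pops, which is why an empty queue is handled as a normal case (compare the NULL != item check in thread_libevent_process).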
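event_handler mainly calls drive_machine. For a TCP listening connection, drive_machine accepts the connection request to obtain the client sockfd, then calls dispatch_conn_new. dispatch_conn_new picks a worker in round-robin order, pushes a CQ_ITEM onto that worker's queue, and writes the byte 'c' to the worker's notify pipe. The worker's epoll_wait sees the pipe become readable and invokes the callback thread_libevent_process, which pops the item and calls conn_new. conn_new creates a conn object and an event for the sockfd, sets the event's callback (event_handler again), and adds it to the worker's own event loop, i.e. its epoll. From then on, requests sent by the client are handled directly by the worker thread.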
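dispatch_conn_new, whose code follows, chooses the target worker with (last_thread + 1) % settings.num_threads. The sketch below isolates just that arithmetic. pick_worker is a hypothetical helper, and starting last_thread at -1 is an assumption made so that the first connection goes to worker 0.

```c
#include <assert.h>

/* Same arithmetic as dispatch_conn_new: take the worker after the one used
   last time, wrapping around at num_threads, and remember the choice. */
static int pick_worker(int *last_thread, int num_threads) {
    assert(num_threads > 0);
    int tid = (*last_thread + 1) % num_threads;
    *last_thread = tid;
    return tid;
}
```

Round-robin ignores how busy each worker currently is. It is cheap and spreads connections evenly by count (with 4 workers, 10 connections land as 3, 3, 2, 2), but not necessarily by load.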
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
#ifdef HAVE_ACCEPT4
    static int use_accept4 = 1;
#else
    static int use_accept4 = 0;
#endif

    assert(c != NULL);

    /* Keep running state handlers until one of them sets stop. */
    while (!stop) {

        switch(c->state) {
        case conn_listening:
            /* Listening socket (main thread): accept the new client. */
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            if (use_accept4) {
                /* accept4 returns an fd that is already non-blocking */
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            if (sfd == -1) {
                ... ...
            } else {
                /* Hand the client fd to a worker thread. */
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                  DATA_BUFFER_SIZE, tcp_transport);
            }

            /* Return to the event loop and wait for the next connection. */
            stop = true;
            break;

        case conn_waiting:
            ... ...
        }
    }
}
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    /* Queue item carrying everything the worker needs to build the conn. */
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    /* Pick the next worker in round-robin order. */
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    /* Put the item on that worker's CQ... */
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    /* ...then wake the worker with a single 'c' byte on its notify pipe. */
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    /* The main thread wrote one command byte to this worker's pipe. */
    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;  /* buf holds no valid command, so do not switch on it */
    }

    switch (buf[0]) {
    case 'c':
        /* New connection: take the item the main thread queued for us. */
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
            /* Create the conn on this worker's own event base (me->base). */
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport, me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;
            }
            cqi_free(item);
        }
        break;

    /* we were told to pause and report in */
    case 'p':
        register_thread_initialized();
        break;
    }
}
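drive_machine is a state machine: depending on the connection's current state (c->state), it responds differently to the request the client has made.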
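The shape of drive_machine, a loop that keeps dispatching on the current state until some handler sets stop, can be shown with a toy state machine. The states and the fake_conn type below are hypothetical and far simpler than memcached's enum conn_states. Each handler either moves to the next state and loops again, or sets stop so that control returns to the event loop until the next event arrives.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily reduced state set. */
enum state { ST_LISTENING, ST_READ, ST_PARSE, ST_WRITE, ST_WAITING };

typedef struct {
    enum state state;
    int steps;               /* how many handlers ran during drive() calls */
} fake_conn;

/* Same loop shape as drive_machine. */
static void drive(fake_conn *c) {
    bool stop = false;
    assert(c != NULL);
    while (!stop) {
        c->steps++;
        switch (c->state) {
        case ST_LISTENING:   /* would accept() and dispatch; always yields */
            stop = true;
            break;
        case ST_READ:        /* pretend a complete request has arrived */
            c->state = ST_PARSE;
            break;
        case ST_PARSE:       /* pretend the command was executed */
            c->state = ST_WRITE;
            break;
        case ST_WRITE:       /* pretend the reply was sent in one go */
            c->state = ST_WAITING;
            break;
        case ST_WAITING:     /* nothing to do until the socket is readable */
            c->state = ST_READ;
            stop = true;
            break;
        }
    }
}
```

Starting in ST_READ, one call runs four handlers (read, parse, write, waiting) and returns with the state back at ST_READ, ready for the next request; a listening connection yields after a single handler.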
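The steps above are how a TCP connection request is handled. UDP has no connection to accept: when server_socket creates a UDP socket at startup, it calls dispatch_conn_new directly, once for each of the settings.num_threads_per_udp workers, with the initial state set to conn_read, so those workers start reading data right away.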
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    ... ...
    if (IS_UDP(transport)) {
        int c;

        /* UDP: no accept. The first UDP worker gets sfd itself, the others
           get dup()s of it, and each starts directly in conn_read. */
        for (c = 0; c < settings.num_threads_per_udp; c++) {
            int per_thread_fd = c ? dup(sfd) : sfd;
            dispatch_conn_new(per_thread_fd, conn_read,
                              EV_READ | EV_PERSIST,
                              UDP_READ_BUFFER_SIZE, transport);
        }
    } else {
        /* TCP: register a listening conn on the main thread's event base;
           its events are handled by drive_machine's conn_listening case. */
        if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                         EV_READ | EV_PERSIST, 1,
                                         transport, main_base))) {
            fprintf(stderr, "failed to create listening connection\n");
            exit(EXIT_FAILURE);
        }
        listen_conn_add->next = listen_conn;
        listen_conn = listen_conn_add;
    }
    ... ...
}
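In the UDP branch, `c ? dup(sfd) : sfd` gives the first UDP worker the original descriptor and every other worker a duplicate. A dup()ed descriptor refers to the same open socket, so all of them share one bound port and one receive queue. The sketch below checks that on a loopback UDP socket. make_udp_fds and local_port are hypothetical helpers, and binding to 127.0.0.1 with a kernel-chosen port is an assumption made for the test.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Port the descriptor is bound to, or -1 on error. */
static int local_port(int fd) {
    struct sockaddr_in sa;
    socklen_t len = sizeof(sa);
    if (getsockname(fd, (struct sockaddr *)&sa, &len) != 0)
        return -1;
    return ntohs(sa.sin_port);
}

/* Bind one UDP socket and fill fds[0..n-1] the way server_socket does:
   fds[0] is the socket itself, the rest are dup()s of it. */
static int make_udp_fds(int *fds, int n) {
    assert(n > 0);
    int sfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (sfd < 0)
        return -1;
    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(0);                  /* let the kernel choose */
    sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (bind(sfd, (struct sockaddr *)&sa, sizeof(sa)) != 0) {
        close(sfd);
        return -1;
    }
    for (int c = 0; c < n; c++) {
        fds[c] = c ? dup(sfd) : sfd;
        if (fds[c] < 0) {
            while (--c >= 0)                 /* undo what was opened so far */
                close(fds[c]);
            return -1;
        }
    }
    return 0;
}
```

Because every descriptor points at the same socket, any of the UDP workers may end up reading a given datagram; unlike the TCP path, there is no per-connection dispatch step.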