1. 程式人生 > >memcached 原始碼分析——半同步、半非同步模式

memcached 原始碼分析——半同步、半非同步模式

memcached 是目前應用非常廣泛的快取伺服器,採用的是半同步、半非同步模式。

半同步、半非同步

半同步/半非同步模型的基礎設施:主執行緒建立多個子執行緒(這些子執行緒也稱為worker執行緒),每一個執行緒都維持自己的事件迴圈,即每個執行緒都有自己的epoll,並且都會呼叫epoll_wait函式進入事件監聽狀態。每一個worker執行緒(子執行緒)和主執行緒之間都用一條管道相互通訊。每一個子執行緒都監聽自己對應那條管道的讀端。當主執行緒想和某一個worker執行緒進行通訊,直接往對應的那條管道寫入資料即可。
半同步/半非同步模型的工作流程:主執行緒負責監聽程序對外的TCP監聽埠。當客戶端connect到程序的時候,主執行緒負責accept客戶端的連線請求。然後主執行緒選擇其中一個worker執行緒,把客戶端fd放入該worker執行緒的連線佇列,並往對應的管道寫入一個位元組來通知它(管道只負責通知,fd本身通過佇列傳遞)。worker執行緒得到客戶端的fd後負責和這個客戶端進行一切的通訊。
半同步/半非同步的模式設計,需要三個模組:生產者,消費者,通知佇列。這裡的生產者為主執行緒,消費者為worker執行緒,通知佇列為CQ。當主執行緒收到客戶端的一個請求,喚醒epoll_wait,觸發回撥函式event_handler。
在event_handler中,主要呼叫drive_machine函式。如果是TCP連線,主執行緒accept該連線請求,獲取sockfd,然後通過dispatch_conn_new往選定worker執行緒的CQ佇列新增一個元素,並向該執行緒的管道寫入一個位元組'c'作為通知。worker執行緒的epoll_wait感知到管道可讀,便觸發回撥thread_libevent_process:從CQ佇列取出該元素,通過conn_new建立一個conn物件,建立一個event,綁定sockfd並設定回撥函式後,新增到epoll中。這樣,客戶端之後傳送的請求將直接由worker執行緒處理。

static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
#ifdef HAVE_ACCEPT4
    static int  use_accept4 = 1;
#else
    static int  use_accept4 = 0;
#endif
assert(c != NULL); while (!stop) { switch(c->state) { case conn_listening: addrlen = sizeof(addr); #ifdef HAVE_ACCEPT4 if (use_accept4) { sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK); } else
{ sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); } #else sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif if (sfd == -1){ ... ... } else { dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); } stop = true; break; case conn_waiting: ... ... } } void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } } static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; if (read(fd, buf, 1) != 1) if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); switch (buf[0]) { case 'c': item = cq_pop(me->new_conn_queue); if (NULL != item) { conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->transport, me->base); if (c == NULL) { if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } else { c->thread = me; } cqi_free(item); } break; /* we were told to pause and report in */ case 'p': register_thread_initialized(); break; } }

drive_machine函式根據當前連線的狀態,對客戶端發起的請求做出不同的響應。
以上為收到TCP連線請求的響應步驟。UDP沒有accept的過程:如下面的server_socket所示,伺服器在建立UDP socket時就直接呼叫dispatch_conn_new,把socket分派給worker執行緒,初始狀態為conn_read,之後由worker執行緒直接讀取資料。

static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    ... ...
if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                int per_thread_fd = c ? dup(sfd) : sfd;
                dispatch_conn_new(per_thread_fd, conn_read,
                                  EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
}