1. 程式人生 > 其它 >Redis非阻塞IO複用模型

Redis非阻塞IO複用模型

Redis執行緒模型(非阻塞IO複用模型)

Redis優點

  • 基於記憶體,C語言編寫-速度快
  • 非阻塞的IO複用模型機制
  • 單執行緒- 避免多執行緒的頻繁上下文切換問題
  • 豐富的資料結構 - 字串、連結串列、雜湊、集合、有序集合

檔案事件處理器(file event handler)

Redis伺服器是一個事件驅動程式

Redis基於Reactor模型開發了自己的網路事件處理器 - 檔案事件處理器

  • 檔案事件處理器使用I/O多路複用(multiplexing)同時監聽多個套接字,並根據套接字目前執行的任務為套接字關聯不同的事件處理器
  • 當監聽套接字準備好執行應答(accept)、讀取(read)、寫入(write)、關閉(close)等操作時,與操作所對應的檔案事件就會產生,檔案事件處理器就會呼叫套接字之前關聯好的事件處理器來處理這些事件

檔案事件處理器是單執行緒的,但是通過/O多路複用(multiplexing)同時監聽多個套接字檔案時期處理器既實現了高效能網路通訊模型,有保證了內部單執行緒設計的簡單性

ae.h

/*
 * 檔案事件狀態
 */
// 未設定
#define AE_NONE 0
// 可讀
#define AE_READABLE 1
// 可寫
#define AE_WRITABLE 2

Redis與客戶端通訊流程

  • 1.客戶端 Socket01 向 Redis 的 Server Socket 請求建立連線,此時 Server Socket 會產生一個 AE_READABLE 事件,IO 多路複用程式監聽到 server socket 產生的事件後,將該事件壓入佇列中。檔案事件分派器從佇列中獲取該事件,交給連線應答處理器。連線應答處理器會建立一個能與客戶端通訊的 Socket01,並將該 Socket01 的 AE_READABLE 事件與命令請求處理器關聯
  • 2.此時客戶端傳送了一個 set key value 請求,此時 Redis 中的 Socket01 會產生 AE_READABLE 事件,IO 多路複用程式將事件壓入佇列,此時事件分派器從佇列中獲取到該事件,由於前面 Socket01 的 AE_READABLE 事件已經與命令請求處理器關聯,因此事件分派器將事件交給命令請求處理器來處理。命令請求處理器讀取 Socket01 的 set key value 並在自己記憶體中完成 set key value 的設定。操作完成後,它會將 Socket01 的 AE_WRITABLE 事件與命令回覆處理器關聯
  • 3.如果此時客戶端準備好接收返回結果了,那麼 Redis 中的 Socket01 會產生一個 AE_WRITABLE 事件,同樣壓入佇列中,事件分派器找到相關聯的命令回覆處理器,由命令回覆處理器對 Socket01 輸入本次操作的一個結果,比如 ok ,之後解除 Socket01 的 AE_WRITABLE 事件與命令回覆處理器的關聯

I/O多路複用

ae.c

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * 處理所有已到達的時間事件,以及所有已就緒的檔案事件。
 *
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * 如果不傳入特殊 flags 的話,那麼函式睡眠直到檔案事件就緒,
 * 或者下個時間事件到達(如果有的話)。
 *
 * If flags is 0, the function does nothing and returns.
 * 如果 flags 為 0 ,那麼函式不作動作,直接返回。
 *
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * 如果 flags 包含 AE_ALL_EVENTS ,所有型別的事件都會被處理。
 *
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * 如果 flags 包含 AE_FILE_EVENTS ,那麼處理檔案事件。
 *
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * 如果 flags 包含 AE_TIME_EVENTS ,那麼處理時間事件。
 *
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * 如果 flags 包含 AE_DONT_WAIT ,
 * 那麼函式在處理完所有不許阻塞的事件之後,即刻返回。
 *
 * The function returns the number of events processed. 
 * 函式的返回值為已處理事件的數量
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 獲取最近的時間事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果時間事件存在的話
            // 那麼根據最近可執行時間事件和現在時間的時間差來決定檔案事件的阻塞時間
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 計算距今最近的時間事件還要多久才能達到
            // 並將該時間距儲存在 tv 結構中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 時間差小於 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 執行到這一步,說明沒有時間事件
            // 那麼根據 AE_DONT_WAIT 是否設定來決定是否阻塞,以及阻塞的時間長度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 設定檔案事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 檔案事件可以阻塞直到有事件到達為止
                tvp = NULL; /* wait forever */
            }
        }

        // 處理檔案事件,阻塞時間由 tvp 決定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 從已就緒陣列中獲取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 讀事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 確保讀/寫事件只能執行其中一個
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 寫事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // 執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

/* Wait for milliseconds until the given file descriptor becomes
 * writable/readable/exception 
 *
 * 在給定毫秒內等待,直到 fd 變成可寫、可讀或異常
 */
int aeWait(int fd, int mask, long long milliseconds) {
    struct pollfd pfd;
    int retmask = 0, retval;

    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = fd;
    if (mask & AE_READABLE) pfd.events |= POLLIN;
    if (mask & AE_WRITABLE) pfd.events |= POLLOUT;

    if ((retval = poll(&pfd, 1, milliseconds))== 1) {
        if (pfd.revents & POLLIN) retmask |= AE_READABLE;
        if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
	if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
        if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
        return retmask;
    } else {
        return retval;
    }
}
/*
 * 事件處理器的主迴圈
 */
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件處理前執行的函式,那麼執行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeApiPoll()函式

編譯時會自動選擇系統中效能最高的I/O多路複用函式庫作為Redis的I/O多路複用程式

檔案事件的處理器

  • 連線應答處理器

networking.c/acceptTcpHandler

  • 命令請求處理器

networking.c/readQueryFrom

  • 命令回覆處理器

networking.c/sendReplyToClient