1. 程式人生 > 實用技巧 >自頂向下redis4.0(2)檔案事件與客戶端

自頂向下redis4.0(2)檔案事件與客戶端

redis4.0的檔案事件與客戶端

本文會隨著學習redis4.0的推進而更新。

因為檔案事件和客戶端緊密關聯,所以筆者將兩者放在同一小節中。

簡介

檔案事件的流程大概如下:

  1. 在伺服器初始化時生成aeEventLoop並賦值給server,接著建立監聽TCP連線事件。
  2. 處理TCP連線時會建立client型別的物件,將其繫結在accept函式返回的檔案描述符fd上,並對fd註冊一個可讀事件,當客戶端資料來臨時,readQueryFromClient會對資料進行處理。
  3. redis處理完資料後,會呼叫write函式將資料返回給客戶端(注意:但不是在一個迴圈裡)。如果函式返回的值小於寫入的值,說明系統快取區空間不夠,或者檔案描述符在中途被佔用,那麼redis
    會註冊一個可寫事件,當可寫事件觸發時,sendReplyToClient函式會寫入剩餘的資料。
  4. 當客戶端斷開連線,伺服器會釋放client相關的資源,隨之刪除對應的檔案事件。

正文

準備階段

在初始化伺服器時,server函式建立clientsclients_pending_write等欄位,並通過aeCreateEventLoop建立一個aeEventLoop物件。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    eventLoop = zmalloc(sizeof(*eventLoop));
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    eventLoop->setsize = setsize;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set.*/
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
}

此處傳入引數setsize的值為maxClients+128,maxClients預設值為10,000。events用於存放註冊的檔案事件,而fired則在事件觸發時,存放被觸發的事件。兩者的長度都為setsize大小。

緊接著便會註冊第一個檔案事件。

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) 

這裡我們更進一步看下aeCreateFileEvent的程式碼, fd檔案描述符 被用於偏移來獲取對應的檔案事件結構,因此fd的值必須小於之前註冊的事件大小的值。第一個被用於註冊檔案事件的fd

用於監聽TCP連線,由於程序啟動時會開啟一些其他的檔案,因此eventLoop->events的空間並沒有並完全利用。此處還通過mask來註冊對應的事件觸發後的處理函式。如果是監聽可讀事件,那麼rfileProc處理函式會被賦值。可寫事件同理。此時並沒有傳入clientData,我們會在下文再回到這個函式。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd]; 

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

接受客戶端連線

當接受來自客戶端連線時,便會呼叫acceptTcpHandler函式,該函式會接受所有客戶端的請求,但一次最多接受MAX_ACCEPTS_PER_CALL1000個客戶端,並且如果在輪詢中發現沒有客戶端請求,就會立刻返回。接受了一個客戶端連線請求後,便會進入處理函式acceptCommonHandler,它會建立一個client的物件,如果連線的數量大於設定的值,則會斷開連線。如果redis跑在保護模式,則可能返回錯誤資訊。

If no pending connections are present on the queue, and the socket is
not marked as nonblocking, accept() blocks the caller until a
connection is present.  If the socket is marked nonblocking and no
pending connections are present on the queue, accept() fails with the
error EAGAIN or EWOULDBLOCK.

最主要的程式碼位於createClient,它會註冊客戶端可讀事件,關聯readQueryFromClient函式,並且初始化client的一些屬性。

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
             anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;

    client_id = server.next_client_id;
    server.next_client_id += 1;

    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0; //下一個返回資料存入位置
    				// c->buf 陣列儲存返回給客戶端的資料
    c->querybuf = sdsempty(); //查詢快取
    c->reqtype = 0; //查詢型別 一般為multi
    c->argc = 0; //引數個數 由querybuf解析而得
    c->argv = NULL;//引數值 由querybuf解析而得
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0; //查詢資料的行數
    c->bulklen = -1;//一行查詢資料的長度
    c->sentlen = 0;//已經發送的資料長度
    c->flags = 0; 
    c->ctime = c->lastinteraction = server.unixtime;
    c->reply = listCreate(); //如果buf 陣列溢位,則使用reply連結串列
    c->reply_bytes = 0;		//reply連結串列中物件總共的位元組數
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    if (fd != -1) linkClient(c);
    return c;
}

處理資料

redis通過aeProcessEvents函式處理各種事件,首先它會呼叫aeApiPoll函式通過多路複用函式來檢查已經觸發的事件,並將已經觸發事件的檔案描述符,事件型別賦值給eventLoop->fired。然後根據事件觸發的型別,呼叫之前註冊的函式。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    int j;
    struct timeval tv, *tvp;

    tvp = NULL; /* wait forever */

    numevents = aeApiPoll(eventLoop, tvp);

    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;


        if (fe->mask & mask & AE_READABLE) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }

        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
        }

        processed++;
    }

    return processed; /* return the number of processed file/time events */
}

如果此時有來自客戶端的資料,那麼將會觸發AE_READABLE事件,呼叫readQueryFromClient函式。預設情況下一次只能讀取16KB,除非上次已經讀取過資料,並且資料量較大,一行長度超過32KB。如果超過32KB,那麼一次讀取剩餘該行長度的值。這是因為TCP接受的資料不一定是完整的資料,如果是PROTO_REQ_MULTIBULK多行請求,並且資料量過大,在redis開始處理請求前需要接收全部的資料,等待的時間過長,並且解析完畢之後,執行命令的時間和下發資料的長度也會影響效能。建議一次請求不超過16KB,但這16KB中還包含著*/r/n等格式符號,因此請求的資料量還要再小一些,才能保證服務端儘可能在一次接收資料的過程中完成命令的解析。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;

    readlen = PROTO_IOBUF_LEN;//1024*16 bytes

    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);

    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c); 
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c); 
        return;
    } 

    sdsIncrLen(c->querybuf,nread);

    processInputBuffer(c);

}

接著就會進入processInputBuffer函式,此時資料可能全部抵達,也可能部分抵達。processInputBuffer函式的主要功能是將,client->querybuff裡面的資料解析,並轉化為client->argcclient->argv的資料。如果資料全部抵達,那麼接著會進入到processComand函式,查詢命令表,執行命令並返回資料給客戶的。如果資料部分抵達,但是一行的資料內容抵達,那麼該行資料會被解析到client->argcclient->argv中去。

返回資料結果

在這裡我們假設客戶端輸入的字串是quitprocessComand函式會呼叫addReply函式將當前的client加入到clients_pending_write連結串列中。

再將儲存OK字串的物件新增到緩衝區,服務端返回給客戶端的編碼型別只可能是字元型或者是INT型。首先redis會嘗試將結果新增到緩衝區,緩衝區的大小預設16KB,並且不能通過配置更改。如果緩衝區會溢位,那麼redis會將資料新增到client->reply連結串列中。

void addReply(client *client, robj *obj) {
    if (prepareClientToWrite(client) != C_OK) return; 

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(client,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(client,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ...
    } else {
        // serverPanic("Wrong obj->encoding in addReply()");
        serverLog(LL_WARNING, "Wron obj->encoding in addReply()");
    }
}

此時資料還沒有返回給客戶端,在redis進入下一次迴圈的時候,會呼叫beforeSleep函式將資料返回給客戶端。

為什麼redis不直接將資料返回給客戶端呢?

原始碼的註釋給出了答案:為了實現fsync=always的效果,將返回資料放在beforeSleep中,可以通過AOF持久後,再返回給客戶端結果。

 /* For the fsync=always policy, we want that a given FD is never
  * served for reading and writing in the same event loop iteration,
  * so that in the middle of receiving the query, and serving it
  * to the client, we'll call beforeSleep() that will do the
  * actual fsync of AOF to disk. AE_BARRIER ensures that. */

beforeLoop會接著呼叫handleClientsWithPendingWrites函式來處理有快取資料的clientwriteToClient函式會將buf中和reply連結串列中的資料全部發送給客戶端,如果實際傳送的資料小於應當傳送的資料,則表示系統快取區空間不夠,或者檔案描述符在中途被佔用,那麼redis會建立一個事件,當監聽到檔案描述符可讀時,再將剩餘資料寫入。

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
             int ae_flags = AE_WRITABLE; 
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

在寫完資料後,發現客戶端有被標記CLIENT_CLOSE_AFTER_REPLY,那麼將會釋放客戶端的資源。

        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }

參考文獻

accept函式

《Redis設計與實現》