自頂向下redis4.0(2)檔案事件與客戶端
redis4.0的檔案事件與客戶端
本文會隨著學習redis4.0的推進而更新。
因為檔案事件和客戶端緊密關聯,所以筆者將兩者放在同一小節中。
簡介
檔案事件的流程大概如下:
- 在伺服器初始化時生成
aeEventLoop
並賦值給server
,接著建立監聽TCP連線事件。 - 處理TCP連線時會建立
client
型別的物件,將其繫結在accept
函式返回的檔案描述符fd
上,並對fd
註冊一個可讀事件,當客戶端資料來臨時,readQueryFromClient
會對資料進行處理。 redis
處理完資料後,會呼叫write函式將資料返回給客戶端(注意:但不是在一個迴圈裡)。如果函式返回的值小於寫入的值,說明系統快取區空間不夠,或者檔案描述符在中途被佔用,那麼redis
sendReplyToClient
函式會寫入剩餘的資料。- 當客戶端斷開連線,伺服器會釋放
client
相關的資源,隨之刪除對應的檔案事件。
正文
準備階段
在初始化伺服器時,server
函式建立clients
,clients_pending_write
等欄位,並通過aeCreateEventLoop
建立一個aeEventLoop
物件。
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; eventLoop = zmalloc(sizeof(*eventLoop)); eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); eventLoop->setsize = setsize; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set.*/ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; }
此處傳入引數setsize的值為maxClients
+128,maxClients
預設值為10,000。events
用於存放註冊的檔案事件,而fired
則在事件觸發時,存放被觸發的事件。兩者的長度都為setsize
大小。
緊接著便會註冊第一個檔案事件。
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
這裡我們更進一步看下aeCreateFileEvent
的程式碼, fd
檔案描述符 被用於偏移來獲取對應的檔案事件結構,因此fd
的值必須小於之前註冊的事件大小的值。第一個被用於註冊檔案事件的fd
eventLoop->events
的空間並沒有並完全利用。此處還通過mask
來註冊對應的事件觸發後的處理函式。如果是監聽可讀事件,那麼rfileProc
處理函式會被賦值。可寫事件同理。此時並沒有傳入clientData
,我們會在下文再回到這個函式。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
接受客戶端連線
當接受來自客戶端連線時,便會呼叫acceptTcpHandler
函式,該函式會接受所有客戶端的請求,但一次最多接受MAX_ACCEPTS_PER_CALL
1000個客戶端,並且如果在輪詢中發現沒有客戶端請求,就會立刻返回。接受了一個客戶端連線請求後,便會進入處理函式acceptCommonHandler
,它會建立一個client的物件,如果連線的數量大於設定的值,則會斷開連線。如果redis
跑在保護模式,則可能返回錯誤資訊。
If no pending connections are present on the queue, and the socket is not marked as nonblocking, accept() blocks the caller until a connection is present. If the socket is marked nonblocking and no pending connections are present on the queue, accept() fails with the error EAGAIN or EWOULDBLOCK.
最主要的程式碼位於createClient
,它會註冊客戶端可讀事件,關聯readQueryFromClient
函式,並且初始化client
的一些屬性。
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
selectDb(c,0);
uint64_t client_id;
client_id = server.next_client_id;
server.next_client_id += 1;
c->id = client_id;
c->fd = fd;
c->name = NULL;
c->bufpos = 0; //下一個返回資料存入位置
// c->buf 陣列儲存返回給客戶端的資料
c->querybuf = sdsempty(); //查詢快取
c->reqtype = 0; //查詢型別 一般為multi
c->argc = 0; //引數個數 由querybuf解析而得
c->argv = NULL;//引數值 由querybuf解析而得
c->cmd = c->lastcmd = NULL;
c->multibulklen = 0; //查詢資料的行數
c->bulklen = -1;//一行查詢資料的長度
c->sentlen = 0;//已經發送的資料長度
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
c->reply = listCreate(); //如果buf 陣列溢位,則使用reply連結串列
c->reply_bytes = 0; //reply連結串列中物件總共的位元組數
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
if (fd != -1) linkClient(c);
return c;
}
處理資料
redis
通過aeProcessEvents
函式處理各種事件,首先它會呼叫aeApiPoll
函式通過多路複用函式來檢查已經觸發的事件,並將已經觸發事件的檔案描述符,事件型別賦值給eventLoop->fired
。然後根據事件觸發的型別,呼叫之前註冊的函式。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
int j;
struct timeval tv, *tvp;
tvp = NULL; /* wait forever */
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
}
processed++;
}
return processed; /* return the number of processed file/time events */
}
如果此時有來自客戶端的資料,那麼將會觸發AE_READABLE
事件,呼叫readQueryFromClient
函式。預設情況下一次只能讀取16KB,除非上次已經讀取過資料,並且資料量較大,一行長度超過32KB。如果超過32KB,那麼一次讀取剩餘該行長度的值。這是因為TCP接受的資料不一定是完整的資料,如果是PROTO_REQ_MULTIBULK
多行請求,並且資料量過大,在redis
開始處理請求前需要接收全部的資料,等待的時間過長,並且解析完畢之後,執行命令的時間和下發資料的長度也會影響效能。建議一次請求不超過16KB,但這16KB中還包含著*/r/n等格式符號,因此請求的資料量還要再小一些,才能保證服務端儘可能在一次接收資料的過程中完成命令的解析。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
readlen = PROTO_IOBUF_LEN;//1024*16 bytes
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
sdsIncrLen(c->querybuf,nread);
processInputBuffer(c);
}
接著就會進入processInputBuffer
函式,此時資料可能全部抵達,也可能部分抵達。processInputBuffer
函式的主要功能是將,client->querybuff
裡面的資料解析,並轉化為client->argc
和client->argv
的資料。如果資料全部抵達,那麼接著會進入到processComand
函式,查詢命令表,執行命令並返回資料給客戶的。如果資料部分抵達,但是一行的資料內容抵達,那麼該行資料會被解析到client->argc
和client->argv
中去。
返回資料結果
在這裡我們假設客戶端輸入的字串是quit
,processComand
函式會呼叫addReply
函式將當前的client
加入到clients_pending_write
連結串列中。
再將儲存OK
字串的物件新增到緩衝區,服務端返回給客戶端的編碼型別只可能是字元型或者是INT型。首先redis
會嘗試將結果新增到緩衝區,緩衝區的大小預設16KB,並且不能通過配置更改。如果緩衝區會溢位,那麼redis
會將資料新增到client->reply
連結串列中。
void addReply(client *client, robj *obj) {
if (prepareClientToWrite(client) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(client,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(client,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
...
} else {
// serverPanic("Wrong obj->encoding in addReply()");
serverLog(LL_WARNING, "Wron obj->encoding in addReply()");
}
}
此時資料還沒有返回給客戶端,在redis
進入下一次迴圈的時候,會呼叫beforeSleep
函式將資料返回給客戶端。
為什麼redis
不直接將資料返回給客戶端呢?
原始碼的註釋給出了答案:為了實現fsync=always
的效果,將返回資料放在beforeSleep
中,可以通過AOF
持久後,再返回給客戶端結果。
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
beforeLoop
會接著呼叫handleClientsWithPendingWrites
函式來處理有快取資料的client
,writeToClient
函式會將buf
中和reply
連結串列中的資料全部發送給客戶端,如果實際傳送的資料小於應當傳送的資料,則表示系統快取區空間不夠,或者檔案描述符在中途被佔用,那麼redis
會建立一個事件,當監聽到檔案描述符可讀時,再將剩餘資料寫入。
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
在寫完資料後,發現客戶端有被標記CLIENT_CLOSE_AFTER_REPLY
,那麼將會釋放客戶端的資源。
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}