1. 程式人生 > 其它 >Redis命令執行過程

Redis命令執行過程

  今天我們來了解一下 Redis 命令執行的過程。在之前的文章中《當 Redis 發生高延遲時,到底發生了什麼》我們曾簡單的描述了一條命令的執行過程,本篇文章展示深入說明一下,加深讀者對 Redis 的瞭解。

上篇

  如下圖所示,一條命令執行完成並且返回資料一共涉及三部分,第一步是建立連線階段,響應了socket的建立,並且建立了client物件;第二步是處理階段,從socket讀取資料到輸入緩衝區,然後解析並獲得命令,執行命令並將返回值儲存到輸出緩衝區中;第三步是資料返回階段,將返回值從輸出緩衝區寫到socket中,返回給客戶端,最後關閉client。

這三個階段之間是通過事件機制串聯了,在 Redis 啟動階段首先要註冊socket連線建立事件處理器:

  • 當客戶端發來建立socket的連線的請求時,對應的處理器方法會被執行,建立連線階段的相關處理就會進行,然後註冊socket讀取事件處理器
  • 當客戶端發來命令時,讀取事件處理器方法會被執行,對應處理階段的相關邏輯都會被執行,然後註冊socket寫事件處理器
  • 當寫事件處理器被執行時,就是將返回值寫回到socket中。

接下來,我們分別來看一下各個步驟的具體原理和程式碼實現。

啟動時監聽socket

  Redis 伺服器啟動時,會呼叫 initServer 方法,首先會建立 Redis 自己的事件機制 eventLoop,然後在其上註冊週期時間事件處理器,最後在所監聽的 socket 上建立檔案事件處理器,監聽 socket 建立連線的事件,其處理函式為 acceptTcpHandler。

 1 void initServer(void) { // server.c
 2     ....
 3     //建立aeEventLoop
 4     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
 5     if (server.el == NULL) {
 6         serverLog(LL_WARNING,
 7             "Failed creating the event loop. Error message: '%s'",
 8             strerror(errno));
9 exit(1); 10 } 11 server.db = zmalloc(sizeof(redisDb)*server.dbnum); 12 /* Open the TCP listening socket for the user commands. */ 13 14 if (server.port != 0 && 15 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) 16 exit(1); 17 18 ··· 19 20 /** 21 * 註冊週期時間事件,處理後臺操作,比如說客戶端操作、過期鍵等 22 */ 23 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { 24 serverPanic("Can't create event loop timers."); 25 exit(1); 26 } 27 /** 28 * 為所有監聽的socket建立檔案事件,監聽可讀事件;事件處理函式為acceptTcpHandler 29 * 30 */ 31 for (j = 0; j < server.ipfd_count; j++) { 32 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 33 acceptTcpHandler,NULL) == AE_ERR) 34 { 35 serverPanic( 36 "Unrecoverable error creating server.ipfd file event."); 37 } 38 } 39 .... 40 }

  我們曾詳細介紹過 Redis 的事件機制,可以說,Redis 命令執行過程中都是由事件機制協調管理的,也就是 initServer 方法中生成的 aeEventLoop。當socket發生對應的事件時,aeEventLoop 對呼叫已經註冊的對應的事件處理器。

建立連線和Client

  當客戶端向 Redis 建立 socket時,aeEventLoop 會呼叫 acceptTcpHandler 處理函式,伺服器會為每個連結建立一個 Client 物件,並建立相應檔案事件來監聽socket的可讀事件,並指定事件處理函式。acceptTcpHandler 函式會首先呼叫anetTcpAccept方法,它底層會呼叫 socket 的 accept 方法,也就是接受客戶端來的建立連線請求,然後呼叫acceptCommonHandler方法,繼續後續的邏輯處理。

 1 /**
 2  * 建立一個TCP的連線處理程式
 3  *
 4  * 為了對連線伺服器的各個客戶端進行應答, 伺服器要為監聽套接字關聯連線應答處理器。
 5  * 這個處理器用於對連線伺服器監聽套接字的客戶端進行應答,具體實現為sys/socket.h/accept函式的包裝。
 6  * 當Redis伺服器進行初始化的時候,程式會將這個連線應答處理器和伺服器監聽套接字的AE_READABLE事件關聯起來,
 7  * 當有客戶端用sys/socket.h/connect函式連線伺服器監聽套接字的時候, 套接字就會產生AE_READABLE 事件,
 8  * 引發連線應答處理器執行, 並執行相應的套接字應答操作,
 9  */
10 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
11     //#define MAX_ACCEPTS_PER_CALL 1000
12     int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
13     char cip[NET_IP_STR_LEN];
14     UNUSED(el);
15     UNUSED(mask);
16     UNUSED(privdata);
17 
18     while(max--) {
19         cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
20         if (cfd == ANET_ERR) {
21             if (errno != EWOULDBLOCK)
22             //連線失敗,日誌記錄
23                 serverLog(LL_WARNING,
24                     "Accepting client connection: %s", server.neterr);
25             return;
26         }
27         //連線成功,日誌記錄
28         serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
29         //為通訊檔案描述符建立對應的客戶端結構體
30         acceptCommonHandler(cfd,0,cip);
31     }
32 }

  acceptCommonHandler 則首先呼叫 createClient 建立 client,接著判斷當前 client 的數量是否超出了配置的 maxclients,如果超過,則給客戶端傳送錯誤資訊,並且釋放 client。

 1 #define MAX_ACCEPTS_PER_CALL 1000
 2 // TCP連線處理程式,建立一個client的連線狀態
 3 static void acceptCommonHandler(int fd, int flags, char *ip) {
 4     client *c;
 5     // 建立一個新的client
 6     if ((c = createClient(fd)) == NULL) {
 7         serverLog(LL_WARNING,
 8             "Error registering fd event for the new client: %s (fd=%d)",
 9             strerror(errno),fd);
10         close(fd); /* May be already closed, just ignore errors */
11         return;
12     }
13     /**
14      * If maxclient directive is set and this is one client more... close the
15      * connection. Note that we create the client instead to check before
16      * for this condition, since now the socket is already set in non-blocking
17      * mode and we can send an error for free using the Kernel I/O
18      *
19      * 如果新的client超過server規定的maxclients的限制,那麼想新client的fd寫入錯誤資訊,關閉該client
20      * 先建立client,在進行數量檢查,是因為更好的寫入錯誤資訊
21      */
22     if (listLength(server.clients) > server.maxclients) {
23         char *err = "-ERR max number of clients reached\r\n";
24 
25         /* That's a best effort error message, don't check write errors */
26         if (write(c->fd,err,strlen(err)) == -1) {
27             /* Nothing to do, Just to avoid the warning... */
28         }
29         // 更新拒接連線的個數
30         server.stat_rejected_conn++;
31         freeClient(c);
32         return;
33     }
34 
35     /**
36      * If the server is running in protected mode (the default) and there
37      * is no password set, nor a specific interface is bound, we don't accept
38      * requests from non loopback interfaces. Instead we try to explain the
39      * user what to do to fix it if needed.
40      *
41      * 如果伺服器正在以保護模式執行(預設),且沒有設定密碼,也沒有繫結指定的介面,
42      * 我們就不接受非迴環介面的請求。相反,如果需要,我們會嘗試解釋使用者如何解決問題
43      */
44     if (server.protected_mode &&
45         server.bindaddr_count == 0 &&
46         server.requirepass == NULL &&
47         !(flags & CLIENT_UNIX_SOCKET) &&
48         ip != NULL)
49     {
50         if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
51             char *err =
52                 "-DENIED Redis is running in protected mode because protected "
53                 "mode is enabled, no bind address was specified, no "
54                 "authentication password is requested to clients. In this mode "
55                 "connections are only accepted from the loopback interface. "
56                 "If you want to connect from external computers to Redis you "
57                 "may adopt one of the following solutions: "
58                 "1) Just disable protected mode sending the command "
59                 "'CONFIG SET protected-mode no' from the loopback interface "
60                 "by connecting to Redis from the same host the server is "
61                 "running, however MAKE SURE Redis is not publicly accessible "
62                 "from internet if you do so. Use CONFIG REWRITE to make this "
63                 "change permanent. "
64                 "2) Alternatively you can just disable the protected mode by "
65                 "editing the Redis configuration file, and setting the protected "
66                 "mode option to 'no', and then restarting the server. "
67                 "3) If you started the server manually just for testing, restart "
68                 "it with the '--protected-mode no' option. "
69                 "4) Setup a bind address or an authentication password. "
70                 "NOTE: You only need to do one of the above things in order for "
71                 "the server to start accepting connections from the outside.\r\n";
72             if (write(c->fd,err,strlen(err)) == -1) {
73                 /* Nothing to do, Just to avoid the warning... */
74             }
75             // 更新拒接連線的個數
76             server.stat_rejected_conn++;
77             freeClient(c);
78             return;
79         }
80     }
81 
82     // 更新連線的數量
83     server.stat_numconnections++;
84     // 更新client狀態的標誌
85     c->flags |= flags;
86 }

  createClient 方法用於建立 client,它代表著連線到 Redis 客戶端,每個客戶端都有各自的輸入緩衝區和輸出緩衝區,輸入緩衝區儲存客戶端通過 socket 傳送過來的資料,輸出緩衝區則儲存著 Redis 對客戶端的響應資料。client一共有三種類型,不同型別的對應緩衝區的大小都不同。

  • 普通客戶端是除了複製和訂閱的客戶端之外的所有連線
  • 從客戶端用於主從複製,主節點會為每個從節點單獨建立一條連線用於命令複製
  • 訂閱客戶端用於釋出訂閱功能

  createClient 方法除了建立 client 結構體並設定其屬性值外,還會對 socket進行配置並註冊讀事件處理器,設定 socket 為 非阻塞 socket、設定 NO_DELAY 和 SO_KEEPALIVE標誌位來關閉 Nagle 演算法並且啟動 socket 存活檢查機制。設定讀事件處理器,當客戶端通過 socket 傳送來資料後,Redis 會呼叫 readQueryFromClient 方法。

  1 client *createClient(int fd) {
  2     //分配空間
  3     client *c = zmalloc(sizeof(client));
  4 
  5     /**
  6      * passing -1 as fd it is possible to create a non connected client.
  7      * This is useful since all the commands needs to be executed
  8      * in the context of a client. When commands are executed in other
  9      * contexts (for instance a Lua script) we need a non connected client.
 10      *
 11      * 如果fd為-1,表示建立的是一個無網路連線的偽客戶端,用於執行lua指令碼的時候。
 12      * 如果fd不等於-1,表示建立一個有網路連線的客戶端
 13      */
 14     if (fd != -1) {
 15         // 設定fd為非阻塞模式
 16         anetNonBlock(NULL,fd);
 17         // 禁止使用 Nagle 演算法,client向核心遞交的每個資料包都會立即傳送給server出去,TCP_NODELAY
 18         anetEnableTcpNoDelay(NULL,fd);
 19         // 如果開啟了tcpkeepalive,則設定 SO_KEEPALIVE
 20         if (server.tcpkeepalive)
 21             anetKeepAlive(NULL,fd,server.tcpkeepalive);// 設定tcp連線的keep alive選項
 22         /**
 23          * 使能AE_READABLE事件,readQueryFromClient是該事件的回撥函式
 24          *
 25          * 建立一個檔案事件狀態el,且監聽讀事件,開始接受命令的輸入
 26          */
 27         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
 28             readQueryFromClient, c) == AE_ERR)
 29         {
 30             close(fd);
 31             zfree(c);
 32             return NULL;
 33         }
 34     }
 35 
 36     // 預設選0號資料庫
 37     selectDb(c,0);
 38     uint64_t client_id;
 39     // 設定client的ID
 40     atomicGetIncr(server.next_client_id,client_id,1);
 41     c->id = client_id;
 42     // client的套接字
 43     c->fd = fd;
 44     // client的名字
 45     c->name = NULL;
 46     // 回覆固定(靜態)緩衝區的偏移量
 47     c->bufpos = 0;
 48     c->qb_pos = 0;
 49     // 輸入快取區
 50     c->querybuf = sdsempty();
 51     c->pending_querybuf = sdsempty();
 52     // 輸入快取區的峰值
 53     c->querybuf_peak = 0;
 54     // 請求協議型別,內聯或者多條命令,初始化為0
 55     c->reqtype = 0;
 56     // 引數個數
 57     c->argc = 0;
 58     // 引數列表
 59     c->argv = NULL;
 60     // 當前執行的命令和最近一次執行的命令
 61     c->cmd = c->lastcmd = NULL;
 62     // 查詢緩衝區剩餘未讀取命令的數量
 63     c->multibulklen = 0;
 64     // 讀入引數的長度
 65     c->bulklen = -1;
 66     // 已發的位元組數
 67     c->sentlen = 0;
 68     // client的狀態
 69     c->flags = 0;
 70     // 設定建立client的時間和最後一次互動的時間
 71     c->ctime = c->lastinteraction = server.unixtime;
 72     // 認證狀態
 73     c->authenticated = 0;
 74     // replication複製的狀態,初始為無
 75     c->replstate = REPL_STATE_NONE;
 76     // 設定從節點的寫處理器為ack,是否在slave向master傳送ack
 77     c->repl_put_online_on_ack = 0;
 78     // replication複製的偏移量
 79     c->reploff = 0;
 80     c->read_reploff = 0;
 81     // 通過ack命令接收到的偏移量
 82     c->repl_ack_off = 0;
 83     // 通過ack命令接收到的偏移量所用的時間
 84     c->repl_ack_time = 0;
 85     // 從節點的埠號
 86     c->slave_listening_port = 0;
 87     // 從節點IP地址
 88     c->slave_ip[0] = '\0';
 89     // 從節點的功能
 90     c->slave_capa = SLAVE_CAPA_NONE;
 91     // 回覆連結串列
 92     c->reply = listCreate();
 93     // 回覆連結串列的位元組數
 94     c->reply_bytes = 0;
 95     // 回覆緩衝區的記憶體大小軟限制
 96     c->obuf_soft_limit_reached_time = 0;
 97     // 回覆連結串列的釋放和複製方法
 98     listSetFreeMethod(c->reply,freeClientReplyValue);
 99     listSetDupMethod(c->reply,dupClientReplyValue);
100     // 阻塞型別
101     c->btype = BLOCKED_NONE;
102     // 阻塞超過時間
103     c->bpop.timeout = 0;
104     // 造成阻塞的鍵字典
105     c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
106     // 儲存解除阻塞的鍵,用於儲存PUSH入元素的鍵,也就是dstkey
107     c->bpop.target = NULL;
108     c->bpop.xread_group = NULL;
109     c->bpop.xread_consumer = NULL;
110     c->bpop.xread_group_noack = 0;
111     // 阻塞狀態
112     c->bpop.numreplicas = 0;
113     // 要達到的複製偏移量
114     c->bpop.reploffset = 0;
115     // 全域性的複製偏移量
116     c->woff = 0;
117     // 監控的鍵
118     c->watched_keys = listCreate();
119     // 訂閱頻道
120     c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
121     // 訂閱模式
122     c->pubsub_patterns = listCreate();
123     // 被快取的peerid,peerid就是 ip:port
124     c->peerid = NULL;
125     c->client_list_node = NULL;
126     // 訂閱釋出模式的釋放和比較方法
127     listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
128     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
129     // 將真正的client放在伺服器的客戶端連結串列中
130     if (fd != -1)
131         linkClient(c);//將當前客戶端加入全域性的連結串列中
132     // 初始化client的事物狀態
133     initClientMultiState(c);
134     return c;
135 }
View Code

client 的屬性中有很多屬性,有一些也許沒有標註到,感興趣的同學可以自行閱讀原始碼。

讀取socket資料到輸入緩衝區

  readQueryFromClient 方法會呼叫 read 方法從 socket 中讀取資料到輸入緩衝區中,然後判斷其大小是否大於系統設定的 client_max_querybuf_len,如果大於,則向 Redis返回錯誤資訊,並關閉 client。將資料讀取到輸入緩衝區後,readQueryFromClient 方法會根據 client 的型別來做不同的處理,如果是普通型別,則直接呼叫 processInputBuffer 來處理;如果是主從客戶端,還需要將命令同步到自己的從伺服器中。也就是說,Redis例項將主例項傳來的命令執行後,繼續將命令同步給自己的從例項。

原始碼如下

  1 /**
  2  * 為了接收客戶端傳來的命令請求, 伺服器要為客戶端套接字關聯命令請求處理器。
  3  *
  4  * readQueryFromClient函式是Redis的命令請求處理器,這個處理器負責從套接字中讀入客戶端傳送的命令請求內容,
  5  * 具體實現為unistd.h/read函式的包裝。
  6  *
  7  * 當一個客戶端通過連線應答處理器成功連線到伺服器之後,
  8  * 伺服器會將客戶端套接字的AE_READABLE事件和命令請求處理器關聯起來,當客戶端向伺服器傳送命令請求的時候,
  9  * 套接字就會產生 AE_READABLE事件,引發命令請求處理器執行,並執行相應的套接字讀入操作,
 10  *
 11  * 在客戶端連線伺服器的整個過程中,伺服器都會一直為客戶端套接字的AE_READABLE事件關聯命令請求處理器。
 12  */
 13 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
 14     //指向之前設定的物件指標
 15     client *c = (client*) privdata;
 16     //readlen:REDIS_IOBUF_LEN
 17     int nread, readlen;
 18     //指示之前已經讀的資料
 19     size_t qblen;
 20     //設定幾個變數
 21     UNUSED(el);
 22     UNUSED(mask);
 23 
 24     //每次想讀的資料長度16K
 25     readlen = PROTO_IOBUF_LEN;
 26     /* If this is a multi bulk request, and we are processing a bulk reply
 27      * that is large enough, try to maximize the probability that the query
 28      * buffer contains exactly the SDS string representing the object, even
 29      * at the risk of requiring more read(2) calls. This way the function
 30      * processMultiBulkBuffer() can avoid copying buffers to create the
 31      * Redis Object representing the argument. */
 32     // 如果是多條請求,根據請求的大小,設定讀入的長度readlen
 33     if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
 34         && c->bulklen >= PROTO_MBULK_BIG_ARG)
 35     {
 36         ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
 37 
 38         /* Note that the 'remaining' variable may be zero in some edge case,
 39          * for example once we resume a blocked client after CLIENT PAUSE. */
 40         if (remaining > 0 && remaining < readlen) readlen = remaining;
 41     }
 42 
 43     //之前緩衝區裡已經存在的資料的長度
 44     qblen = sdslen(c->querybuf);
 45     // 更新緩衝區的峰值
 46     if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
 47     //保證有足夠的空間
 48     c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
 49     // 從 fd 對應的socket中讀取到 client 中的 querybuf 輸入緩衝區
 50     nread = read(fd, c->querybuf+qblen, readlen);
 51     // 讀操作出錯
 52     if (nread == -1) {
 53         if (errno == EAGAIN) {
 54             return;
 55         } else {
 56             // 出錯釋放 client
 57             serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
 58             freeClient(c);
 59             return;
 60         }
 61     } else if (nread == 0) {
 62         // 讀操作完成
 63         // 客戶端主動關閉 connection
 64         serverLog(LL_VERBOSE, "Client closed connection");
 65         freeClient(c);
 66         return;
 67     } else if (c->flags & CLIENT_MASTER) {
 68         /**
 69          * Append the query buffer to the pending (not applied) buffer
 70          * of the master. We'll use this buffer later in order to have a
 71          * copy of the string applied by the last command executed.
 72          *
 73          * 當這個client代表主從的master節點時,將query buffer和 pending_querybuf結合
 74          * 用於主從複製中的命令傳播????
 75          *
 76          * 將查詢緩衝區附加到 master 的掛起(未應用)緩衝區。 稍後我們將使用此緩衝區,
 77          * 以便獲得執行的最後一個命令所應用的字串的副本。
 78          */
 79         c->pending_querybuf = sdscatlen(c->pending_querybuf,
 80                                         c->querybuf+qblen,nread);
 81     }
 82 
 83     // 更新輸入緩衝區的已用大小和未用大小。
 84     sdsIncrLen(c->querybuf,nread);
 85     // 設定最後一次伺服器和client互動的時間
 86     c->lastinteraction = server.unixtime;
 87     // 如果是主節點,則更新複製操作的偏移量
 88     if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
 89     // 更新從網路輸入的位元組數
 90     server.stat_net_input_bytes += nread;
 91     // 如果大於系統配置的最大客戶端快取區大小,也就是配置檔案中的client-query-buffer-limit:1G
 92     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
 93         // 將client資訊轉換為sds
 94         sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
 95 
 96         // 返回錯誤資訊,並且關閉client
 97         bytes = sdscatrepr(bytes,c->querybuf,64);
 98         // 列印到日誌
 99         serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
100         // 釋放空間
101         sdsfree(ci);
102         sdsfree(bytes);
103         freeClient(c);
104         return;
105     }
106 
107     /* Time to process the buffer. If the client is a master we need to
108      * compute the difference between the applied offset before and after
109      * processing the buffer, to understand how much of the replication stream
110      * was actually applied to the master state: this quantity, and its
111      * corresponding part of the replication stream, will be propagated to
112      * the sub-slaves and to the replication backlog. */
113     // 處理client輸入的命令內容
114     processInputBufferAndReplicate(c);
115 }
View Code

  該函式又會呼叫processInputBufferAndReplicate,對輸入資料根據不同的角色做不同的操作,原始碼如下

 1 /**
 2  * This is a wrapper for processInputBuffer that also cares about handling
 3  * the replication forwarding to the sub-slaves, in case the client 'c'
 4  * is flagged as master. Usually you want to call this instead of the
 5  * raw processInputBuffer().
 6  *
 7  * 這是 processInputBuffer 的一個包裝器,它也關心處理複製轉發到子從站,
 8  * 以防客戶端“c”被標記為主站。 通常你想呼叫它而不是原始的 processInputBuffer()。
 9  */
10 void processInputBufferAndReplicate(client *c) {
11     if (!(c->flags & CLIENT_MASTER)) {
12         // processInputBuffer 處理輸入緩衝區,解析獲取命令
13         processInputBuffer(c);
14     } else {
15         // 如果client是master的連線
16         size_t prev_offset = c->reploff;
17         processInputBuffer(c);
18         // 判斷是否同步偏移量發生變化,則通知到後續的slave
19         size_t applied = c->reploff - prev_offset;
20         if (applied) {
21             replicationFeedSlavesFromMasterStream(server.slaves,
22                     c->pending_querybuf, applied);
23             sdsrange(c->pending_querybuf,applied,-1);
24         }
25     }
26 }

解析獲取命令

  processInputBuffer 主要是將輸入緩衝區中的資料解析成對應的命令,根據命令型別是 PROTO_REQ_MULTIBULK 還是 PROTO_REQ_INLINE,來分別呼叫 processInlineBuffer 和 processMultibulkBuffer 方法來解析命令。然後呼叫 processCommand 方法來執行命令。執行成功後,如果是主從客戶端,還需要更新同步偏移量 reploff 屬性,然後重置 client,讓client可以接收一條命令。

  1 /**
  2  * This function is called every time, in the client structure 'c', there is
  3  * more query buffer to process, because we read more data from the socket
  4  * or because a client was blocked and later reactivated, so there could be
  5  * pending query buffer, already representing a full command, to process.
  6  *
  7  * 在客戶端結構“c”中,每次呼叫此函式時,有更多的查詢緩衝區要處理,因為我們從套接字讀取了更多資料,
  8  * 或者因為客戶端被阻塞並稍後重新啟用,因此可能已經有一個要處理完整的命令位待處理的查詢緩衝區。
  9  *
 10  * processInputBuffer 主要是將輸入緩衝區中的資料解析成對應的命令,
 11  * 根據命令型別是 PROTO_REQ_MULTIBULK 還是 PROTO_REQ_INLINE,來分別呼叫 processInlineBuffer 和
 12  * processMultibulkBuffer 方法來解析命令。
 13  *
 14  * 然後呼叫 processCommand 方法來執行命令。執行成功後,如果是主從客戶端,
 15  * 還需要更新同步偏移量 reploff 屬性,然後重置 client,讓client可以接收一條命令。
 16  */
 17 void processInputBuffer(client *c) {
 18     server.current_client = c;
 19 
 20     /* Keep processing while there is something in the input buffer */
 21     /* 當緩衝區中還有資料時就一直處理 */
 22     while(c->qb_pos < sdslen(c->querybuf)) {
 23         /* Return if clients are paused. */
 24         // 如果處於暫停狀態,直接返回
 25         if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
 26 
 27         // 處理 client 的各種狀態
 28 
 29         /**
 30          * Immediately abort if the client is in the middle of something.
 31          * 如果client處於被阻塞狀態,直接返回
 32          */
 33         if (c->flags & CLIENT_BLOCKED) break;
 34 
 35         /**
 36          * Don't process input from the master while there is a busy script
 37          * condition on the slave. We want just to accumulate the replication
 38          * stream (instead of replying -BUSY like we do with other clients) and
 39          * later resume the processing.
 40          * 當從站上有繁忙的指令碼條件時,不要處理來自主站的輸入。
 41          * 我們只想累積複製流(而不是像我們對其他客戶端那樣回覆 -BUSY)並且稍後恢復處理。
 42          */
 43         if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
 44 
 45         /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
 46          * written to the client. Make sure to not let the reply grow after
 47          * this flag has been set (i.e. don't process more commands).
 48          *
 49          * The same applies for clients we want to terminate ASAP.
 50          *
 51          * 一旦回覆寫入客戶端,CLIENT_CLOSE_AFTER_REPLY 將關閉連線。
 52          * 確保在設定此標誌後不要再次回覆(即不要處理更多命令)。 這同樣適用於我們希望儘快終止的客戶。
 53          *
 54          * 如果client處於關閉狀態,則直接返回
 55          */
 56         if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
 57 
 58         /* Determine request type when unknown. */
 59         // 如果是未知的請求型別,則判定請求型別
 60         if (!c->reqtype) {
 61             if (c->querybuf[c->qb_pos] == '*') {
 62                 // 如果是"*"開頭,則是多條請求,是client發來的
 63                 c->reqtype = PROTO_REQ_MULTIBULK;
 64             } else {
 65                 // 否則就是內聯請求,是Telnet發來的
 66                 c->reqtype = PROTO_REQ_INLINE;
 67             }
 68         }
 69 
 70         // 如果是Telnet內聯請求
 71         if (c->reqtype == PROTO_REQ_INLINE) {
 72             // 處理Telnet發來的內聯命令,並建立成物件,儲存在client的引數列表中
 73             if (processInlineBuffer(c) != C_OK) break;
 74         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
 75             // 將client的querybuf中的協議內容轉換為client的引數列表中的物件
 76             if (processMultibulkBuffer(c) != C_OK) break;
 77         } else {
 78             serverPanic("Unknown request type");
 79         }
 80 
 81         /* Multibulk processing could see a <= 0 length. */
 82         // 如果引數為0,則重置client
 83         if (c->argc == 0) {
 84             resetClient(c);
 85         } else {
 86             /* Only reset the client when the command was executed. */
 87             // 只有執行命令成功後才會重置client
 88             if (processCommand(c) == C_OK) {
 89                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
 90                     /* Update the applied replication offset of our master. */
 91                     c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
 92                 }
 93 
 94                 /**
 95                  * Don't reset the client structure for clients blocked in a
 96                  * module blocking command, so that the reply callback will
 97                  * still be able to access the client argv and argc field.
 98                  * The client will be reset in unblockClientFromModule().
 99                  * 如果當前客戶端是非阻塞的或者當前客戶端的命令是非阻塞的就重置客戶端
100                  */
101                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
102                     resetClient(c);
103             }
104             /* freeMemoryIfNeeded may flush slave output buffers. This may
105              * result into a slave, that may be the active client, to be
106              * freed. */
107             if (server.current_client == NULL) break;
108         }
109     }
110 
111     /* Trim to pos */
112     if (c->qb_pos) {
113         sdsrange(c->querybuf,c->qb_pos,-1);
114         c->qb_pos = 0;
115     }
116 
117     // 執行成功,則將用於崩潰報告的client設定為NULL
118     server.current_client = NULL;
119 }
View Code

解析命令就是將 redis 命令文字資訊,記錄到client的argv/argc屬性中

執行命令

  processCommand 方法會處理很多邏輯,不過大致可以分為三個部分:首先是呼叫 lookupCommand 方法獲得對應的 redisCommand;接著是檢測當前 Redis 是否可以執行該命令;最後是呼叫 call 方法真正執行命令。

processCommand會做如下邏輯處理:

  • 1 如果命令名稱為 quit,則直接返回,並且設定客戶端標誌位。
  • 2 根據 argv[0] 查詢對應的 redisCommand,所有的命令都儲存在命令字典 redisCommandTable 中,根據命令名稱可以獲取對應的命令。
  • 3 進行使用者許可權校驗。
  • 4 如果是叢集模式,處理叢集重定向。當命令傳送者是 master 或者 命令沒有任何 key 的引數時可以不重定向。
  • 5 預防 maxmemory 情況,先嚐試回收一下,如果不行,則返回異常。
  • 6 當此伺服器是 master 時:aof 持久化失敗時,或上一次 bgsave 執行錯誤,且配置 bgsave 引數和 stop_writes_on_bgsave_err;禁止執行寫命令。
  • 7 當此伺服器時master時:如果配置了 repl_min_slaves_to_write,當slave數目小於時,禁止執行寫命令。
  • 8 當時只讀slave時,除了 master 的不接受其他寫命令。
  • 9 當客戶端正在訂閱頻道時,只會執行部分命令。
  • 10 伺服器為slave,但是沒有連線 master 時,只會執行帶有 CMD_STALE 標誌的命令,如 info 等
  • 11 正在載入資料庫時,只會執行帶有 CMD_LOADING 標誌的命令,其餘都會被拒絕。
  • 12 當伺服器因為執行lua指令碼阻塞時,只會執行部分命令,其餘都會拒絕
  • 13 如果是事務命令,則開啟事務,命令進入等待佇列;否則直接執行命令。

函式原始碼

  1 /**
  2  * If this function gets called we already read a whole
  3  * command, arguments are in the client argv/argc fields.
  4  * processCommand() execute the command or prepare the
  5  * server for a bulk read from the client.
  6  * 如果這個函式被呼叫,就表明我們已經讀取了整個命令,引數在客戶端 argv/argc 欄位中
  7  * processCommand() 執行命令或準備伺服器以從客戶端進行批量讀取。
  8  *
  9  * If C_OK is returned the client is still alive and valid and
 10  * other operations can be performed by the caller. Otherwise
 11  * if C_ERR is returned the client was destroyed (i.e. after QUIT).
 12  * 如果返回 C_OK,則客戶端仍處於活動狀態且有效,並且呼叫者可以執行其他操作。
 13  * 否則,如果返回 C_ERR,則客戶端被銷燬(即在 QUIT 之後)。
 14  *
 15  * processCommand 方法會處理很多邏輯,不過大致可以分為三個部分:首先是呼叫 lookupCommand 方法獲得對應的
 16  *      redisCommand;接著是檢測當前 Redis 是否可以執行該命令;最後是呼叫 call 方法真正執行命令。
 17  *
 18  * processCommand會做如下邏輯處理:
 19  *      1 如果命令名稱為 quit,則直接返回,並且設定客戶端標誌位。
 20  *      2 根據 argv[0] 查詢對應的 redisCommand,所有的命令都儲存在命令字典 redisCommandTable 中,根據命令名稱可以獲取對應的命令。
 21  *      3 進行使用者許可權校驗。
 22  *      4 如果是叢集模式,處理叢集重定向。當命令傳送者是 master 或者 命令沒有任何 key 的引數時可以不重定向。
 23  *      5 預防 maxmemory 情況,先嚐試回收一下,如果不行,則返回異常。
 24  *      6 當此伺服器是 master 時:aof 持久化失敗時,或上一次 bgsave 執行錯誤,且配置 bgsave 引數和 stop_writes_on_bgsave_err;禁止執行寫命令。
 25  *      7 當此伺服器時master時:如果配置了 repl_min_slaves_to_write,當slave數目小於時,禁止執行寫命令。
 26  *      8 當時只讀slave時,除了 master 的不接受其他寫命令。
 27  *      9 當客戶端正在訂閱頻道時,只會執行部分命令。
 28  *      10 伺服器為slave,但是沒有連線 master 時,只會執行帶有 CMD_STALE 標誌的命令,如 info 等
 29  *      11 正在載入資料庫時,只會執行帶有 CMD_LOADING 標誌的命令,其餘都會被拒絕。
 30  *      12 當伺服器因為執行lua指令碼阻塞時,只會執行部分命令,其餘都會拒絕
 31  *      13 如果是事務命令,則開啟事務,命令進入等待佇列;否則直接執行命令。
 32  */
 33 int processCommand(client *c) {
 34     /* The QUIT command is handled separately. Normal command procs will
 35      * go through checking for replication and QUIT will cause trouble
 36      * when FORCE_REPLICATION is enabled and would be implemented in
 37      * a regular command proc. */
 38     // 1 處理 quit 命令
 39     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
 40         addReply(c,shared.ok);
 41         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
 42         return C_ERR;
 43     }
 44 
 45     /* Now lookup the command and check ASAP about trivial error conditions
 46      * such as wrong arity, bad command name and so forth. */
 47     /**
 48      * 根據 argv[0] 查詢對應的 command
 49      * 2 命令字典查詢指定命令;所有的命令都儲存在命令字典中 struct redisCommand redisCommandTable[]={}
 50      */
 51     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 52     if (!c->cmd) {
 53         // 處理未知命令
 54         flagTransaction(c);
 55         sds args = sdsempty();
 56         int i;
 57         for (i=1; i < c->argc && sdslen(args) < 128; i++)
 58             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
 59         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
 60             (char*)c->argv[0]->ptr, args);
 61         sdsfree(args);
 62         return C_OK;
 63     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
 64                (c->argc < -c->cmd->arity)) {
 65         // 處理引數錯誤
 66         flagTransaction(c);
 67         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
 68             c->cmd->name);
 69         return C_OK;
 70     }
 71 
 72     /* Check if the user is authenticated */
 73     // 3 檢查使用者驗證
 74     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
 75     {
 76         flagTransaction(c);
 77         addReply(c,shared.noautherr);
 78         return C_OK;
 79     }
 80 
 81     /* If cluster is enabled perform the cluster redirection here.
 82      * However we don't perform the redirection if:
 83      * 1) The sender of this command is our master.
 84      * 2) The command has no key arguments. */
 85     /**
 86      * 4 如果是叢集模式,處理叢集重定向。當命令傳送者是master或者 命令沒有任何key的引數時可以不重定向
 87      */
 88     if (server.cluster_enabled &&
 89         !(c->flags & CLIENT_MASTER) &&
 90         !(c->flags & CLIENT_LUA &&
 91           server.lua_caller->flags & CLIENT_MASTER) &&
 92         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
 93           c->cmd->proc != execCommand))
 94     {
 95         int hashslot;
 96         int error_code;
 97         // 查詢可以執行的node資訊
 98         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 99                                         &hashslot,&error_code);
100         if (n == NULL || n != server.cluster->myself) {
101             if (c->cmd->proc == execCommand) {
102                 discardTransaction(c);
103             } else {
104                 flagTransaction(c);
105             }
106             clusterRedirectClient(c,n,hashslot,error_code);
107             return C_OK;
108         }
109     }
110 
111     /* Handle the maxmemory directive.
112      *
113      * First we try to free some memory if possible (if there are volatile
114      * keys in the dataset). If there are not the only thing we can do
115      * is returning an error.
116      *
117      * Note that we do not want to reclaim memory if we are here re-entering
118      * the event loop since there is a busy Lua script running in timeout
119      * condition, to avoid mixing the propagation of scripts with the propagation
120      * of DELs due to eviction. */
121     // 5 處理maxmemory請求,先嚐試回收一下,如果不行,則返回異常
122     if (server.maxmemory && !server.lua_timedout) {
123         int out_of_memory = freeMemoryIfNeeded() == C_ERR;
124         /* freeMemoryIfNeeded may flush slave output buffers. This may result
125          * into a slave, that may be the active client, to be freed. */
126         if (server.current_client == NULL) return C_ERR;
127 
128         /* It was impossible to free enough memory, and the command the client
129          * is trying to execute is denied during OOM conditions or the client
130          * is in MULTI/EXEC context? Error. */
131         if (out_of_memory &&
132             (c->cmd->flags & CMD_DENYOOM ||
133              (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
134             flagTransaction(c);
135             addReply(c, shared.oomerr);
136             return C_OK;
137         }
138     }
139 
140     /**
141      * Don't accept write commands if there are problems persisting on disk
142      * and if this is a master instance.
143      * 如果出現AOF或者RDB錯誤,這進位制寫入
144      */
145     int deny_write_type = writeCommandsDeniedByDiskError();
146     if (deny_write_type != DISK_ERROR_TYPE_NONE &&
147         server.masterhost == NULL &&
148         (c->cmd->flags & CMD_WRITE ||
149          c->cmd->proc == pingCommand))
150     {
151         flagTransaction(c);
152         if (deny_write_type == DISK_ERROR_TYPE_RDB)
153             addReply(c, shared.bgsaveerr);
154         else
155             addReplySds(c,
156                 sdscatprintf(sdsempty(),
157                 "-MISCONF Errors writing to the AOF file: %s\r\n",
158                 strerror(server.aof_last_write_errno)));
159         return C_OK;
160     }
161 
162     /* Don't accept write commands if there are not enough good slaves and
163      * user configured the min-slaves-to-write option. */
164     /**
165      * 7 當此伺服器是master時:如果配置了repl_min_slaves_to_write,
166      * 當slave數目小於時,禁止執行寫命令
167      */
168     if (server.masterhost == NULL &&
169         server.repl_min_slaves_to_write &&
170         server.repl_min_slaves_max_lag &&
171         c->cmd->flags & CMD_WRITE &&
172         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
173     {
174         flagTransaction(c);
175         addReply(c, shared.noreplicaserr);
176         return C_OK;
177     }
178 
179     /**
180      * Don't accept write commands if this is a read only slave. But
181      * accept write commands if this is our master.
182      * 如果這是隻讀從站,則不接受寫入命令。 但是如果這是我們的主人,請接受寫入命令。
183      * 因為一個從站可能是另一個從站的主站
184      */
185     /**
186      * 8 當是只讀slave時,除了master的不接受其他寫命令
187      */
188     if (server.masterhost && server.repl_slave_ro &&
189         !(c->flags & CLIENT_MASTER) &&
190         c->cmd->flags & CMD_WRITE)
191     {
192         addReply(c, shared.roslaveerr);
193         return C_OK;
194     }
195 
196     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
197     /**
198      * 9 當客戶端正在訂閱頻道時,只會執行以下命令
199      */
200     if (c->flags & CLIENT_PUBSUB &&
201         c->cmd->proc != pingCommand &&
202         c->cmd->proc != subscribeCommand &&
203         c->cmd->proc != unsubscribeCommand &&
204         c->cmd->proc != psubscribeCommand &&
205         c->cmd->proc != punsubscribeCommand) {
206         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
207         return C_OK;
208     }
209 
210     /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
211      * when slave-serve-stale-data is no and we are a slave with a broken
212      * link with master. */
213     /**
214      * 10 伺服器為slave,但沒有正確連線master時,只會執行帶有CMD_STALE標誌的命令,如info等
215      */
216     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
217         server.repl_serve_stale_data == 0 &&
218         !(c->cmd->flags & CMD_STALE))
219     {
220         flagTransaction(c);
221         addReply(c, shared.masterdownerr);
222         return C_OK;
223     }
224 
225     /* Loading DB? Return an error if the command has not the
226      * CMD_LOADING flag. */
227     /**
228      * 11 正在載入資料庫時,只會執行帶有CMD_LOADING標誌的命令,其餘都會被拒絕
229      */
230     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
231         addReply(c, shared.loadingerr);
232         return C_OK;
233     }
234 
235     /* Lua script too slow? Only allow a limited number of commands. */
236     /**
237      * 12 當伺服器因為執行lua指令碼阻塞時,只會執行以下幾個命令,其餘都會拒絕
238      */
239     if (server.lua_timedout &&
240           c->cmd->proc != authCommand &&
241           c->cmd->proc != replconfCommand &&
242         !(c->cmd->proc == shutdownCommand &&
243           c->argc == 2 &&
244           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
245         !(c->cmd->proc == scriptCommand &&
246           c->argc == 2 &&
247           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
248     {
249         flagTransaction(c);
250         addReply(c, shared.slowscripterr);
251         return C_OK;
252     }
253 
254     /* Exec the command */
255     /**
256      * 13 開始執行命令
257      */
258     if (c->flags & CLIENT_MULTI &&
259         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
260         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
261     {
262         /**
263          * 開啟了事務,命令只會入佇列
264          */
265         queueMultiCommand(c);
266         addReply(c,shared.queued);
267     } else {
268         /**
269          * 直接執行命令
270          */
271         call(c,CMD_CALL_FULL);
272         c->woff = server.master_repl_offset;
273         if (listLength(server.ready_keys))
274             handleClientsBlockedOnKeys();
275     }
276     return C_OK;
277 }
View Code

call 方法是 Redis 中執行命令的通用方法,它會處理通用的執行命令的前置和後續操作。

  • 如果有監視器 monitor,則需要將命令傳送給監視器。
  • 呼叫 redisCommand 的proc 方法,執行對應具體的命令邏輯。
  • 如果開啟了 CMD_CALL_SLOWLOG,則需要記錄慢查詢日誌
  • 如果開啟了 CMD_CALL_STATS,則需要記錄一些統計資訊
  • 如果開啟了 CMD_CALL_PROPAGATE,則當 dirty大於0時,需要呼叫 propagate 方法來進行命令傳播。

命令傳播就是將命令寫入 repl-backlog-buffer 緩衝中,併發送給各個從伺服器中。

call函式原始碼如下

  1 /* Call() is the core of Redis execution of a command.
  2  *
  3  * The following flags can be passed:
  4  * CMD_CALL_NONE        No flags.
  5  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
  6  * CMD_CALL_STATS       Populate command stats.
  7  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
  8  *                          or if the client flags are forcing propagation.
  9  * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
 10  *                          or if the client flags are forcing propagation.
 11  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 12  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 13  *
 14  * The exact propagation behavior depends on the client flags.
 15  * Specifically:
 16  *
 17  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 18  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 19  *    in the call flags, then the command is propagated even if the
 20  *    dataset was not affected by the command.
 21  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 22  *    are set, the propagation into AOF or to slaves is not performed even
 23  *    if the command modified the dataset.
 24  *
 25  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 26  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 27  * slaves propagation will never occur.
 28  *
 29  * Client flags are modified by the implementation of a given command
 30  * using the following API:
 31  *
 32  * forceCommandPropagation(client *c, int flags);
 33  * preventCommandPropagation(client *c);
 34  * preventCommandAOF(client *c);
 35  * preventCommandReplication(client *c);
 36  *
 37  * call 方法是 Redis 中執行命令的通用方法,它會處理通用的執行命令的前置和後續操作。
 38  *
 39  * 執行client中持有的 redisCommand 命令
 40  *
 41  */
 42 void call(client *c, int flags) {
 43     long long dirty, start, duration;
 44     /**
 45      * dirty記錄資料庫修改次數;start記錄命令開始執行時間us;duration記錄命令執行花費時間
 46      */
 47     int client_old_flags = c->flags;
 48     struct redisCommand *real_cmd = c->cmd;
 49 
 50     /**
 51      * Sent the command to clients in MONITOR mode, only if the commands are
 52      * not generated from reading an AOF.
 53      * 有監視器的話,需要將不是從AOF獲取的命令會發送給監視器。當然,這裡會消耗時間
 54      */
 55     if (listLength(server.monitors) &&
 56         !server.loading &&
 57         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
 58     {
 59         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
 60     }
 61 
 62     /* Initialization: clear the flags that must be set by the command on
 63      * demand, and initialize the array for additional commands propagation. */
 64     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
 65     redisOpArray prev_also_propagate = server.also_propagate;
 66     redisOpArrayInit(&server.also_propagate);
 67 
 68     /* Call the command. */
 69     dirty = server.dirty;
 70     start = ustime();
 71     // 處理命令,呼叫命令處理函式
 72     c->cmd->proc(c);
 73     duration = ustime()-start;
 74     dirty = server.dirty-dirty;
 75     if (dirty < 0) dirty = 0;
 76 
 77     /* When EVAL is called loading the AOF we don't want commands called
 78      * from Lua to go into the slowlog or to populate statistics. */
 79     // Lua 指令碼的一些特殊處理
 80     if (server.loading && c->flags & CLIENT_LUA)
 81         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
 82 
 83     /* If the caller is Lua, we want to force the EVAL caller to propagate
 84      * the script if the command flag or client flag are forcing the
 85      * propagation. */
 86     if (c->flags & CLIENT_LUA && server.lua_caller) {
 87         if (c->flags & CLIENT_FORCE_REPL)
 88             server.lua_caller->flags |= CLIENT_FORCE_REPL;
 89         if (c->flags & CLIENT_FORCE_AOF)
 90             server.lua_caller->flags |= CLIENT_FORCE_AOF;
 91     }
 92 
 93     /**
 94      * Log the command into the Slow log if needed, and populate the
 95      * per-command statistics that we show in INFO commandstats.
 96      * 如果開啟了 CMD_CALL_SLOWLOG,則需要記錄慢查詢日誌
 97      */
 98     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
 99         char *latency_event = (c->cmd->flags & CMD_FAST) ?
100                               "fast-command" : "command";
101         latencyAddSampleIfNeeded(latency_event,duration/1000);
102         slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
103     }
104     /**
105      * CMD_CALL_STATS 表示要統計
106      *
107      * 如果開啟了 CMD_CALL_STATS,則需要記錄一些統計資訊
108      */
109     if (flags & CMD_CALL_STATS) {
110         /* use the real command that was executed (cmd and lastamc) may be
111          * different, in case of MULTI-EXEC or re-written commands such as
112          * EXPIRE, GEOADD, etc. */
113         real_cmd->microseconds += duration;
114         real_cmd->calls++;
115     }
116 
117     /**
118      * Propagate the command into the AOF and replication link
119      * 如果開啟了 CMD_CALL_PROPAGATE,則當 dirty大於0時,需要呼叫 propagate 方法來進行命令傳播
120      * CMD_CALL_PROPAGATE表示要進行廣播命令
121      */
122     if (flags & CMD_CALL_PROPAGATE &&
123         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
124     {
125         int propagate_flags = PROPAGATE_NONE;
126 
127         /**
128          * Check if the command operated changes in the data set. If so
129          * set for replication / AOF propagation.
130          * dirty大於0時,需要廣播命令給slave和aof
131          */
132         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
133 
134         /* If the client forced AOF / replication of the command, set
135          * the flags regardless of the command effects on the data set. */
136         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
137         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
138 
139         /* However prevent AOF / replication propagation if the command
140          * implementations called preventCommandPropagation() or similar,
141          * or if we don't have the call() flags to do so. */
142         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
143             !(flags & CMD_CALL_PROPAGATE_REPL))
144                 propagate_flags &= ~PROPAGATE_REPL;
145         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
146             !(flags & CMD_CALL_PROPAGATE_AOF))
147                 propagate_flags &= ~PROPAGATE_AOF;
148 
149         /**
150          * Call propagate() only if at least one of AOF / replication
151          * propagation is needed. Note that modules commands handle replication
152          * in an explicit way, so we never replicate them automatically.
153          * 僅當至少需要 AOF / 複製傳播之一時才呼叫傳播()。
154          * 請注意,模組命令以顯式方式處理複製,因此我們從不自動複製它們。
155          *
156          * 廣播命令,寫如aof,傳送命令到slave
157          * 也就是傳說中的傳播命令
158          */
159         if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
160             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
161     }
162 
163     /* Restore the old replication flags, since call() can be executed
164      * recursively. */
165     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
166     c->flags |= client_old_flags &
167         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
168 
169     /* Handle the alsoPropagate() API to handle commands that want to propagate
170      * multiple separated commands. Note that alsoPropagate() is not affected
171      * by CLIENT_PREVENT_PROP flag. */
172     if (server.also_propagate.numops) {
173         int j;
174         redisOp *rop;
175 
176         if (flags & CMD_CALL_PROPAGATE) {
177             for (j = 0; j < server.also_propagate.numops; j++) {
178                 rop = &server.also_propagate.ops[j];
179                 int target = rop->target;
180                 /* Whatever the command wish is, we honor the call() flags. */
181                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
182                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
183                 if (target)
184                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
185             }
186         }
187         redisOpArrayFree(&server.also_propagate);
188     }
189     server.also_propagate = prev_also_propagate;
190     server.stat_numcommands++;
191 }
View Code

下篇

  在上面瞭解 Redis 命令執行的整體流程,然後細緻分析了從 Redis 啟動到建立 socket 連線,再到讀取 socket 資料到輸入緩衝區,解析命令,執行命令等過程的原理和實現細節。接下來,我們來具體看一下 set 和 get 命令的實現細節和如何將命令結果通過輸出緩衝區和 socket 傳送給 Redis 客戶端。

set 和 get 命令具體實現

  上面講到 processCommand 方法會從輸入緩衝區中解析出對應的 redisCommand,然後呼叫 call 方法執行解析出來的 redisCommand的 proc 方法。不同命令的的 proc 方法是不同的,比如說名為 set 的 redisCommand 的 proc 是 setCommand 方法,而 get 的則是 getCommand 方法。通過這種形式實現多型策略。

 1 void call(client *c, int flags) {
 2     ....
 3     c->cmd->proc(c);
 4     ....
 5 }
 6 // redisCommand結構體
 7 struct redisCommand {
 8     char *name;
 9     // 對應方法的函式正規化
10     redisCommandProc *proc;
11     .... // 其他定義
12 };
13 // 使用 typedef 定義的別名
14 typedef void redisCommandProc(client *c);
15 // 不同的命令,呼叫不同的方法。
16 struct redisCommand redisCommandTable[] = {
17     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
18     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
19     {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
20     .... // 所有的 redis 命令都有
21 }

命令結構

setCommand 會判斷set命令是否攜帶了nx、xx、ex或者px等可選引數,然後呼叫setGenericCommand命令。我們直接來看 setGenericCommand 方法。

setGenericCommand 方法的處理邏輯如下所示:

  • 首先判斷 set 的型別是 set_nx 還是 set_xx,如果是 nx 並且 key 已經存在則直接返回;如果是 xx 並且 key 不存在則直接返回。
  • 呼叫 setKey 方法將鍵值新增到對應的 Redis 資料庫中。
  • 如果有過期時間,則呼叫 setExpire 將設定過期時間
  • 進行鍵空間通知
  • 返回對應的值給客戶端。
 1 #define OBJ_SET_NO_FLAGS 0
 2 #define OBJ_SET_NX (1<<0)     /* Set if key not exists. */
 3 #define OBJ_SET_XX (1<<1)     /* Set if key exists. */
 4 #define OBJ_SET_EX (1<<2)     /* Set if time in seconds is given */
 5 #define OBJ_SET_PX (1<<3)     /* Set if time in ms in given */
 6 
 7 void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
 8     long long milliseconds = 0; /* initialized to avoid any harmness warning */
 9 
10     /**
11      * 設定了過期時間;expire是robj型別,獲取整數值
12      */
13     if (expire) {
14         if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
15             return;
16         if (milliseconds <= 0) {
17             addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
18             return;
19         }
20         if (unit == UNIT_SECONDS) milliseconds *= 1000;
21     }
22 
23     /**
24      * NX,key存在時直接返回;XX,key不存在時直接返回
25      * lookupKeyWrite 是在對應的資料庫中尋找鍵值是否存在
26      */
27     if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
28         (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
29     {
30         addReply(c, abort_reply ? abort_reply : shared.nullbulk);
31         return;
32     }
33     /**
34      * 新增到資料字典
35      */
36     setKey(c->db,key,val);
37     server.dirty++;
38     /**
39      * 過期時間新增到過期字典
40      */
41     if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
42     /**
43      * 鍵空間通知
44      */
45     notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
46     if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
47         "expire",key,c->db->id);
48     /**
49      * 返回值,addReply 在 get 命令時再具體講解
50      */
51     addReply(c, ok_reply ? ok_reply : shared.ok);
52 }

具體 setKey 和 setExpire 的方法實現我們這裡就不細講,其實就是將鍵值新增到db的 dict 資料雜湊表中,將鍵和過期時間新增到 expires 雜湊表中,如下圖所示。

  接下來看 getCommand 的具體實現,同樣的,它底層會呼叫 getGenericCommand 方法。getGenericCommand 方法會呼叫 lookupKeyReadOrReply 來從 dict 資料雜湊表中查詢對應的 key值。如果找不到,則直接返回 C_OK;如果找到了,則根據值的型別,呼叫 addReply 或者 addReplyBulk 方法將值新增到輸出緩衝區中。

 1 int getGenericCommand(client *c) {
 2     robj *o;
 3 
 4     // 呼叫 lookupKeyReadOrReply 從資料字典中查詢對應的鍵
 5     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
 6         return C_OK;
 7 
 8     // 如果是string型別,呼叫 addReply 單行返回。如果是其他物件型別,則呼叫 addReplyBulk
 9     if (o->type != OBJ_STRING) {
10         addReply(c,shared.wrongtypeerr);
11         return C_ERR;
12     } else {
13         addReplyBulk(c,o);
14         return C_OK;
15     }
16 }

  lookupKeyReadWithFlags 會從 redisDb 中查詢對應的鍵值對,它首先會呼叫 expireIfNeeded判斷鍵是否過期並且需要刪除,如果為過期,則呼叫 lookupKey 方法從 dict 雜湊表中查詢並返回。具體解釋可以看程式碼中的詳細註釋

 1 /**
 2  * Lookup a key for read operations, or return NULL if the key is not found
 3  * in the specified DB.
 4  *
 5  * As a side effect of calling this function:
 6  * 1. A key gets expired if it reached it's TTL.
 7  * 2. The key last access time is updated.
 8  * 3. The global keys hits/misses stats are updated (reported in INFO).
 9  *
10  * This API should not be used when we write to the key after obtaining
11  * the object linked to the key, but only for read only operations.
12  *
13  * Flags change the behavior of this command:
14  *
15  *  LOOKUP_NONE (or zero): no special flags are passed.
16  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
17  *
18  * Note: this function also returns NULL if the key is logically expired
19  * but still existing, in case this is a slave, since this API is called only
20  * for read operations. Even if the key expiry is master-driven, we can
21  * correctly report a key is expired on slaves even if the master is lagging
22  * expiring our key via DELs in the replication link.
23  * 查詢key的讀操作,如果key找不到或者已經邏輯上過期返回 NULL,有一些副作用
24  *   1 如果key到達過期時間,它會被裝置為過期,並且刪除
25  *   2 更新key的最近訪問時間
26  *   3 更新全域性快取擊中概率
27  * flags 有兩個值: LOOKUP_NONE 一般都是這個;LOOKUP_NOTOUCH 不修改最近訪問時間
28  */
29 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
30     robj *val;
31 
32     // 檢查鍵是否過期
33     if (expireIfNeeded(db,key) == 1) {
34         // master和 slave 對這種情況的特殊處理
35         /* Key expired. If we are in the context of a master, expireIfNeeded()
36          * returns 0 only when the key does not exist at all, so it's safe
37          * to return NULL ASAP. */
38         if (server.masterhost == NULL) {
39             server.stat_keyspace_misses++;
40             return NULL;
41         }
42 
43         /* However if we are in the context of a slave, expireIfNeeded() will
44          * not really try to expire the key, it only returns information
45          * about the "logical" status of the key: key expiring is up to the
46          * master in order to have a consistent view of master's data set.
47          *
48          * However, if the command caller is not the master, and as additional
49          * safety measure, the command invoked is a read-only command, we can
50          * safely return NULL here, and provide a more consistent behavior
51          * to clients accessign expired values in a read-only fashion, that
52          * will say the key as non existing.
53          *
54          * Notably this covers GETs when slaves are used to scale reads. */
55         if (server.current_client &&
56             server.current_client != server.master &&
57             server.current_client->cmd &&
58             server.current_client->cmd->flags & CMD_READONLY)
59         {
60             server.stat_keyspace_misses++;
61             return NULL;
62         }
63     }
64     // 查詢鍵值字典
65     val = lookupKey(db,key,flags);
66     // 更新全域性快取命中率
67     if (val == NULL)
68         server.stat_keyspace_misses++;
69     else
70         server.stat_keyspace_hits++;
71     return val;
72 }

  Redis 在呼叫查詢鍵值系列方法前都會先呼叫 expireIfNeeded 來判斷鍵是否過期,然後根據 Redis 是否配置了懶刪除來進行同步刪除或者非同步刪除。

在判斷鍵釋放過期的邏輯中有兩個特殊情況:

  • 如果當前 Redis 是主從結構中的從例項,則只判斷鍵是否過期,不直接對鍵進行刪除,而是要等待主例項傳送過來的刪除命令後再進行刪除。如果當前 Redis 是主例項,則呼叫 propagateExpire 來傳播過期指令。
  • 如果當前正在進行 Lua 指令碼執行,因為其原子性和事務性,整個執行過期中時間都按照其開始執行的那一刻計算,也就是說lua執行時未過期的鍵,在它整個執行過程中也都不會過期。

函式原始碼

 1 /* Check if the key is expired. */
 2 int keyIsExpired(redisDb *db, robj *key) {
 3     // 獲取鍵的過期時間
 4     mstime_t when = getExpire(db,key);
 5 
 6     // 鍵沒有過期時間
 7     if (when < 0) return 0; /* No expire for this key */
 8 
 9     /* Don't expire anything while loading. It will be done later. */
10     // 例項正在從硬碟 laod 資料,比如說 RDB 或者 AOF
11     if (server.loading) return 0;
12 
13     /**
14      * If we are in the context of a Lua script, we pretend that time is
15      * blocked to when the Lua script started. This way a key can expire
16      * only the first time it is accessed and not in the middle of the
17      * script execution, making propagation to slaves / AOF consistent.
18      * See issue #1525 on Github for more information.
19      * 當執行lua指令碼時,只有鍵在lua一開始執行時
20      * 就到了過期時間才算過期,否則在lua執行過程中不算失效
21      */
22     mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
23 
24     return now > when;
25 }
26 
27 /**
28  * This function is called when we are going to perform some operation
29  * in a given key, but such key may be already logically expired even if
30  * it still exists in the database. The main way this function is called
31  * is via lookupKey*() family of functions.
32  *
33  * The behavior of the function depends on the replication role of the
34  * instance, because slave instances do not expire keys, they wait
35  * for DELs from the master for consistency matters. However even
36  * slaves will try to have a coherent return value for the function,
37  * so that read commands executed in the slave side will be able to
38  * behave like if the key is expired even if still present (because the
39  * master has yet to propagate the DEL).
40  *
41  * In masters as a side effect of finding a key which is expired, such
42  * key will be evicted from the database. Also this may trigger the
43  * propagation of a DEL/UNLINK command in AOF / replication stream.
44  *
45  * The return value of the function is 0 if the key is still valid,
46  * otherwise the function returns 1 if the key is expired.
47  * 在呼叫 lookupKey*系列方法前呼叫該方法。
48  * 如果是slave:
49  *  slave 並不主動過期刪除key,但是返回值仍然會返回鍵已經被刪除。
50  *  master 如果key過期了,會主動刪除過期鍵,並且觸發 AOF 和同步操作。
51  * 返回值為0表示鍵仍然有效,否則返回1
52  */
53 int expireIfNeeded(redisDb *db, robj *key) {
54     //KEY是否過期
55     if (!keyIsExpired(db,key)) return 0;
56 
57     /**
58      * If we are running in the context of a slave, instead of
59      * evicting the expired key from the database, we return ASAP:
60      * the slave key expiration is controlled by the master that will
61      * send us synthesized DEL operations for expired keys.
62      *
63      * Still we try to return the right information to the caller,
64      * that is, 0 if we think the key should be still valid, 1 if
65      * we think the key is expired at this time.
66      * 當本例項是slave時,過期鍵的刪除由master傳送過來的
67      * del 指令控制。但是這個函式還是將正確的資訊返回給呼叫者。
68      */
69     if (server.masterhost != NULL) return 1;
70 
71     /* Delete the key */
72     // 程式碼到這裡,說明鍵已經過期,而且需要被刪除
73     server.stat_expiredkeys++;
74     // 命令傳播,到 slave 和 AOF
75     propagateExpire(db,key,server.lazyfree_lazy_expire);
76     // 鍵空間通知使得客戶端可以通過訂閱頻道或模式, 來接收那些以某種方式改動了 Redis 資料集的事件。
77     notifyKeyspaceEvent(NOTIFY_EXPIRED,
78         "expired",key,db->id);
79     // 如果是惰性刪除,呼叫dbAsyncDelete,否則呼叫 dbSyncDelete
80     return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
81                                          dbSyncDelete(db,key);
82 }

  lookupKey 方法則是通過 dictFind 方法從 redisDb 的 dict 雜湊表中查詢鍵值,如果能找到,則根據 redis 的 maxmemory_policy 策略來判斷是更新 lru 的最近訪問時間,還是呼叫 updateFU 方法更新其他指標,這些指標可以在後續記憶體不足時對鍵值進行回收。

 1 /**
 2  * Low level key lookup API, not actually called directly from commands
 3  * implementations that should instead rely on lookupKeyRead(),
 4  * lookupKeyWrite() and lookupKeyReadWithFlags().
 5  * lookupKey 方法則是通過 dictFind 方法從 redisDb 的 dict 雜湊表中查詢鍵值,
 6  * 如果能找到,則根據 redis 的 maxmemory_policy 策略來判斷是更新 lru 的最近訪問時間,
 7  * 還是呼叫 updateFU 方法更新其他指標,這些指標可以在後續記憶體不足時對鍵值進行回收。
 8  */
 9 robj *lookupKey(redisDb *db, robj *key, int flags) {
10     // dictFind 根據 key 獲取字典的entry
11     dictEntry *de = dictFind(db->dict,key->ptr);
12     if (de) {
13         // 獲取 value
14         robj *val = dictGetVal(de);
15 
16         /**
17          * Update the access time for the ageing algorithm.
18          * Don't do it if we have a saving child, as this will trigger
19          * a copy on write madness.
20          * 當處於 rdb aof 子程序複製階段或者 flags 不是 LOOKUP_NOTOUCH
21          */
22         if (server.rdb_child_pid == -1 &&
23             server.aof_child_pid == -1 &&
24             !(flags & LOOKUP_NOTOUCH))
25         {
26             // 如果是 MAXMEMORY_FLAG_LFU 則進行相應操作
27             if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
28                 updateLFU(val);
29             } else {
30                 // 更新最近訪問時間
31                 val->lru = LRU_CLOCK();
32             }
33         }
34         return val;
35     } else {
36         return NULL;
37     }
38 }

將命令結果寫入輸出緩衝區

在所有的 redisCommand 執行的最後,一般都會呼叫 addReply 方法進行結果返回,我們的分析也來到了 Redis 命令執行的返回資料階段。

addReply 方法做了兩件事情:

  • prepareClientToWrite 判斷是否需要返回資料,並且將當前 client 新增到等待寫返回資料佇列中。
  • 呼叫 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩衝區中,等待寫入 socekt。

函式原始碼

 1 /**
 2  * Add the object 'obj' string representation to the client output buffer.
 3  * addReply 方法做了兩件事情:
 4  *      prepareClientToWrite 判斷是否需要返回資料,並且將當前 client 新增到等待寫返回資料佇列中。
 5  *      呼叫 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩衝區中,等待寫入 socekt。
 6  */
 7 void addReply(client *c, robj *obj) {
 8     if (prepareClientToWrite(c) != C_OK) return;
 9 
10     if (sdsEncodedObject(obj)) {
11         /**
12          * 需要將響應內容新增到output buffer中。總體思路是,先嚐試向固定buffer新增,
13          * 新增失敗的話,在嘗試新增到響應連結串列
14          */
15         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
16             _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
17     } else if (obj->encoding == OBJ_ENCODING_INT) {// 特殊情況的優化
18         /**
19          * For integer encoded strings we just convert it into a string
20          * using our optimized function, and attach the resulting string
21          * to the output buffer.
22          * 對於整數編碼的字串,我們只需使用我們優化的函式將其轉換為字串,並將結果字串附加到輸出緩衝區。
23          */
24         char buf[32];
25         size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
26         if (_addReplyToBuffer(c,buf,len) != C_OK)
27             _addReplyStringToList(c,buf,len);
28     } else {
29         serverPanic("Wrong obj->encoding in addReply()");
30     }
31 }

prepareClientToWrite 首先判斷了當前 client是否需要返回資料:

  • Lua 指令碼執行的 client 則需要返回值;
  • 如果客戶端傳送來 REPLY OFF 或者 SKIP 命令,則不需要返回值;
  • 如果是主從複製時的主例項 client,則不需要返回值;
  • 當前是在 AOF loading 狀態的假 client,則不需要返回值。

接著如果這個 client 還未處於延遲等待寫入 (CLIENT_PENDING_WRITE)的狀態,則將其設定為該狀態,並將其加入到 Redis 的等待寫入返回值客戶端佇列中,也就是 clients_pending_write佇列。

 1 /**
 2  * This function is called every time we are going to transmit new data
 3  * to the client. The behavior is the following:
 4  * 每當我們呼叫這個函式就代表我們將要傳送新的資料給客戶端,執行步驟如下:
 5  *
 6  * If the client should receive new data (normal clients will) the function
 7  * returns C_OK, and make sure to install the write handler in our event
 8  * loop so that when the socket is writable new data gets written.
 9  * 如果客戶端應該接收新資料(普通客戶端會),該函式返回 C_OK,並確保在我們的事件迴圈中安裝寫入處理程式,
10  * 以便在套接字可寫時寫入新資料。
11  *
12  * If the client should not receive new data, because it is a fake client
13  * (used to load AOF in memory), a master or because the setup of the write
14  * handler failed, the function returns C_ERR.
15  * 如果客戶端不應該接收新資料,因為它是一個假客戶端(用於將 AOF 載入到記憶體中)、
16  * 一個 master 或者因為寫處理程式的設定失敗,該函式返回 C_ERR。
17  *
18  * The function may return C_OK without actually installing the write
19  * event handler in the following cases:
20  * 在以下情況下,該函式可能會在不沒有安裝寫入事件處理程式的情況下返回 C_OK
21  *
22  * 1) The event handler should already be installed since the output buffer
23  *    already contains something.
24  *    事件處理程式應該已經安裝,因為輸出緩衝區已經包含一些東西。
25  * 2) The client is a slave but not yet online, so we want to just accumulate
26  *    writes in the buffer but not actually sending them yet.
27  *    客戶端是一個從裝置但尚未線上,因此我們只想在緩衝區中累積寫入但尚未實際傳送它們。
28  *
29  * Typically gets called every time a reply is built, before adding more
30  * data to the clients output buffers. If the function returns C_ERR no
31  * data should be appended to the output buffers.
32  * 通常在每次構建回覆時呼叫,然後向客戶端輸出緩衝區新增更多資料。 如果函式返回 C_ERR,則不應將資料附加到輸出緩衝區。
33  *
34  * prepareClientToWrite 首先判斷了當前 client是否需要返回資料:
35  *      Lua 指令碼執行的 client 則需要返回值;
36  *      如果客戶端傳送來 REPLY OFF 或者 SKIP 命令,則不需要返回值;
37  *      如果是主從複製時的主例項 client,則不需要返回值;
38  *      當前是在 AOF loading 狀態的假 client,則不需要返回值。
39  * 接著如果這個 client 還未處於延遲等待寫入 (CLIENT_PENDING_WRITE)的狀態,則將其設定為該狀態,
40  * 並將其加入到 Redis 的等待寫入返回值客戶端佇列中,也就是 clients_pending_write佇列。
41  */
42 int prepareClientToWrite(client *c) {
43     /**
44      * If it's the Lua client we always return ok without installing any
45      * handler since there is no socket at all.
46      * 如果是 lua client 則直接OK
47      */
48     if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
49 
50     /**
51      * CLIENT REPLY OFF / SKIP handling: don't send replies.
52      * 客戶端發來過 REPLY OFF 或者 SKIP 命令,不需要傳送返回值
53      */
54     if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
55 
56     /**
57      * Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
58      * is set.
59      * master 作為client 向 slave 傳送命令,不需要接收返回值
60      */
61     if ((c->flags & CLIENT_MASTER) &&
62         !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
63 
64     // AOF loading 時的假client 不需要返回值
65     if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
66 
67     /**
68      * Schedule the client to write the output buffers to the socket, unless
69      * it should already be setup to do so (it has already pending data).
70      * 將client加入到等待寫入返回值佇列中,下次事件週期會進行返回值寫入。
71      */
72     if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
73 
74     /**
75      * Authorize the caller
76      * 表示已經在排隊,進行返回資料
77      */
78     return C_OK;
79 }

  Redis 將儲存等待返回的響應資料的空間,也就是輸出緩衝區分成兩部分,一個固定大小的 buffer 和一個響應內容資料的連結串列在連結串列為空並且 buffer 有足夠空間時,則將響應新增到 buffer 中。如果 buffer 滿了則建立一個節點追加到連結串列上。_addReplyToBuffer 和 _addReplyObjectToList 就是分別向這兩個空間寫資料的方法。

  固定buffer和響應連結串列,整體上構成了一個佇列。這麼組織的好處是,既可以節省記憶體,不需一開始預先分配大塊記憶體,並且可以避免頻繁分配、回收記憶體上面就是響應內容寫入輸出緩衝區的過程,下面看一下將資料從輸出緩衝區寫入 socket 的過程。

  prepareClientToWrite 函式,將客戶端加入到了Redis 的等待寫入返回值客戶端佇列中,也就是 clients_pending_write 佇列。請求處理的事件處理邏輯就結束了,等待 Redis 下一次事件迴圈處理時,將響應從輸出緩衝區寫入到 socket 中。

將命令返回值從輸出緩衝區寫入 socket

  Redis 在兩次事件迴圈之間會呼叫 beforeSleep 方法處理一些事情,而對 clients_pending_write 列表的處理就在其中。下面的 aeMain 方法就是 Redis 事件迴圈的主邏輯,可以看到每次迴圈時都會呼叫 beforesleep 方法。

 1 // 事件輪詢的主函式
 2 void aeMain(aeEventLoop *eventLoop) {
 3     eventLoop->stop = 0;
 4     // 一直處理事件
 5     while (!eventLoop->stop) {
 6         /**
 7          * 執行處理事件之前的函式,實際上就是server.c中的void beforeSleep(struct aeEventLoop *eventLoop)函式
 8          * 如果有需要在事件處理前執行的函式,那麼執行它
 9          */
10         if (eventLoop->beforesleep != NULL)
11             eventLoop->beforesleep(eventLoop);
12         //處理到時的時間事件和就緒的檔案事件
13         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
14     }
15 }

  beforeSleep 函式會呼叫 handleClientsWithPendingWrites 函式來處理 clients_pending_write 列表。handleClientsWithPendingWrites 方法會遍歷 clients_pending_write 列表,對於每個 client 都會先呼叫 writeToClient 方法來嘗試將返回資料從輸出快取區寫入到 socekt中,如果還未寫完,則只能呼叫 aeCreateFileEvent 方法來註冊一個寫資料事件處理器 sendReplyToClient,等待 Redis 事件機制的再次呼叫。

  這樣的好處是對於返回資料較少的客戶端,不需要麻煩的註冊寫資料事件,等待事件觸發再寫資料到 socket,而是在下一次事件迴圈週期就直接將資料寫到 socket中,加快了資料返回的響應速度但是從這裡也會發現,如果 clients_pending_write 佇列過長,則處理時間也會很久,阻塞正常的事件響應處理,導致 Redis 後續命令延時增加

 1 /**
 2  * This function is called just before entering the event loop, in the hope
 3  * we can just write the replies to the client output buffer without any
 4  * need to use a syscall in order to install the writable event handler,
 5  * get it called, and so forth.
 6  * 直接將返回值寫到client的輸出緩衝區中,不需要進行系統呼叫,也不需要註冊寫事件處理器
 7  */
 8 int handleClientsWithPendingWrites(void) {
 9     listIter li;
10     listNode *ln;
11     // 獲取系統延遲寫佇列的長度
12     int processed = listLength(server.clients_pending_write);
13 
14     listRewind(server.clients_pending_write,&li);
15     // 依次處理
16     while((ln = listNext(&li))) {
17         client *c = listNodeValue(ln);
18         c->flags &= ~CLIENT_PENDING_WRITE;
19         listDelNode(server.clients_pending_write,ln);
20 
21         /* If a client is protected, don't do anything,
22          * that may trigger write error or recreate handler. */
23         if (c->flags & CLIENT_PROTECTED) continue;
24 
25         /* Try to write buffers to the client socket. */
26         // 將緩衝值寫入client的socket中,如果寫完,則跳過之後的操作。
27         if (writeToClient(c->fd,c,0) == C_ERR) continue;
28 
29         /* If after the synchronous writes above we still have data to
30          * output to the client, we need to install the writable handler. */
31         // 還有資料未寫入,只能註冊寫事件處理器了
32         if (clientHasPendingReplies(c)) {
33             int ae_flags = AE_WRITABLE;
34             /* For the fsync=always policy, we want that a given FD is never
35              * served for reading and writing in the same event loop iteration,
36              * so that in the middle of receiving the query, and serving it
37              * to the client, we'll call beforeSleep() that will do the
38              * actual fsync of AOF to disk. AE_BARRIER ensures that. */
39             if (server.aof_state == AOF_ON &&
40                 server.aof_fsync == AOF_FSYNC_ALWAYS)
41             {
42                 ae_flags |= AE_BARRIER;
43             }
44             // 註冊寫事件處理器 sendReplyToClient,等待執行
45             if (aeCreateFileEvent(server.el, c->fd, ae_flags,
46                 sendReplyToClient, c) == AE_ERR)
47             {
48                     freeClientAsync(c);
49             }
50         }
51     }
52     return processed;
53 }

  sendReplyToClient 方法其實也會呼叫 writeToClient 方法,該方法就是將輸出緩衝區中的 buf 和 reply 列表中的資料都儘可能多的寫入到對應的 socket中。

 1 /**
 2  * Write event handler. Just send data to the client.寫IO事件的回撥函式
 3  * 為了向客戶端返回命令的執行結果, 伺服器要為客戶端套接字關聯命令回覆處理器。
 4  *
 5  * sendReplyToClient函式是Redis的命令回覆處理器,
 6  * 這個處理器負責將伺服器執行命令後得到的命令回覆通過套接字返回給客戶端,具體實現為unistd.h/write函式的包裝。
 7  *
 8  * 當伺服器有命令回覆需要傳送給客戶端的時候,伺服器會將客戶端套接字的AE_WRITABLE事件和命令回覆處理器關聯起來,
 9  * 當客戶端準備好接收伺服器傳回的命令回覆時,就會產生AE_WRITABLE事件,引發命令回覆處理器執行,
10  * 並執行相應的套接字寫入操作,
11  *
12  * 當命令回覆傳送完畢之後, 伺服器就會解除命令回覆處理器與客戶端套接字的 AE_WRITABLE 事件之間的關聯。
13  */
14 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
15     UNUSED(el);
16     UNUSED(mask);
17     // 傳送完資料會刪除fd的可讀事件
18     writeToClient(fd,privdata,1);
19 }

writeToClient 方法分析

  1 /**
  2  * Write data in output buffers to client. Return C_OK if the client
  3  * is still valid after the call, C_ERR if it was freed.
  4  * 這個函式實際上是對write()函式的封裝,將靜態回覆緩衝區buf或回覆連結串列reply中的資料迴圈寫到檔案描述符fd中。
  5  * 如果寫完了,則將當前客戶端的AE_WRITABLE事件刪除。
  6  *
  7  * 將輸出緩衝區的資料寫給client,如果client被釋放則返回C_ERR,沒被釋放則返回C_OK
  8  */
  9 int writeToClient(int fd, client *c, int handler_installed) {
 10     ssize_t nwritten = 0, totwritten = 0;
 11     size_t objlen;
 12     clientReplyBlock *o;
 13 
 14     // 如果指定的client的回覆緩衝區中還有資料,則返回真,表示可以寫socket
 15     while(clientHasPendingReplies(c)) {
 16         // 固定緩衝區傳送未完成
 17         if (c->bufpos > 0) {
 18             // 將緩衝區的資料寫到fd中
 19             nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
 20             // 寫失敗跳出迴圈
 21             if (nwritten <= 0) break;
 22             // 更新發送的資料計數器
 23             c->sentlen += nwritten;
 24             totwritten += nwritten;
 25 
 26             /* If the buffer was sent, set bufpos to zero to continue with
 27              * the remainder of the reply. */
 28             // 如果傳送的資料等於buf的偏移量,表示傳送完成
 29             if ((int)c->sentlen == c->bufpos) {
 30                 // 則將其重置
 31                 c->bufpos = 0;
 32                 c->sentlen = 0;
 33             }
 34         } else {// 固定緩衝區傳送完成,傳送回覆連結串列的內容
 35             // 回覆連結串列的第一條回覆物件,和物件值的長度和所佔的記憶體
 36             o = listNodeValue(listFirst(c->reply));
 37             objlen = o->used;
 38 
 39             // 跳過空物件,並刪除這個物件
 40             if (objlen == 0) {
 41                 c->reply_bytes -= o->size;
 42                 listDelNode(c->reply,listFirst(c->reply));
 43                 continue;
 44             }
 45 
 46             // 將當前節點的值寫到fd中
 47             nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
 48             // 寫失敗跳出迴圈
 49             if (nwritten <= 0) break;
 50             // 更新發送的資料計數器
 51             c->sentlen += nwritten;
 52             totwritten += nwritten;
 53 
 54             /* If we fully sent the object on head go to the next one */
 55             // 傳送完成,則刪除該節點,重置傳送的資料長度,更新回覆連結串列的總位元組數
 56             if (c->sentlen == objlen) {
 57                 c->reply_bytes -= o->size;
 58                 listDelNode(c->reply,listFirst(c->reply));
 59                 c->sentlen = 0;
 60                 /* If there are no longer objects in the list, we expect
 61                  * the count of reply bytes to be exactly zero. */
 62                 if (listLength(c->reply) == 0)
 63                     serverAssert(c->reply_bytes == 0);
 64             }
 65         }
 66         /**
 67          * Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
 68          * bytes, in a single threaded server it's a good idea to serve
 69          * other clients as well, even if a very large request comes from
 70          * super fast link that is always able to accept data (in real world
 71          * scenario think about 'KEYS *' against the loopback interface).
 72          *
 73          * However if we are over the maxmemory limit we ignore that and
 74          * just deliver as much data as it is possible to deliver.
 75          *
 76          * Moreover, we also send as much as possible if the client is
 77          * a slave (otherwise, on high-speed traffic, the replication
 78          * buffer will grow indefinitely)
 79          *
 80          * 如果這次寫的總量大於NET_MAX_WRITES_PER_EVENT的限制,則會中斷本次的寫操作,
 81          * 將處理時間讓給其他的client,以免一個非常的回覆獨佔伺服器,剩餘的資料下次繼續在寫
 82          *
 83          * 但是,如果當伺服器的記憶體數已經超過maxmemory,即使超過最大寫NET_MAX_WRITES_PER_EVENT的限制,
 84          * 也會繼續執行寫入操作,是為了儘快寫入給客戶端
 85          */
 86         if (totwritten > NET_MAX_WRITES_PER_EVENT &&
 87             (server.maxmemory == 0 ||
 88              zmalloc_used_memory() < server.maxmemory) &&
 89             !(c->flags & CLIENT_SLAVE)) break;
 90     }
 91     // 更新寫到網路的位元組數
 92     server.stat_net_output_bytes += totwritten;
 93     // 處理寫入失敗
 94     if (nwritten == -1) {
 95         if (errno == EAGAIN) {
 96             nwritten = 0;
 97         } else {
 98             serverLog(LL_VERBOSE,
 99                 "Error writing to client: %s", strerror(errno));
100             freeClient(c);
101             return C_ERR;
102         }
103     }
104     // 寫入成功
105     if (totwritten > 0) {
106         /**
107          * For clients representing masters we don't count sending data
108          * as an interaction, since we always send REPLCONF ACK commands
109          * that take some time to just fill the socket output buffer.
110          * We just rely on data / pings received for timeout detection.
111          * 如果不是主節點伺服器,則更新最近和伺服器互動的時間
112          */
113         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
114     }
115     // 如果指定的client的回覆緩衝區中已經沒有資料,傳送完成
116     if (!clientHasPendingReplies(c)) {
117         c->sentlen = 0;
118         // 如果內容已經全部輸出,刪除當前client的可讀事件的監聽
119         if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
120 
121         /* Close connection after entire reply has been sent. */
122         // 如果指定了寫入按成之後立即關閉的標誌,也就是資料全部返回,則釋放client
123         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
124             freeClient(c);
125             return C_ERR;
126         }
127     }
128     return C_OK;
129 }
View Code

參考文章

https://www.cnblogs.com/remcarpediem/p/12024468.html

https://www.cnblogs.com/remcarpediem/p/12038377.html