1. 程式人生 > 其它 >Redis網路連線庫剖析

Redis網路連線庫剖析

一、執行緒模型(網路連線庫的整體實現框架)

1、檔案事件處理器

  Redis基於Reactor模式開發了網路事件處理器,這個處理器被稱為檔案事件處理器。它的組成結構為4部分:多個套接字、IO多路複用程式、檔案事件分派器、事件處理器。因為檔案事件分派器佇列的消費是單執行緒的,所以Redis才叫單執行緒模型。

2、訊息處理流程
  檔案事件處理器使用I/O多路複用(multiplexing)程式來同時監聽多個套接字,並根據套接字目前執行的任務來為套接字關聯不同的事件處理器。

    • 當被監聽的套接字準備好執行連線應答(accept)、讀取(read)、寫入(write)、關閉(close)等操作時,與操作相對應的檔案事件就會產生,這時檔案事件處理器就會呼叫套接字之前關聯好的事件處理器來處理這些事件。
    • 儘管多個檔案事件可能會併發地出現,但I/O多路複用程式總是會將所有產生事件的套接字都推到一個佇列裡面,然後通過這個佇列,以有序(sequentially)、同步(synchronously)、每次一個套接字的方式向檔案事件分派器傳送套接字:當上一個套接字產生的事件被處理完畢之後(該套接字為事件所關聯的事件處理器執行完畢), I/O多路複用程式才會繼續向檔案事件分派器傳送下一個套接字。

3、I/O 多路複用程式的實現
  Redis的I/O多路複用程式的所有功能是通過包裝select、epoll、evport和kqueue這些I/O多路複用函式庫來實現的,每個I/O多路複用函式庫在Redis原始碼中都對應一個單獨的檔案,比如ae_select.c、ae_epoll.c、ae_kqueue.c等。

  因為Redis為每個I/O多路複用函式庫都實現了相同的API,所以I/O多路複用程式的底層實現是可以互換的,如下圖所示。

Redis在I/O多路複用程式的實現原始碼ae.c中用#include巨集定義了相應的規則,程式會在編譯時自動選擇系統中效能最好的I/O多路複用函式庫來作為Redis的I/O多路複用程式的底層實現:

 1 /**
 2  * Include the best multiplexing layer supported by this system.
 3  * The following should be ordered by performances, descending.
4 * 包括該系統支援的最佳複用層。效能依次下降 5 */ 6 #ifdef HAVE_EVPORT 7 #include "ae_evport.c" 8 #else 9 #ifdef HAVE_EPOLL 10 #include "ae_epoll.c" 11 #else 12 #ifdef HAVE_KQUEUE 13 #include "ae_kqueue.c" 14 #else 15 #include "ae_select.c" 16 #endif 17 #endif 18 #endif

4、檔案事件的型別
  I/O 多路複用程式可以監聽多個套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,這兩類事件和套接字操作之間的對應關係如下:

    • 當套接字變得可讀時(客戶端對套接字執行write操作,或者執行close操作),或者有新的可應答(acceptable)套接字出現時(客戶端對伺服器的監聽套接字執行connect操作),套接字產生AE_READABLE 事件。
    • 當套接字變得可寫時(客戶端對套接字執行read操作),套接字產生AE_WRITABLE事件。I/O多路複用程式允許伺服器同時監聽套接字的AE_READABLE事件和AE_WRITABLE事件,如果一個套接字同時產生了這兩種事件,那麼檔案事件分派器會優先處理AE_READABLE事件,等到AE_READABLE事件處理完之後,才處理AE_WRITABLE 事件。這也就是說,如果一個套接字又可讀又可寫的話,那麼伺服器將先讀套接字,後寫套接字。

事件型別

 1 #define AE_NONE 0       /* No events registered. 沒有事件*/
 2 #define AE_READABLE 1   /* Fire when descriptor is readable. 當描述符可讀時觸發AE_READABLE事件。*/
 3 #define AE_WRITABLE 2   /* Fire when descriptor is writable. 當描述符可寫時觸發。*/
 4 /**
 5  * With WRITABLE, never fire the event if the READABLE event already fired in the same event
 6  * loop iteration. Useful when you want to persist things to disk before sending replies, and want
 7  * to do that in a group fashion.
 8  * 如果 READABLE 事件已在同一事件迴圈迭代中觸發,則此時絕不會觸發WRITABLE事件,
 9  * 當您想在傳送回覆之前將內容持久儲存到磁碟時很有用,並且希望以組方式進行。
10  *
11  * 但是,網路 IO 事件註冊的時候,除了正常的讀寫事件外,還可以註冊一個 AE_BARRIER 事件,
12  * 這個事件就是會影響到先讀後寫的處理順序。
13  *
14  * 如果某個 fd 的 mask 包含了 AE_BARRIER,那它的處理順序會是 先寫後讀。
15  *
16  * 針對這個場景,redis 舉的例子是,如果在 beforesleep 回撥中進行了 fsync 動作,
17  * 然後需要把結果快速回復給 client。這個情況下就需要用到 AE_BARRIER 事件,用來翻轉處理事件順序了。
18  */
19 #define AE_BARRIER 4

5、檔案事件的處理器
  Redis為檔案事件編寫了多個處理器,這些事件處理器分別用於實現不同的網路通訊需求,常用的處理器如下:

    • 為了對連線伺服器的各個客戶端進行應答, 伺服器要為監聽套接字關聯連線應答處理器。
    • 為了接收客戶端傳來的命令請求, 伺服器要為客戶端套接字關聯命令請求處理器。
    • 為了向客戶端返回命令的執行結果, 伺服器要為客戶端套接字關聯命令回覆處理器。

6、連線應答處理器
  networking.c中acceptTcpHandler函式是Redis的連線應答處理器,這個處理器用於對連線伺服器監聽套接字的客戶端進行應答,具體實現為sys/socket.h/accept函式的包裝。

  當Redis伺服器進行初始化的時候,程式會將這個連線應答處理器和伺服器監聽套接字的AE_READABLE事件關聯起來,當有客戶端用sys/socket.h/connect函式連線伺服器監聽套接字的時候, 套接字就會產生AE_READABLE 事件, 引發連線應答處理器執行, 並執行相應的套接字應答操作,如圖所示。

7、命令請求處理器
  networking.c中readQueryFromClient函式是Redis的命令請求處理器,這個處理器負責從套接字中讀入客戶端傳送的命令請求內容, 具體實現為unistd.h/read函式的包裝。

  當一個客戶端通過連線應答處理器成功連線到伺服器之後, 伺服器會將客戶端套接字的AE_READABLE事件和命令請求處理器關聯起來,當客戶端向伺服器傳送命令請求的時候,套接字就會產生 AE_READABLE事件,引發命令請求處理器執行,並執行相應的套接字讀入操作,如圖所示。

在客戶端連線伺服器的整個過程中,伺服器都會一直為客戶端套接字的AE_READABLE事件關聯命令請求處理器

8、命令回覆處理器
networking.c中sendReplyToClient函式是Redis的命令回覆處理器,這個處理器負責將伺服器執行命令後得到的命令回覆通過套接字返回給客戶端,具體實現為unistd.h/write函式的包裝。

當伺服器有命令回覆需要傳送給客戶端的時候,伺服器會將客戶端套接字的AE_WRITABLE事件和命令回覆處理器關聯起來,當客戶端準備好接收伺服器傳回的命令回覆時,就會產生AE_WRITABLE事件,引發命令回覆處理器執行,並執行相應的套接字寫入操作, 如圖所示

當命令回覆傳送完畢之後, 伺服器就會解除命令回覆處理器與客戶端套接字的 AE_WRITABLE 事件之間的關聯

9、一次完整的客戶端與伺服器連線事件示例
  假設Redis伺服器正在運作,那麼這個伺服器的監聽套接字的AE_READABLE事件應該正處於監聽狀態之下,而該事件所對應的處理器為連線應答處理器。

  如果這時有一個Redis客戶端向Redis伺服器發起連線,那麼監聽套接字將產生AE_READABLE事件, 觸發連線應答處理器執行:處理器會對客戶端的連線請求進行應答, 然後建立客戶端套接字,以及客戶端狀態,並將客戶端套接字的 AE_READABLE 事件與命令請求處理器進行關聯,使得客戶端可以向主伺服器傳送命令請求。

  之後,客戶端向Redis伺服器傳送一個命令請求,那麼客戶端套接字將產生 AE_READABLE事件,引發命令請求處理器執行,處理器讀取客戶端的命令內容, 然後傳給相關程式去執行。

  執行命令將產生相應的命令回覆,為了將這些命令回覆傳送回客戶端,伺服器會將客戶端套接字的AE_WRITABLE事件與命令回覆處理器進行關聯:當客戶端嘗試讀取命令回覆的時候,客戶端套接字將產生AE_WRITABLE事件, 觸發命令回覆處理器執行, 當命令回覆處理器將命令回覆全部寫入到套接字之後, 伺服器就會解除客戶端套接字的AE_WRITABLE事件與命令回覆處理器之間的關聯。

二、實現

1. Redis網路連線庫介紹
Redis網路連線庫對應的檔案是networking.c。這個檔案主要負責

  • 客戶端的建立與釋放
  • 命令接收與命令回覆
  • Redis通訊協議分析
  • CLIENT 命令的實現

我們接下來就這幾塊內容分別列出原始碼,進行剖析。

2. 客戶端的建立與釋放
  redis 網路連結庫的原始碼詳細註釋,連結:https://github.com/menwenjun/redis_source_annotation/blob/master/networking.c

2.1客戶端的建立

  Redis 伺服器是一個同時與多個客戶端建立連線的程式。當客戶端連線上伺服器時,伺服器會建立一個server.h/client結構來儲存客戶端的狀態資訊。所以在客戶端建立時,就會初始化這樣一個結構,客戶端的建立原始碼如下:

  1 client *createClient(int fd) {
  2     //分配空間
  3     client *c = zmalloc(sizeof(client));
  4 
  5     /**
  6      * passing -1 as fd it is possible to create a non connected client.
  7      * This is useful since all the commands needs to be executed
  8      * in the context of a client. When commands are executed in other
  9      * contexts (for instance a Lua script) we need a non connected client.
 10      *
 11      * 如果fd為-1,表示建立的是一個無網路連線的偽客戶端,用於執行lua指令碼的時候。
 12      * 如果fd不等於-1,表示建立一個有網路連線的客戶端
 13      */
 14     if (fd != -1) {
 15         // 設定fd為非阻塞模式
 16         anetNonBlock(NULL,fd);
 17         // 禁止使用 Nagle 演算法,client向核心遞交的每個資料包都會立即傳送給server出去,TCP_NODELAY
 18         anetEnableTcpNoDelay(NULL,fd);
 19         // 如果開啟了tcpkeepalive,則設定 SO_KEEPALIVE
 20         if (server.tcpkeepalive)
 21             anetKeepAlive(NULL,fd,server.tcpkeepalive);// 設定tcp連線的keep alive選項
 22         /**
 23          * 使能AE_READABLE事件,readQueryFromClient是該事件的回撥函式
 24          *
 25          * 建立一個檔案事件狀態el,且監聽讀事件,開始接受命令的輸入
 26          */
 27         if (aeCreateFileEvent(server.el,fd,AE_READABLE,
 28             readQueryFromClient, c) == AE_ERR)
 29         {
 30             close(fd);
 31             zfree(c);
 32             return NULL;
 33         }
 34     }
 35 
 36     // 預設選0號資料庫
 37     selectDb(c,0);
 38     uint64_t client_id;
 39     // 設定client的ID
 40     atomicGetIncr(server.next_client_id,client_id,1);
 41     c->id = client_id;
 42     // client的套接字
 43     c->fd = fd;
 44     // client的名字
 45     c->name = NULL;
 46     // 回覆固定(靜態)緩衝區的偏移量
 47     c->bufpos = 0;
 48     c->qb_pos = 0;
 49     // 輸入快取區
 50     c->querybuf = sdsempty();
 51     c->pending_querybuf = sdsempty();
 52     // 輸入快取區的峰值
 53     c->querybuf_peak = 0;
 54     // 請求協議型別,內聯或者多條命令,初始化為0
 55     c->reqtype = 0;
 56     // 引數個數
 57     c->argc = 0;
 58     // 引數列表
 59     c->argv = NULL;
 60     // 當前執行的命令和最近一次執行的命令
 61     c->cmd = c->lastcmd = NULL;
 62     // 查詢緩衝區剩餘未讀取命令的數量
 63     c->multibulklen = 0;
 64     // 讀入引數的長度
 65     c->bulklen = -1;
 66     // 已發的位元組數
 67     c->sentlen = 0;
 68     // client的狀態
 69     c->flags = 0;
 70     // 設定建立client的時間和最後一次互動的時間
 71     c->ctime = c->lastinteraction = server.unixtime;
 72     // 認證狀態
 73     c->authenticated = 0;
 74     // replication複製的狀態,初始為無
 75     c->replstate = REPL_STATE_NONE;
 76     // 設定從節點的寫處理器為ack,是否在slave向master傳送ack
 77     c->repl_put_online_on_ack = 0;
 78     // replication複製的偏移量
 79     c->reploff = 0;
 80     c->read_reploff = 0;
 81     // 通過ack命令接收到的偏移量
 82     c->repl_ack_off = 0;
 83     // 通過ack命令接收到的偏移量所用的時間
 84     c->repl_ack_time = 0;
 85     // 從節點的埠號
 86     c->slave_listening_port = 0;
 87     // 從節點IP地址
 88     c->slave_ip[0] = '\0';
 89     // 從節點的功能
 90     c->slave_capa = SLAVE_CAPA_NONE;
 91     // 回覆連結串列
 92     c->reply = listCreate();
 93     // 回覆連結串列的位元組數
 94     c->reply_bytes = 0;
 95     // 回覆緩衝區的記憶體大小軟限制
 96     c->obuf_soft_limit_reached_time = 0;
 97     // 回覆連結串列的釋放和複製方法
 98     listSetFreeMethod(c->reply,freeClientReplyValue);
 99     listSetDupMethod(c->reply,dupClientReplyValue);
100     // 阻塞型別
101     c->btype = BLOCKED_NONE;
102     // 阻塞超過時間
103     c->bpop.timeout = 0;
104     // 造成阻塞的鍵字典
105     c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
106     // 儲存解除阻塞的鍵,用於儲存PUSH入元素的鍵,也就是dstkey
107     c->bpop.target = NULL;
108     c->bpop.xread_group = NULL;
109     c->bpop.xread_consumer = NULL;
110     c->bpop.xread_group_noack = 0;
111     // 阻塞狀態
112     c->bpop.numreplicas = 0;
113     // 要達到的複製偏移量
114     c->bpop.reploffset = 0;
115     // 全域性的複製偏移量
116     c->woff = 0;
117     // 監控的鍵
118     c->watched_keys = listCreate();
119     // 訂閱頻道
120     c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
121     // 訂閱模式
122     c->pubsub_patterns = listCreate();
123     // 被快取的peerid,peerid就是 ip:port
124     c->peerid = NULL;
125     c->client_list_node = NULL;
126     // 訂閱釋出模式的釋放和比較方法
127     listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
128     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
129     // 將真正的client放在伺服器的客戶端連結串列中
130     if (fd != -1)
131         linkClient(c);//將當前客戶端加入全域性的連結串列中
132     // 初始化client的事物狀態
133     initClientMultiState(c);
134     return c;
135 }

根據傳入的檔案描述符fd,可以建立用於不同情景下的client。這個fd就是伺服器接收客戶端connect後所返回的檔案描述符。

  • fd == -1。表示建立一個無網路連線的客戶端。主要用於執行 lua 指令碼時。
  • fd != -1。表示接收到一個正常的客戶端連線,則會建立一個有網路連線的客戶端,也就是建立一個檔案事件,來監聽這個fd是否可讀,當客戶端傳送資料,則事件被觸發。建立客戶端時,還會禁用Nagle演算法。

  其中,Nagle演算法能自動連線許多的小緩衝器訊息,這一過程(稱為nagling)通過減少必須傳送包的個數來增加網路軟體系統的效率。但是伺服器和客戶端的對即使通訊性有很高的要求,因此禁止使用 Nagle 演算法,客戶端向核心遞交的每個資料包都會立即傳送給伺服器。

建立客戶端的過程,會將server.h/client結構的所有成員初始化,接下里會介紹部分重點的成員。

  • int id:伺服器對於每一個連線進來的都會建立一個ID,客戶端的ID從1開始。每次重啟伺服器會重新整理。
  • int fd:當前客戶端狀態描述符。分為無網路連線的客戶端和有網路連線的客戶端。
  • int flags:客戶端狀態的標誌。Redis 3.2.8 中在server.h中定義了23種狀態。
  • robj *name:預設建立的客戶端是沒有名字的,可以通過CLIENT SETNAME命令設定名字。後面會介紹該命令的實現。
  • int reqtype:請求協議的型別。因為Redis伺服器支援Telnet的連線,因此Telnet命令請求協議型別是PROTO_REQ_INLINE,而redis-cli命令請求的協議型別是PROTO_REQ_MULTIBULK。

用於儲存伺服器接受客戶端命令的成員:

  • sds querybuf:儲存客戶端發來命令請求的輸入緩衝區。以Redis通訊協議的方式儲存。
  • size_t querybuf_peak:儲存輸入緩衝區的峰值。
  • int argc:命令引數個數。
  • robj *argv:命令引數列表。

用於儲存伺服器給客戶端回覆的成員:

  • char buf[16*1024]:儲存執行完命令所得命令回覆資訊的靜態緩衝區,它的大小是固定的,所以主要儲存的是一些比較短的回覆。分配client結構空間時,就會分配一個16K的大小。
  • int bufpos:記錄靜態緩衝區的偏移量,也就是buf陣列已經使用的位元組數。
  • list *reply:儲存命令回覆的連結串列。因為靜態緩衝區大小固定,主要儲存固定長度的命令回覆,當處理一些返回大量回復的命令,則會將命令回覆以連結串列的形式連線起來。
  • unsigned long long reply_bytes:儲存回覆連結串列的位元組數。
  • size_t sentlen:已傳送回覆的位元組數。

客戶端結構體,位於server.h

  1 /**
  2  * With multiplexing we need to take per-client state.
  3  * Clients are taken in a linked list.
  4  * 對於多路複用,我們需要獲取每個客戶端的狀態。 客戶端被放入一個連結串列中。
  5  * 封裝的客戶端結構體
  6  */
  7 typedef struct client {
  8     uint64_t id;            /* Client incremental unique ID. */
  9     //客戶端的套接字,偽客戶端的為-1(AOF,Lua指令碼),其它為普通客戶端
 10     int fd;                 /* Client socket. */\
 11     //客戶端正在使用的redis資料庫指標
 12     redisDb *db;            /* Pointer to currently SELECTed DB. */
 13     //客戶端的名字,使用client setname 來設定名稱
 14     robj *name;             /* As set by CLIENT SETNAME. */
 15     //輸入快取區
 16     sds querybuf;           /* Buffer we use to accumulate client queries. 我們用來儲存客戶端請求的緩衝區*/
 17     size_t qb_pos;          /* The position we have read in querybuf. 我們在 querybuf 中已經讀取的位置。*/
 18     sds pending_querybuf;   /* If this client is flagged as master, this buffer
 19                                represents the yet not applied portion of the
 20                                replication stream that we are receiving from
 21                                the master.如果此客戶端被標記為 master,
 22                                則此緩衝區表示我們從 master 接收的複製流中尚未應用的部分。 */
 23     size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. 查詢緩衝區的最近(100 毫秒或更多)峰值*/
 24     int argc;               /* Num of arguments of current command. 命令引數個數*/
 25     robj **argv;            /* Arguments of current command. 命令引數*/
 26     // 記錄被客戶端執行的命令,最後一個被執行的命令
 27     struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
 28     /**
 29      * telnet命令還是redis-cli命令
 30      */
 31     int reqtype;            /* Request protocol type: PROTO_REQ_* */
 32     // 剩下要讀取的所有引數的數量。
 33     int multibulklen;       /* Number of multi bulk arguments left to read. */
 34     // 命令內容的長度
 35     long bulklen;           /* Length of bulk argument in multi bulk request. */
 36     // 回覆連結串列
 37     list *reply;            /* List of reply objects to send to the client. */
 38     // 回覆連結串列中物件的總大小
 39     unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
 40     // 已傳送位元組,處理 short write 用
 41     size_t sentlen;         /* Amount of bytes already sent in the current
 42                                buffer or object being sent. */
 43     //建立客戶端的時間
 44     time_t ctime;           /* Client creation time. */
 45     //最後一次互動時間,可以用來計算該連線的空轉時間
 46     time_t lastinteraction; /* Time of the last interaction, used for timeout */
 47     //緩衝區第一次達到軟性限制的時間
 48     time_t obuf_soft_limit_reached_time;
 49     //redis標誌位
 50     int flags;              /* Client flags: CLIENT_* macros. */
 51     /**
 52      * 身份驗證,0表示未通過身份驗證,1表示通過身份驗證。僅在服務端啟用了安全驗證時才用,若啟用
 53      * 未通過驗證,則只能使用AUTH命令,其它均無法使用
 54      */
 55     int authenticated;      /* When requirepass is non-NULL. */
 56     // 如果是從站,代表的是複製狀態
 57     int replstate;          /* Replication state if this is a slave. */
 58 
 59     int repl_put_online_on_ack; /* Install slave write handler on ACK. */
 60     // 用於儲存主伺服器傳來的 RDB 檔案的檔案描述符
 61     int repldbfd;           /* Replication DB file descriptor. */
 62     // 讀取主伺服器傳來的 RDB 檔案的偏移量
 63     off_t repldboff;        /* Replication DB file offset. */
 64     // 主伺服器傳來的 RDB 檔案的大小
 65     off_t repldbsize;       /* Replication DB file size. */
 66     sds replpreamble;       /* Replication DB preamble. */
 67     long long read_reploff; /* Read replication offset if this is a master. */
 68     // 主伺服器的複製偏移量
 69     long long reploff;      /* Applied replication offset if this is a master. */
 70     // 從伺服器最後一次傳送 REPLCONF ACK 時的偏移量
 71     long long repl_ack_off; /* Replication ack offset, if this is a slave. */
 72     // 從伺服器最後一次傳送 REPLCONF ACK 的時間
 73     long long repl_ack_time;/* Replication ack time, if this is a slave. */
 74     long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
 75                                        copying this slave output buffer
 76                                        should use. */
 77     /**
 78      * 主伺服器的 master run ID
 79      * 儲存在客戶端,用於執行部分重同步
 80      */
 81     char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
 82     // 從伺服器的監聽埠號
 83     int slave_listening_port; /* As configured with: SLAVECONF listening-port */
 84     // 從伺服器的監聽IP
 85     char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
 86     // 從節點的功能
 87     int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
 88     // 事務狀態
 89     multiState mstate;      /* MULTI/EXEC state */
 90     // 阻塞型別
 91     int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
 92     // 阻塞狀態
 93     blockingState bpop;     /* blocking state */
 94     // 最後被寫入的全域性複製偏移量
 95     long long woff;         /* Last write global replication offset. */
 96     // 被監視的鍵
 97     list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
 98     /**
 99      * 這個字典記錄了客戶端所有訂閱的頻道
100      * 鍵為頻道名字,值為 NULL
101      * 也即是,一個頻道的集合
102      */
103     dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
104     /**
105      * 連結串列,包含多個 pubsubPattern 結構
106      * 記錄了所有訂閱頻道的客戶端的資訊
107      * 新 pubsubPattern 結構總是被新增到表尾
108      */
109     list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
110     sds peerid;             /* Cached peer ID. */
111     listNode *client_list_node; /* list node in client list */
112 
113     /* Response buffer */
114     // 回覆偏移量
115     int bufpos;
116     //預設大小為16k,回覆緩衝區
117     char buf[PROTO_REPLY_CHUNK_BYTES];
118 } client;
View Code blockingState結構體,位於server.h
 1 /**
 2  * This structure holds the blocking operation state for a client.
 3  * The fields used depend on client->btype.
 4  * 此結構儲存客戶端的阻塞操作狀態。 使用的欄位取決於 client->btype。
 5  */
 6 typedef struct blockingState {
 7     /* Generic fields. */
 8     // 阻塞超過時間
 9     mstime_t timeout;       /* Blocking operation timeout. If UNIX current time
10                              * is > timeout then the operation timed out. */
11 
12     /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
13     // 造成阻塞的鍵字典
14     dict *keys;             /* The keys we are waiting to terminate a blocking
15                              * operation such as BLPOP or XREAD. Or NULL. */
16     // 儲存解除阻塞的鍵,用於儲存PUSH入元素的鍵,也就是dstkey
17     robj *target;           /* The key that should receive the element,
18                              * for BRPOPLPUSH. */
19 
20     /* BLOCK_STREAM */
21     size_t xread_count;     /* XREAD COUNT option. */
22     robj *xread_group;      /* XREADGROUP group name. */
23     robj *xread_consumer;   /* XREADGROUP consumer name. */
24     mstime_t xread_retry_time, xread_retry_ttl;
25     int xread_group_noack;
26 
27     /* BLOCKED_WAIT */
28     // 阻塞狀態
29     int numreplicas;        /* Number of replicas we are waiting for ACK. */
30     // 要達到的複製偏移量
31     long long reploffset;   /* Replication offset to reach. */
32 
33     /* BLOCKED_MODULE */
34     void *module_blocked_handle; /* RedisModuleBlockedClient structure.
35                                     which is opaque for the Redis core, only
36                                     handled in module.c. */
37 } blockingState;

2.2 客戶端的釋放

客戶端的釋放freeClient()函式主要就是釋放各種資料結構和清空一些緩衝區等等操作,這裡就不列出原始碼。但是我們關注一下非同步釋放客戶端。原始碼如下:

 1 /**
 2  * Schedule a client to free it at a safe time in the serverCron() function.
 3  * This function is useful when we need to terminate a client but we are in
 4  * a context where calling freeClient() is not possible, because the client
 5  * should be valid for the continuation of the flow of the program.
 6  * 非同步釋放client
 7  */
 8 void freeClientAsync(client *c) {
 9     // 如果是已經即將關閉或者是lua指令碼的偽client,則直接返回
10     if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
11     c->flags |= CLIENT_CLOSE_ASAP;
12     // 將client加入到即將關閉的client連結串列中
13     listAddNodeTail(server.clients_to_close,c);
14 }

其中server.clients_to_close:是伺服器儲存所有待關閉的client連結串列,redisServer結構體如下,位於server.h

  1 struct redisServer {
  2     /* General */
  3     pid_t pid;                  /* Main process pid. */
  4     // 配置檔案的絕對路徑
  5     char *configfile;           /* Absolute config file path, or NULL */
  6     char *executable;           /* Absolute executable file path. */
  7     char **exec_argv;           /* Executable argv vector (copy). */
  8     int dynamic_hz;             /* Change hz value depending on # of clients. */
  9     int config_hz;              /* Configured HZ value. May be different than
 10                                    the actual 'hz' field value if dynamic-hz
 11                                    is enabled. */
 12     // serverCron() 每秒呼叫的次數
 13     int hz;                     /* serverCron() calls frequency in hertz */
 14     // 資料庫
 15     redisDb *db;
 16     // 命令表(受到 rename 配置選項的作用)
 17     dict *commands;             /* Command table */
 18     // 命令表(無 rename 配置選項的作用)
 19     dict *orig_commands;        /* Command table before command renaming. */
 20     // 事件狀態,網路模型底層結構封裝
 21     aeEventLoop *el;
 22     // 最近一次使用時鐘
 23     unsigned int lruclock;      /* Clock for LRU eviction */
 24     // 關閉伺服器的標識
 25     int shutdown_asap;          /* SHUTDOWN needed ASAP */
 26     // 在執行 serverCron() 時進行漸進式 rehash
 27     int activerehashing;        /* Incremental rehash in serverCron() */
 28     int active_defrag_running;  /* Active defragmentation running (holds current scan aggressiveness) */
 29     // 是否設定了密碼
 30     char *requirepass;          /* Pass for AUTH command, or NULL */
 31     // PID 檔案
 32     char *pidfile;              /* PID file path */
 33     // 架構型別
 34     int arch_bits;              /* 32 or 64 depending on sizeof(long) */
 35     // serverCron() 函式的執行次數計數器
 36     int cronloops;              /* Number of times the cron function run */
 37     // 本伺服器的 RUN ID
 38     char runid[CONFIG_RUN_ID_SIZE+1];  /* ID always different at every exec. */
 39     // 伺服器是否執行在 SENTINEL 模式
 40     int sentinel_mode;          /* True if this instance is a Sentinel. */
 41     size_t initial_memory_usage; /* Bytes used after initialization. */
 42     int always_show_logo;       /* Show logo even for non-stdout logging. */
 43     /* Modules */
 44     dict *moduleapi;            /* Exported APIs dictionary for modules. */
 45     list *loadmodule_queue;     /* List of modules to load at startup. */
 46     int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
 47                                    client blocked on a module command needs
 48                                    to be processed. */
 49     /* Networking */
 50     // TCP 監聽埠
 51     int port;                   /* TCP listening port */
 52     // 允許監聽的檔案描述符
 53     int tcp_backlog;            /* TCP listen() backlog */
 54     // 地址
 55     char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
 56     // 地址數量
 57     int bindaddr_count;         /* Number of addresses in server.bindaddr[] */
 58     // UNIX 套接字
 59     char *unixsocket;           /* UNIX socket path */
 60     // UNIX 套接字許可權
 61     mode_t unixsocketperm;      /* UNIX socket permission */
 62     // 描述符
 63     int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
 64     // 描述符數量
 65     int ipfd_count;             /* Used slots in ipfd[] */
 66     int sofd;                   /* Unix socket file descriptor */
 67     int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
 68     int cfd_count;              /* Used slots in cfd[] */
 69     // 一個連結串列,儲存了所有客戶端狀態結構
 70     list *clients;              /* List of active clients */
 71     // 連結串列,儲存了所有待關閉的客戶端
 72     list *clients_to_close;     /* Clients to close asynchronously */
 73     list *clients_pending_write; /* There is to write or install handler. */
 74     // 連結串列,儲存了所有從伺服器,以及所有監視器
 75     list *slaves, *monitors;    /* List of slaves and MONITORs */
 76     // 伺服器的當前客戶端,僅用於崩潰報告
 77     client *current_client; /* Current client, only used on crash report */
 78     // 基於客戶端ID的啟用客戶端字典
 79     rax *clients_index;         /* Active clients dictionary by client ID. */
 80     //如果當前的客戶端正處於阻塞狀態,就將其設定為true
 81     int clients_paused;         /* True if clients are currently paused */
 82     mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
 83     // 網路錯誤
 84     char neterr[ANET_ERR_LEN];   /* Error buffer for anet.c */
 85     // MIGRATE 快取
 86     dict *migrate_cached_sockets;/* MIGRATE cached sockets */
 87     uint64_t next_client_id;    /* Next client unique ID. Incremental. */
 88     int protected_mode;         /* Don't accept external connections. */
 89     /* RDB / AOF loading information */
 90     int loading;                /* We are loading data from disk if true */
 91     off_t loading_total_bytes;
 92     off_t loading_loaded_bytes;
 93     time_t loading_start_time;
 94     off_t loading_process_events_interval_bytes;
 95     /* Fast pointers to often looked up command */
 96     struct redisCommand *delCommand, *multiCommand, *lpushCommand,
 97                         *lpopCommand, *rpopCommand, *zpopminCommand,
 98                         *zpopmaxCommand, *sremCommand, *execCommand,
 99                         *expireCommand, *pexpireCommand, *xclaimCommand,
100                         *xgroupCommand;
101     /* Fields used only for stats */
102     time_t stat_starttime;          /* Server start time */
103     long long stat_numcommands;     /* Number of processed commands */
104     long long stat_numconnections;  /* Number of connections received */
105     long long stat_expiredkeys;     /* Number of expired keys */
106     double stat_expired_stale_perc; /* Percentage of keys probably expired */
107     long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/
108     long long stat_evictedkeys;     /* Number of evicted keys (maxmemory) */
109     long long stat_keyspace_hits;   /* Number of successful lookups of keys */
110     long long stat_keyspace_misses; /* Number of failed lookups of keys */
111     long long stat_active_defrag_hits;      /* number of allocations moved */
112     long long stat_active_defrag_misses;    /* number of allocations scanned but not moved */
113     long long stat_active_defrag_key_hits;  /* number of keys with moved allocations */
114     long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
115     long long stat_active_defrag_scanned;   /* number of dictEntries scanned */
116     size_t stat_peak_memory;        /* Max used memory record */
117     long long stat_fork_time;       /* Time needed to perform latest fork() */
118     double stat_fork_rate;          /* Fork rate in GB/sec. */
119     long long stat_rejected_conn;   /* Clients rejected because of maxclients */
120     long long stat_sync_full;       /* Number of full resyncs with slaves. */
121     long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
122     long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */
123     list *slowlog;                  /* SLOWLOG list of commands */
124     long long slowlog_entry_id;     /* SLOWLOG current entry ID */
125     long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
126     unsigned long slowlog_max_len;     /* SLOWLOG max number of items logged */
127     struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
128     long long stat_net_input_bytes; /* Bytes read from network. */
129     long long stat_net_output_bytes; /* Bytes written to network. */
130     size_t stat_rdb_cow_bytes;      /* Copy on write bytes during RDB saving. */
131     size_t stat_aof_cow_bytes;      /* Copy on write bytes during AOF rewrite. */
132     /* The following two are used to track instantaneous metrics, like
133      * number of operations per second, network traffic. */
134     struct {
135         long long last_sample_time; /* Timestamp of last sample in ms */
136         long long last_sample_count;/* Count in last sample */
137         long long samples[STATS_METRIC_SAMPLES];
138         int idx;
139     } inst_metric[STATS_METRIC_COUNT];
140     /* Configuration */
141     int verbosity;                  /* Loglevel in redis.conf */
142     int maxidletime;                /* Client timeout in seconds */
143     int tcpkeepalive;               /* Set SO_KEEPALIVE if non-zero. */
144     int active_expire_enabled;      /* Can be disabled for testing purposes. */
145     int active_defrag_enabled;
146     size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
147     int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
148     int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
149     int active_defrag_cycle_min;       /* minimal effort for defrag in CPU percentage */
150     int active_defrag_cycle_max;       /* maximal effort for defrag in CPU percentage */
151     unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
152     size_t client_max_querybuf_len; /* Limit for client query buffer length */
153     int dbnum;                      /* Total number of configured DBs */
154     int supervised;                 /* 1 if supervised, 0 otherwise. */
155     int supervised_mode;            /* See SUPERVISED_* */
156     int daemonize;                  /* True if running as a daemon */
157     clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
158     /* AOF persistence */
159     // AOF 狀態(開啟/關閉/可寫)
160     int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */
161     // 所使用的 fsync 策略(每個寫入/每秒/從不)
162     int aof_fsync;                  /* Kind of fsync() policy */
163     char *aof_filename;             /* Name of the AOF file */
164     int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
165     int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
166     off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
167     // 最後一次執行 BGREWRITEAOF 時, AOF 檔案的大小
168     off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
169     // AOF 檔案的當前位元組大小
170     off_t aof_current_size;         /* AOF current size. */
171     int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
172     // 負責進行 AOF 重寫的子程序 ID
173     pid_t aof_child_pid;            /* PID if rewriting process */
174     // AOF 重寫快取連結串列,連結著多個快取塊
175     list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
176     // AOF 緩衝區
177     sds aof_buf;      /* AOF buffer, written before entering the event loop */
178     // AOF 檔案的描述符
179     int aof_fd;       /* File descriptor of currently selected AOF file */
180     // AOF 的當前目標資料庫
181     int aof_selected_db; /* Currently selected DB in AOF */
182     // 推遲 write 操作的時間
183     time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
184     // 最後一直執行 fsync 的時間
185     time_t aof_last_fsync;            /* UNIX time of last fsync() */
186     time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
187     // AOF 重寫的開始時間
188     time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
189     // 最後一次執行 BGREWRITEAOF 的結果
190     int aof_lastbgrewrite_status;   /* C_OK or C_ERR */
191     // 記錄 AOF 的 write 操作被推遲了多少次
192     unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
193     // 指示是否需要每寫入一定量的資料,就主動執行一次 fsync()
194     int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */
195     int rdb_save_incremental_fsync;   /* fsync incrementally while rdb saving? */
196     int aof_last_write_status;      /* C_OK or C_ERR */
197     int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */
198     int aof_load_truncated;         /* Don't stop on unexpected AOF EOF. */
199     int aof_use_rdb_preamble;       /* Use RDB preamble on AOF rewrites. */
200     /* AOF pipes used to communicate between parent and child during rewrite. */
201     int aof_pipe_write_data_to_child;
202     int aof_pipe_read_data_from_parent;
203     int aof_pipe_write_ack_to_parent;
204     int aof_pipe_read_ack_from_child;
205     int aof_pipe_write_ack_to_child;
206     int aof_pipe_read_ack_from_parent;
207     int aof_stop_sending_diff;     /* If true stop sending accumulated diffs
208                                       to child process. */
209     sds aof_child_diff;             /* AOF diff accumulator child side. */
210     /* RDB persistence */
211     // 自從上次 SAVE 執行以來,資料庫被修改的次數
212     long long dirty;                /* Changes to DB from the last save */
213     // BGSAVE 執行前的資料庫被修改次數
214     long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
215     /**
216      * 負責執行 BGSAVE 的子程序的 ID
217      * 沒在執行 BGSAVE 時,設為 -1
218      */
219     pid_t rdb_child_pid;            /* PID of RDB saving child */
220     struct saveparam *saveparams;   /* Save points array for RDB */
221     int saveparamslen;              /* Number of saving points */
222     char *rdb_filename;             /* Name of RDB file */
223     int rdb_compression;            /* Use compression in RDB? */
224     int rdb_checksum;               /* Use RDB checksum? */
225     // 最後一次完成 SAVE 的時間
226     time_t lastsave;                /* Unix time of last successful save */
227     // 最後一次嘗試執行 BGSAVE 的時間
228     time_t lastbgsave_try;          /* Unix time of last attempted bgsave */
229     // 最近一次 BGSAVE 執行耗費的時間
230     time_t rdb_save_time_last;      /* Time used by last RDB save run. */
231     // 資料庫最近一次開始執行 BGSAVE 的時間
232     time_t rdb_save_time_start;     /* Current RDB save start time. */
233     int rdb_bgsave_scheduled;       /* BGSAVE when possible if true. */
234     int rdb_child_type;             /* Type of save by active child. */
235     // 最後一次執行 SAVE 的狀態
236     int lastbgsave_status;          /* C_OK or C_ERR */
237     int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */
238     int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
239     int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
240     /* Pipe and data structures for child -> parent info sharing. */
241     int child_info_pipe[2];         /* Pipe used to write the child_info_data. */
242     struct {
243         int process_type;           /* AOF or RDB child? */
244         size_t cow_size;            /* Copy on write size. */
245         unsigned long long magic;   /* Magic value to make sure data is valid. */
246     } child_info_data;
247     /* Propagation of commands in AOF / replication */
248     redisOpArray also_propagate;    /* Additional command to propagate. */
249     /* Logging */
250     char *logfile;                  /* Path of log file */
251     int syslog_enabled;             /* Is syslog enabled? */
252     char *syslog_ident;             /* Syslog ident */
253     int syslog_facility;            /* Syslog facility */
254     /* Replication (master) */
255     char replid[CONFIG_RUN_ID_SIZE+1];  /* My current replication ID. */
256     char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
257     // 全域性複製偏移量(一個累計值)
258     long long master_repl_offset;   /* My current replication offset */
259     long long second_replid_offset; /* Accept offsets up to this for replid2. */
260     int slaveseldb;                 /* Last SELECTed DB in replication output */
261     // 主伺服器傳送 PING 的頻率
262     int repl_ping_slave_period;     /* Master pings the slave every N seconds */
263     // backlog 本身
264     char *repl_backlog;             /* Replication backlog for partial syncs */
265     // backlog 的長度
266     long long repl_backlog_size;    /* Backlog circular buffer size */
267     // backlog 中資料的長度
268     long long repl_backlog_histlen; /* Backlog actual data length */
269     // backlog 的當前索引
270     long long repl_backlog_idx;     /* Backlog circular buffer current offset,
271                                        that is the next byte will'll write to.*/
272     // backlog 中可以被還原的第一個位元組的偏移量
273     long long repl_backlog_off;     /* Replication "master offset" of first
274                                        byte in the replication backlog buffer.*/
275     // backlog 的過期時間
276     time_t repl_backlog_time_limit; /* Time without slaves after the backlog
277                                        gets released. */
278     // 距離上一次有從伺服器的時間
279     time_t repl_no_slaves_since;    /* We have no slaves since that time.
280                                        Only valid if server.slaves len is 0. */
281     // 是否開啟最小數量從伺服器寫入功能
282     int repl_min_slaves_to_write;   /* Min number of slaves to write. */
283     // 定義最小數量從伺服器的最大延遲值
284     int repl_min_slaves_max_lag;    /* Max lag of <count> slaves to write. */
285     // 延遲良好的從伺服器的數量
286     int repl_good_slaves_count;     /* Number of slaves with lag <= max_lag. */
287     int repl_diskless_sync;         /* Send RDB to slaves sockets directly. */
288     int repl_diskless_sync_delay;   /* Delay to start a diskless repl BGSAVE. */
289     /* Replication (slave) */
290     // 主伺服器的驗證密碼
291     char *masterauth;               /* AUTH with this password with master */
292     // 主伺服器的地址
293     char *masterhost;               /* Hostname of master */
294     // 主伺服器的埠
295     int masterport;                 /* Port of master */
296     // 超時時間
297     int repl_timeout;               /* Timeout after N seconds of master idle */
298     // 主伺服器所對應的客戶端
299     redisClient *master;     /* Client that is master for this slave */
300     // 被快取的主伺服器,PSYNC 時使用
301     redisClient *cached_master; /* Cached master to be reused for PSYNC. */
302     int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
303     // 複製的狀態(伺服器是從伺服器時使用)
304     int repl_state;          /* Replication status if the instance is a slave */
305     // RDB 檔案的大小
306     off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
307     // 已讀 RDB 檔案內容的位元組數
308     off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
309     // 最近一次執行 fsync 時的偏移量
310     // 用於 sync_file_range 函式
311     off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
312     // 主伺服器的套接字
313     int repl_transfer_s;     /* Slave -> Master SYNC socket */
314     // 儲存 RDB 檔案的臨時檔案的描述符
315     int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
316     // 儲存 RDB 檔案的臨時檔名字
317     char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
318     // 最近一次讀入 RDB 內容的時間
319     time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
320     int repl_serve_stale_data; /* Serve stale data when link is down? */
321     // 是否只讀從伺服器?
322     int repl_slave_ro;          /* Slave is read only? */
323     int repl_slave_ignore_maxmemory;    /* If true slaves do not evict. */
324     // 連線斷開的時長
325     time_t repl_down_since; /* Unix time at which link with master went down */
326     // 是否要在 SYNC 之後關閉 NODELAY ?
327     int repl_disable_tcp_nodelay;   /* Disable TCP_NODELAY after SYNC? */
328     // 從伺服器優先順序
329     int slave_priority;             /* Reported in INFO and used by Sentinel. */
330     int slave_announce_port;        /* Give the master this listening port. */
331     char *slave_announce_ip;        /* Give the master this ip address. */
332     /* The following two fields is where we store master PSYNC replid/offset
333      * while the PSYNC is in progress. At the end we'll copy the fields into
334      * the server->master client structure. */
335     // 本伺服器(從伺服器)當前主伺服器的 RUN ID
336     char master_replid[CONFIG_RUN_ID_SIZE+1];  /* Master PSYNC runid. */
337     // 初始化偏移量
338     long long master_initial_offset;           /* Master PSYNC offset. */
339     int repl_slave_lazy_flush;          /* Lazy FLUSHALL before loading DB? */
340     /* Replication script cache. */
341     // 複製指令碼快取
342     // 字典
343     dict *repl_scriptcache_dict;        /* SHA1 all slaves are aware of. */
344     // FIFO 佇列
345     list *repl_scriptcache_fifo;        /* First in, first out LRU eviction. */
346     // 快取的大小
347     int repl_scriptcache_size;          /* Max number of elements. */
348     /* Synchronous replication. */
349     list *clients_waiting_acks;         /* Clients waiting in WAIT command. */
350     int get_ack_from_slaves;            /* If true we send REPLCONF GETACK. */
351     /* Limits */
352     unsigned int maxclients;            /* Max number of simultaneous clients */
353     unsigned long long maxmemory;   /* Max number of memory bytes to use */
354     int maxmemory_policy;           /* Policy for key eviction */
355     int maxmemory_samples;          /* Pricision of random sampling */
356     int lfu_log_factor;             /* LFU logarithmic counter factor. */
357     int lfu_decay_time;             /* LFU counter decay factor. */
358     long long proto_max_bulk_len;   /* Protocol bulk length maximum size. */
359     /* Blocked clients */
360     unsigned int blocked_clients;   /* # of clients executing a blocking cmd.*/
361     unsigned int blocked_clients_by_type[BLOCKED_NUM];
362     list *unblocked_clients; /* list of clients to unblock before next loop */
363     list *ready_keys;        /* List of readyList structures for BLPOP & co */
364     /* Sort parameters - qsort_r() is only available under BSD so we
365      * have to take this state global, in order to pass it to sortCompare() */
366     int sort_desc;
367     int sort_alpha;
368     int sort_bypattern;
369     int sort_store;
370     /* Zip structure config, see redis.conf for more information  */
371     size_t hash_max_ziplist_entries;
372     size_t hash_max_ziplist_value;
373     size_t set_max_intset_entries;
374     size_t zset_max_ziplist_entries;
375     size_t zset_max_ziplist_value;
376     size_t hll_sparse_max_bytes;
377     size_t stream_node_max_bytes;
378     int64_t stream_node_max_entries;
379     /* List parameters */
380     int list_max_ziplist_size;
381     int list_compress_depth;
382     /* time cache */
383     time_t unixtime;    /* Unix time sampled every cron cycle. */
384     time_t timezone;    /* Cached timezone. As set by tzset(). */
385     int daylight_active;    /* Currently in daylight saving time. */
386     long long mstime;   /* Like 'unixtime' but with milliseconds resolution. */
387     /* Pubsub */
388     // 字典,鍵為頻道,值為連結串列
389     // 連結串列中儲存了所有訂閱某個頻道的客戶端
390     // 新客戶端總是被新增到連結串列的表尾
391     dict *pubsub_channels;  /* Map channels to list of subscribed clients */
392     // 這個連結串列記錄了客戶端訂閱的所有模式的名字
393     list *pubsub_patterns;  /* A list of pubsub_patterns */
394     int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
395                                    xor of NOTIFY_... flags. */
396     /* Cluster */
397     int cluster_enabled;      /* Is cluster enabled? */
398     mstime_t cluster_node_timeout; /* Cluster node timeout. */
399     char *cluster_configfile; /* Cluster auto-generated config file name. */
400     struct clusterState *cluster;  /* State of the cluster */
401     int cluster_migration_barrier; /* Cluster replicas migration barrier. */
402     int cluster_slave_validity_factor; /* Slave max data age for failover. */
403     int cluster_require_full_coverage; /* If true, put the cluster down if
404                                           there is at least an uncovered slot.*/
405     int cluster_slave_no_failover;  /* Prevent slave from starting a failover
406                                        if the master is in failure state. */
407     char *cluster_announce_ip;  /* IP address to announce on cluster bus. */
408     int cluster_announce_port;     /* base port to announce on cluster bus. */
409     int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
410     int cluster_module_flags;      /* Set of flags that Redis modules are able
411                                       to set in order to suppress certain
412                                       native Redis Cluster features. Check the
413                                       REDISMODULE_CLUSTER_FLAG_*. */
414     /* Scripting */
415     // Lua 環境
416     lua_State *lua; /* The Lua interpreter. We use just one for all clients */
417     // 複製執行 Lua 指令碼中的 Redis 命令的偽客戶端
418     redisClient *lua_client;   /* The "fake client" to query Redis from Lua */
419     // 當前正在執行 EVAL 命令的客戶端,如果沒有就是 NULL
420     redisClient *lua_caller;   /* The client running EVAL right now, or NULL */
421     // 一個字典,值為 Lua 指令碼,鍵為指令碼的 SHA1 校驗和
422     dict *lua_scripts;         /* A dictionary of SHA1 -> Lua scripts */
423     unsigned long long lua_scripts_mem;  /* Cached scripts' memory + oh */
424     // Lua 指令碼的執行時限
425     mstime_t lua_time_limit;  /* Script timeout in milliseconds */
426     // 指令碼開始執行的時間
427     mstime_t lua_time_start;  /* Start time of script, milliseconds time */
428     // 指令碼是否執行過寫命令
429     int lua_write_dirty;  /* True if a write command was called during the
430                              execution of the current script. */
431     // 指令碼是否執行過帶有隨機性質的命令
432     int lua_random_dirty; /* True if a random command was called during the
433                              execution of the current script. */
434     int lua_replicate_commands; /* True if we are doing single commands repl. */
435     int lua_multi_emitted;/* True if we already proagated MULTI. */
436     int lua_repl;         /* Script replication flags for redis.set_repl(). */
437     // 指令碼是否超時
438     int lua_timedout;     /* True if we reached the time limit for script
439                              execution. */
440     // 是否要殺死指令碼
441     int lua_kill;         /* Kill the script if true. */
442     int lua_always_replicate_commands; /* Default replication type. */
443     /* Lazy free */
444     int lazyfree_lazy_eviction;
445     int lazyfree_lazy_expire;
446     int lazyfree_lazy_server_del;
447     /* Latency monitor */
448     long long latency_monitor_threshold;
449     dict *latency_events;
450     /* Assert & bug reporting */
451     const char *assert_failed;
452     const char *assert_file;
453     int assert_line;
454     int bug_report_start; /* True if bug report header was already logged. */
455     int watchdog_period;  /* Software watchdog period in ms. 0 = off */
456     /* System hardware info */
457     size_t system_memory_size;  /* Total memory in system as reported by OS */
458 
459     /* Mutexes used to protect atomic variables when atomic builtins are
460      * not available. */
461     pthread_mutex_t lruclock_mutex;
462     pthread_mutex_t next_client_id_mutex;
463     pthread_mutex_t unixtime_mutex;
464 };

設定非同步釋放客戶端的目的主要是:防止底層函式正在向客戶端的輸出緩衝區寫資料的時候,關閉客戶端,這樣是不安全的。Redis會安排客戶端在serverCron()函式的安全時間釋放它。

當然也可以取消非同步釋放,那麼就會呼叫freeClient()函式立即釋放。原始碼如下:

 1 // 取消設定非同步釋放的client
 2 void freeClientsInAsyncFreeQueue(void) {
 3     // 遍歷所有即將關閉的client
 4     while (listLength(server.clients_to_close)) {
 5         listNode *ln = listFirst(server.clients_to_close);
 6         client *c = listNodeValue(ln);
 7 
 8         // 取消立即關閉的標誌
 9         c->flags &= ~CLIENT_CLOSE_ASAP;
10         //釋放客戶端
11         freeClient(c);
12         // 從即將關閉的client連結串列中刪除
13         listDelNode(server.clients_to_close,ln);
14     }
15 }

兩種關閉客戶端方式的底層都是呼叫freeClient,原始碼如下

 1 void freeClient(client *c) {
 2     listNode *ln;
 3 
 4     /* If a client is protected, yet we need to free it right now, make sure
 5      * to at least use asynchronous freeing. */
 6     if (c->flags & CLIENT_PROTECTED) {
 7         freeClientAsync(c);
 8         return;
 9     }
10 
11     /* If it is our master that's beging disconnected we should make sure
12      * to cache the state to try a partial resynchronization later.
13      *
14      * Note that before doing this we make sure that the client is not in
15      * some unexpected state, by checking its flags. */
16     if (server.master && c->flags & CLIENT_MASTER) {
17         serverLog(LL_WARNING,"Connection with master lost.");
18         if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
19                           CLIENT_CLOSE_ASAP|
20                           CLIENT_BLOCKED)))
21         {
22             replicationCacheMaster(c);
23             return;
24         }
25     }
26 
27     /* Log link disconnection with slave */
28     if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
29         serverLog(LL_WARNING,"Connection with replica %s lost.",
30             replicationGetSlaveName(c));
31     }
32 
33     /* Free the query buffer */
34     sdsfree(c->querybuf);
35     sdsfree(c->pending_querybuf);
36     c->querybuf = NULL;
37 
38     /* Deallocate structures used to block on blocking ops. */
39     if (c->flags & CLIENT_BLOCKED) unblockClient(c);
40     dictRelease(c->bpop.keys);
41 
42     /* UNWATCH all the keys */
43     unwatchAllKeys(c);
44     listRelease(c->watched_keys);
45 
46     /* Unsubscribe from all the pubsub channels */
47     pubsubUnsubscribeAllChannels(c,0);
48     pubsubUnsubscribeAllPatterns(c,0);
49     dictRelease(c->pubsub_channels);
50     listRelease(c->pubsub_patterns);
51 
52     /* Free data structures. */
53     listRelease(c->reply);
54     freeClientArgv(c);
55 
56     /* Unlink the client: this will close the socket, remove the I/O
57      * handlers, and remove references of the client from different
58      * places where active clients may be referenced. */
59     unlinkClient(c);
60 
61     /* Master/slave cleanup Case 1:
62      * we lost the connection with a slave. */
63     if (c->flags & CLIENT_SLAVE) {
64         if (c->replstate == SLAVE_STATE_SEND_BULK) {
65             if (c->repldbfd != -1) close(c->repldbfd);
66             if (c->replpreamble) sdsfree(c->replpreamble);
67         }
68         list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
69         ln = listSearchKey(l,c);
70         serverAssert(ln != NULL);
71         listDelNode(l,ln);
72         /* We need to remember the time when we started to have zero
73          * attached slaves, as after some time we'll free the replication
74          * backlog. */
75         if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
76             server.repl_no_slaves_since = server.unixtime;
77         refreshGoodSlavesCount();
78     }
79 
80     /* Master/slave cleanup Case 2:
81      * we lost the connection with the master. */
82     if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
83 
84     /* If this client was scheduled for async freeing we need to remove it
85      * from the queue. */
86     if (c->flags & CLIENT_CLOSE_ASAP) {
87         ln = listSearchKey(server.clients_to_close,c);
88         serverAssert(ln != NULL);
89         listDelNode(server.clients_to_close,ln);
90     }
91 
92     /* Release other dynamically allocated client structure fields,
93      * and finally release the client structure itself. */
94     if (c->name) decrRefCount(c->name);
95     zfree(c->argv);
96     freeClientMultiState(c);
97     sdsfree(c->peerid);
98     zfree(c);
99 }
View Code