1. 程式人生 > >Redis原始碼剖析(五)訂閱與釋出

Redis原始碼剖析(五)訂閱與釋出

Redis提供了訂閱和釋出的功能,允許客戶端訂閱一個或多個頻道,當其他客戶端向某個頻道傳送訊息時,伺服器會將訊息轉發給所有訂閱該頻道的客戶端

這一點有點像群聊的功能,一個客戶端將訊息發往群中(向某個頻道傳送訊息),所有在群中的客戶端(訂閱該頻道的客戶端)都會收到這個訊息。事實也正是如此,接下來將會看到,伺服器採用字典儲存每個頻道(鍵)和訂閱該頻道的所有客戶端(值),每當其他客戶端向某個頻道傳送訊息時,伺服器便從字典中獲取所有訂閱該頻道的客戶端,依次將訊息傳送。每個頻道,可以看成是每個群,一個頻道的所有訂閱客戶端,可以看成該群的所有群成員,唯一不同的是,向頻道傳送訊息的那個客戶端並不需要訂閱同樣的頻道,也就是該客戶端並不需要也在群中

稍後會看到,除了訂閱特定頻道,Redis也允許客戶端進行模式訂閱,即一次訂閱所有匹配的頻道

Redis的訂閱與釋出功能由PUBLISH, SUBSCRIBE, PSUBSCRIBE等命令組成

普通訂閱

使用SUBSCRIBE [頻道名]即可訂閱特定頻道,頻道名可以自定義,也可以同時訂閱多個頻道,只需要後面新增多個頻道名即可

127.0.0.1:6379> SUBSCRIBE "news.redis"  //訂閱"news.redis"頻道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"  //命令關鍵字
2) "news.redis"
//頻道名 3) (integer) 1 //訂閱該頻道的客戶端數量

使用PUBLISH [頻道名] [訊息]即可向特定頻道傳送訊息

127.0.0.1:6379> PUBLISH "news.redis" "send a message"   //向"news.redis"頻道傳送訊息
(integer) 1 //返回傳送給了多少個客戶端
127.0.0.1:6379> 

此時,如果再檢視訂閱news.redis頻道的那個客戶端,會發現終端上打印出”send a message”資訊

//PUBLISH之前
127.0.0.1:6379> SUBSCRIBE "news.redis"
Reading messages... (press Ctrl-C to quit)
1
) "subscribe" 2) "news.redis" 3) (integer) 1 //PUBLISH之後 1) "message" //訊息型別 2) "news.redis" //頻道名 3) "send a message" //資訊

不過如果一個客戶端處於訂閱狀態,它好像就不能執行其他操作了

儲存結構

實現一個訂閱與釋出功能十分簡單,開篇也提到了,只需要將每個頻道以及它的訂閱者記錄在字典中,如果客戶端向某個頻道傳送訊息,則在字典中查詢該頻道的所有訂閱者,依次將訊息傳送過去即可。

在深入原始碼之前,先看兩個結構的定義,一個是客戶端,一個是伺服器,它們都定義在server.h標頭檔案中

//server.h
typedef struct client {
    ...
    dict *pubsub_channels; 
    ...
} client;
//server.h
struct redisServer {
    ...
    dict *pubsub_channels;  
    ...
};

這兩個結構都太長了,不過目前用得到的其實就一個pubsub_channels變數,根據型別得知該變數是一個字典(以下簡稱為訂閱字典),兩個變數的作用分別是

  • 客戶端的訂閱字典記錄著當前客戶端訂閱的所有頻道,鍵是頻道名,值為空
  • 伺服器的訂閱字典記錄著所有頻道以及每個頻道的訂閱者,鍵是頻道名,值是客戶端連結串列

到這裡其實可以簡單猜測訂閱功能是如何實現的,當某個客戶端使用SUBSCRIBE命令訂閱一個或多個頻道時,Redis會將<頻道名,客戶端>這個鍵值對新增到伺服器的訂閱字典中,同時也會將頻道名新增到客戶端自己的訂閱字典中

而當客戶端使用PUBLISH命令向某個頻道傳送訊息時,Redis會在訂閱字典中獲取該頻道的所有訂閱者(客戶端),依次將訊息傳送給客戶端。如果該頻道不存在或沒有訂閱者,則不執行任何操作

訂閱功能

訂閱功能由subscribeCommand函式完成,函式主要任務是遍歷每一引數(頻道名),呼叫pubsubSubscribeChannel函式將頻道名和客戶端新增到訂閱字典中

//pubsub.c
/* 訂閱命令 */
void subscribeCommand(client *c) {
    int j;

    /* 將客戶端和它訂閱的頻道進行關聯,新增到訂閱字典中
     * 鍵是頻道名,值是客戶端 
     */
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    /* 標記當前客戶端訂閱過某些頻道 */
    c->flags |= CLIENT_PUBSUB;
}

pubsubSubscribeChannel函式完成實際的新增操作

//pubsub.c
/* 
 * 將客戶端和它訂閱的頻道進行關聯,新增到客戶端和伺服器兩個訂閱字典中 
 * 
 * 注:伺服器和客戶端都有訂閱字典,分別是
 * c->pubsub_channels
 * server.pubsub_channels
 */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* 判斷當前客戶端是否已經訂閱了該頻道,如果是則不進行處理,否則新增到客戶端的訂閱字典中 */
    /* 注意這裡新增的是客戶端的訂閱字典,該字典記錄當前客戶端訂閱的所有頻道 */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        /* 所有的robj物件都是基於引用計數的,因為已將其新增到字典中,所有引用計數加一 */
        incrRefCount(channel);
        /* 從伺服器的訂閱字典中尋找該頻道對應的鍵節點連結串列(記錄所有訂閱該頻道的客戶端連結串列) */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            /* 伺服器訂閱字典中沒有關於該頻道的記錄,建立該頻道對應的客戶端連結串列 */
            clients = listCreate();
            /* 將<頻道,客戶端連結串列>新增到伺服器的訂閱字典中 */
            dictAdd(server.pubsub_channels,channel,clients);
            /* 頻道的引用計數加一 ,因為在字典中也有一份*/
            incrRefCount(channel);
        } else {
            /* 伺服器訂閱字典中有關於該頻道的記錄,直接將客戶端連結串列返回 */ 
            clients = dictGetVal(de);
        }
        /* 將當前客戶端連線到連結串列上 */
        listAddNodeTail(clients,c);
    }
    /* 通知客戶端訂閱成功 */ 
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

至此訂閱操作完成,可以發現訂閱僅僅是將頻道名和客戶端這個鍵值對新增到訂閱字典中,並不執行其他操作。

退訂功能

有訂閱就有退訂,退訂命令是UNSUBSCRIBE,有unsubscribeCommand函式執行。不過既然訂閱功能是阻塞的,怎麼執行退訂啊…

退訂分兩種,一種是退訂當前客戶端訂閱的所有頻道,此時退訂命令不帶引數。另一種則帶引數,僅退訂引數指出的頻道

//pubsub.c
/* 退訂命令 */
void unsubscribeCommand(client *c) {
    if (c->argc == 1) {
        /* 退訂當前客戶端訂閱的所有頻道 */
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        /* 退訂引數指出的頻道 */
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    /* 客戶端訂閱的頻道數為0時,改變標誌 */
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

退訂所有頻道是遍歷當前客戶端的訂閱字典,對訂閱的每個頻道呼叫pubsubUnsubscribeChannel函式,實際上和指定引數效果相同,所以就直接看退訂引數指定頻道的函式好了

//pubsub.c
/* 
 * 退訂
 * c : 客戶端
 * channel : 要退訂的頻道
 * notify : 退訂後是否通知客戶端
 */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    incrRefCount(channel); /* channel may be just a pointer to the same object
                            we have in the hash tables. Protect it... */
    /* 從客戶端訂閱字典中刪除關於該頻道的訂閱資訊 */
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        /* 刪除成功,表示這個客戶端訂閱過channel */
        retval = 1;

        /* 從伺服器訂閱字典中查詢關於該頻道的所有訂閱資訊,返回鍵節點 */
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        /* 從鍵節點中獲取客戶端連結串列 */
        clients = dictGetVal(de);
        /* 從客戶端連結串列中搜索當前退訂的客戶端 */
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        /* 將連結串列節點ln從連結串列clients中刪除 */
        listDelNode(clients,ln);
        /* 如果該頻道只有該客戶端訂閱過,那麼刪除後客戶端連結串列為空,從伺服器訂閱字典中刪除該頻道的資訊 */
        if (listLength(clients) == 0) {
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* 如果要求通知,則通知客戶端 */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

退訂函式雖然長了點,但是還是蠻好理解的,僅僅是將客戶端和頻道的關聯資訊從訂閱字典中刪除

普通訂閱的資訊釋出

Redis的釋出功能由PUBLISH命令實現,底層由pubsubPublishMessage函式實現,該函式向訂閱特定頻道的所有客戶端傳送訊息。訂閱分兩種,一個是普通訂閱(如上),另一個是模式訂閱,所以函式中也分為向普通訂閱的客戶端傳送訊息和向模式訂閱的客戶端傳送訊息。因為還沒有接觸模式訂閱,所以先看普通訂閱的釋出好了

普通訂閱的傳送訊息僅僅是在伺服器的訂閱字典中尋找特定頻道的所有訂閱者,依次將訊息傳送就完成了,比較簡單

//pubsub.c
/* 傳送通知資訊 */
/* 
 * channel : 通知資訊
 * message : 事件名稱
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* 伺服器的訂閱字典儲存著所有頻道和它的所有訂閱者 */
    /* 從該字典中查詢頻道channel的訂閱者,返回鍵節點 */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        /* 從鍵節點中獲取訂閱該頻道的客戶端連結串列 */
        list *list = dictGetVal(de); 
        listNode *ln;
        listIter li;

        /* 將迭代器方向設定為從頭到尾 */
        listRewind(list,&li);
        /* 遍歷客戶端連結串列的所有客戶端,傳送通知資訊 */
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    ...
    return receivers;
}

模式訂閱

Redis允許客戶端使用正則表示式訂閱一組頻道,命令格式為PSUBSCRIBE [頻道名]

127.0.0.1:6379> PSUBSCRIBE "news.redi[xy]"  //訂閱"news.redix"和"news.rediy"兩個頻道
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" //命令關鍵字
2) "news.redi[xy]"  //頻道名
3) (integer) 1  //訂閱該頻道的客戶端數量

此時,如果開啟另一個客戶端,不管是向news.redix頻道傳送還是向news.rediy頻道傳送訊息,上面這個客戶端都會接收到訊息

127.0.0.1:6379> PUBLISH "news.redix" "send to news.redix"   //向"news.redix"頻道傳送訊息
(integer) 1
127.0.0.1:6379> PUBLISH "news.rediy" "send to news.rediy"   //向"news.rediy"頻道傳送訊息
(integer) 1
127.0.0.1:6379> 
127.0.0.1:6379> PSUBSCRIBE "news.redi[xy]"  //訂閱"news.redix"和"news.rediy"兩個頻道
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.redi[xy]"
3) (integer) 1
1) "pmessage"       //從頻道news.redix接收到訊息
2) "news.redi[xy]"  
3) "news.redix"     //頻道名
4) "send to news.redix" //訊息內容
1) "pmessage"       //從頻道news.rediy接收到訊息
2) "news.redi[xy]"
3) "news.rediy"     //頻道名
4) "send to news.rediy" //訊息內容

以下將用正則表示式表示的頻道稱為頻道組,如”news.redi[xy]”就是一個頻道組

儲存結構

由於模式訂閱的頻道名代表一組頻道,所以不能用字典儲存,因為字典的鍵是已知的,當然可以將用正則表示式代表的頻道的所有可能都計算處然後新增到字典中,不過Redis不會這麼做,論誰誰都不會,因為結果集太大了。

所以字典在這裡算是沒有用武之地了,Redis採用連結串列將每個客戶端和它訂閱的頻道組記錄起來,每當向特定頻道釋出訊息時,Redis就會遍歷這個連結串列判斷每個客戶端的頻道組是否可以和當前頻道匹配,如果匹配則向該客戶端傳送訊息。當然,每個客戶端也有這麼個連結串列記錄自己訂閱的頻道組,在它們的定義中可以清楚的看到

//server.h
typedef struct client {
    dict *pubsub_channels;  //訂閱字典
    list *pubsub_patterns;  //模式訂閱連結串列
} client;
//server.h
struct redisServer {
    dict *pubsub_channels;  //訂閱字典
    list *pubsub_patterns;  //模式訂閱連結串列
};

與訂閱字典相同,模式訂閱連結串列在客戶端和伺服器的作用也不相同

  • 客戶端的模式訂閱連結串列儲存當前客戶端訂閱的所有頻道組
  • 伺服器的模式訂閱連結串列儲存所有客戶端訂閱的所有頻道組(連結串列中可能有多個節點指向的客戶端相同,但是頻道組不同)

客戶端連結串列的節點儲存的是頻道組

伺服器連結串列的節點儲存的結構是pubsubPattern型別,該結構記錄著客戶端和一個頻道組

//server.h
typedef struct pubsubPattern {
    client *client; //客戶端
    robj *pattern;  //頻道組
} pubsubPattern;

有了訂閱模組的基礎,到這裡可以猜測模式訂閱也僅僅是將客戶端和其模式訂閱的頻道組組成pubsubPattern新增到伺服器的模式訂閱連結串列中,將頻道組新增到客戶端的模式訂閱連結串列中,並不做其他處理

模式訂閱功能

事實也正是如此,模式訂閱功能由pubsubSubscribePattern函式實現

//pubsub.c
/* 模式訂閱 */
int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;

    /* 從客戶端的模式訂閱連結串列中查詢要訂閱的模式,如果不存在,才進行新增 */
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        /* pubsubPattern結構記錄著客戶端c和頻道組pattern */
        pubsubPattern *pat;
        /* 將頻道組新增到客戶端模式訂閱連結串列尾部 */
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        /* 申請空間,組裝pubsubPattern結構 */
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        /* 將訂閱節點新增到伺服器的訂閱連結串列中 */
        listAddNodeTail(server.pubsub_patterns,pat);
        /* 因為客戶端的訂閱連結串列只需要記錄自己訂閱頻道組即可,所以無需儲存pubsubPattern結構
         * 而伺服器需要記錄每個客戶端和其頻道組,二者都需要記錄,所以儲存pubsubPattern結構 */
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

模式退訂功能

退訂和訂閱是相反的,對於模式訂閱的退訂也是如此,僅僅是將頻道組從模式訂閱連結串列中刪除,需要注意的是要退訂就退訂整個頻道組,Redis不支援將特定頻道從頻道組中去除

//pubsub.c
/* 退訂模式 */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
    listNode *ln;
    pubsubPattern pat;
    int retval = 0;

    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    /* 從客戶端自己的模式訂閱連結串列中查詢相應模式 */
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
        retval = 1;
        /* 如果找到,則刪除連結串列節點 */
        listDelNode(c->pubsub_patterns,ln);
        pat.client = c;
        pat.pattern = pattern;
        /* 從伺服器的模式訂閱連結串列中查詢,然後刪除 */
        ln = listSearchKey(server.pubsub_patterns,&pat);
        listDelNode(server.pubsub_patterns,ln);
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReplyBulk(c,pattern);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    decrRefCount(pattern);
    return retval;
}

模式訂閱的資訊釋出

最後一個就是關於向模式訂閱釋出訊息的實現了,在上面訂閱模組處,僅僅看到了pubsubPublishMessage函式向訂閱特定頻道的客戶端傳送訊息。而實際上,它還有一部分是向模式訂閱的客戶端傳送訊息,方法是遍歷模式訂閱連結串列,對於每一個節點判斷其頻道組是否和當前頻道匹配,如果匹配,則向客戶端傳送訊息

//pubsub.c
/* 傳送通知資訊 */
/* 
 * channel : 通知資訊
 * message : 事件名稱
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    ... //這裡省略普通訂閱的釋出功能

    /* 查詢模式訂閱的客戶端 */
    /* 這裡就體現了為什麼訂閱頻道和客戶端是用字典儲存,而模式訂閱則用連結串列儲存 
     * 因為訂閱可以直接使用雜湊表定位,而模式訂閱類似正則匹配,需要判斷當前的頻道是否
     * 是匹配的模式訂閱,然後傳送給訂閱者,雜湊表在這裡是沒有作用的 */
    if (listLength(server.pubsub_patterns)) {
        /* 迭代器方向設定為從頭到尾 */
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        /* 遍歷伺服器的模式訂閱連結串列 */
        while ((ln = listNext(&li)) != NULL) {
            /* 獲取每個節點的頻道組 */
            pubsubPattern *pat = ln->value;

            /* 判斷頻道組是否和當前頻道匹配,如果匹配,則傳送通知資訊 */
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

可以看到,對於模式訂閱,Redis會使用stringmatchlen函式進行正則匹配,如果匹配成功,說明該客戶端關注的頻道組中包含當前頻道,那麼就需要將訊息傳送給客戶端

小結

本篇注意是對Redis訂閱與釋出功能的分析,原始碼比較簡單,對於訂閱功能,僅僅是將客戶端和頻道名(組)記錄在某個資料結構中,當有其他客戶端向某個頻道執行釋出功能時,檢查資料結構中那些訂閱了該頻道的客戶端,並向其傳送訊息