redis的訊息釋出(publish)/ 訂閱(subscribe)
最近專案需要一個可以實現釋出/訂閱 機制的訊息佇列, 首先想到的是Kafka, RabbitMQ之類的訊息佇列元件, 但是感覺我們的專案也許不需要引入那麼複雜的元件, Redis也有一個比較輕量的訂閱機制。 我們可以參考redis的 Publish/Subscribe 機制, 來得到比較好的問題解決方案。
publish/subscribe的用法
redis提供瞭如下6個命令來支援該功能:
序號 | 命令 | 描述 |
---|---|---|
1 | PSUBSCRIBE pattern [pattern …] | 訂閱一個或多個符合給定模式的頻道 |
2 | PUBSUB subcommand [argument [argument …]] | 檢視訂閱與釋出系統狀態 |
3 | PUBLISH channel message | 將訊息傳送到指定的頻道 |
4 | PUNSUBSCRIBE [pattern [pattern …]] | 退訂所有給定模式的頻道 |
5 | SUBSCRIBE channel [channel …] | 訂閱給定的一個或多個頻道的資訊 |
6 | UNSUBSCRIBE [channel [channel …]] | 指退訂給定的頻道 |
- 客戶端可以一次性訂閱一個或者多個channel,SUBSCRIBE channel1 channel2 channel3;
- PUBSUB返回當前publish/subscribe 系統的內部命令的活動狀態, 內部命令包括:channels(列出當前活躍的channel),NUMSUB(返回指定channel的訂閱數目),NUMPAT(返回訂閱pattern的訂閱數);
- 訂閱多個channel,萬用字元 * 可以匹配上面所有的channel, PSUBSCRIBE chan* ;
- 訊息釋出,PUBLISH channel2 hello-test;
- 取消某一個channel訊息訂閱, UNSUBSCRIBE channel1;
- 取消某個pattern的訊息訂閱, PUNSUBSCRIBE chan* ;
publish/subscribe 的實現程式碼分析
基本上所有的程式碼都在pubsub.c裡面, 都是通郭一個字典和一個連結串列來實現的, 字典裡面包含了從一個channel名字, 關聯到channel對應的訂閱clients; 對於pattern模式的訂閱, 使用了連結串列來儲存所有的pattern, 以及pattern對應的訂閱者。
struct redisClient {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
}
struct redisServer {
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
}
subscribe 實現
現將channel新增都redisClient.pubsub_channels字典內, 然後去RedisServer.pubsub_channels字典內驅查詢, 如果沒有該channel, 就新增一個到字典裡面, 如果已經存在, 返回當前的值;最後返回客戶端;
void subscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
}
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
unsubscribe
將channel在RedisClient.pubsub_channels字典內的KV對刪除, 用channel去redisServer.pubsub_channels 字典內查詢該channel對應的clients的連結串列ln, 將指定的clients刪除掉, 如果ln內的所有元素都被刪除了, 就刪除該字典內的KV對。
void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
}
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
struct dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) {
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
psubscribe 與punsubscribe
這兩個與subscribe, unsubscribe的實現基本相同, 不同地方主要是其對應的KV對存在pubsub_patterns 連結串列裡面;
publish
publish的實現是遍歷server端的pubsub_channels 字典以及pubsub_patterns連結串列, 將message傳送給他們對應的client;
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
pubsub
pubsub主要是檢視訂閱-釋出系統的內部活動的狀態, 相當於一個該系統的一個統計命令, 透過它, 使用者可以檢查當前的系統的釋出訂閱的狀況。
void pubsubCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"channels") &&
(c->argc == 2 || c->argc ==3))
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
dictIterator *di = dictGetIterator(server.pubsub_channels);
dictEntry *de;
long mblen = 0;
void *replylen;
replylen = addDeferredMultiBulkLength(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
if (!pat || stringmatchlen(pat, sdslen(pat),
channel, sdslen(channel),0))
{
addReplyBulk(c,cobj);
mblen++;
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c,replylen,mblen);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
addReplyMultiBulkLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c,l ? listLength(l) : 0);
}
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
} else {
addReplyErrorFormat(c,
"Unknown PUBSUB subcommand or wrong number of arguments for '%s'",
(char*)c->argv[1]->ptr);
}
}
總結
redis的Publish/Subscribe機制, 能夠比較方便實現將訊息從一個客戶端傳遞到一個或者幾個客戶端的基本功能, 但是從上面的程式碼也能看出來, 它存在著一些問題:
- 無法保證釋出的訊息一定能被訂閱者收到;
- 重啟之後, 在重啟過程中的訊息丟失;