1. 程式人生 > >redis資料庫之主從複製

redis資料庫之主從複製

redis除了基本功能外,還提供了主從複製功能。一個redis服務可以有多個slave服務,而這個slave服務又可以有slave服務。master服務把屬於自己的slave服務用連結串列管理起來,也就是struct redisServer中的slaves成員,slave服務會通過redisServer中的masterhost和masterport來標識它的master服務的ip和port。

redis有兩種方式來標識從屬於哪個master服務:

1、在 配置檔案中配置slaveof masterhost masterport 

2、傳送slaveof命令。

同樣redis也提供了兩種方式來同步主從的資料庫的。

1、通過定時器來完成同步

2、master服務每次執行的命令都會根據情況傳送一份給slave服務。

首先來講述下定時器完成同步的實現:

redis服務的定時功能都是通過serverCron完成,而在serverCron中會呼叫replicationCron,這個函式就完成了不同的功能。

void replicationCron(void) 
{
.......
 if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
.......
}
replicationCron檢查一些超時情況做一些超時的處理,然後會呼叫connectWithMaster去連線master服務。
int connectWithMaster(void) {
     fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
......
      if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
......
}
connectWithMaster首先會向master服務發起連線,然後建立一個讀寫事件並把設定server.repl_state = REDIS_REPL_CONNECTING;
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
......
    if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
        redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
            strerror(errno));
        goto error;
    }
......
 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
......
server.repl_state = REDIS_REPL_TRANSFER;
......
}
syncWithMaster會發送SYNC命令給master服務,然後設定可讀事件的handler,並把slave的狀態設定為傳輸狀態。下面來看下master服務接收到sync命令的處理:
void syncCommand(redisClient *c) {
......
// 檢查是否已經有 BGSAVE 在執行,否則就建立一個新的 BGSAVE 任務
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        // 已有 BGSAVE 在執行,檢查它能否用於當前客戶端的 SYNC 操作
        redisClient *slave;
        listNode *ln;
        listIter li;

        // 檢查是否有其他客戶端在等待 SYNC 進行
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            // 找到一個同樣在等到 SYNC 的客戶端
            // 設定當前客戶端的狀態,並複製 buffer 。
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            // 沒有客戶端在等待 SYNC ,當前客戶端只能等待下次 BGSAVE 進行
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // 沒有 BGSAVE 在進行,自己啟動一個。
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // 等待 BGSAVE 結束
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
......
}
這裡並不是真正處理同步的,而是把slave插入到master中slaves連結串列中等待真正同步的操作。那什麼時候才是真正同步的操作呢?請看updateSlavesWaitingBgsave
void updateSlavesWaitingBgsave(int bgsaveerr) {
......
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            // 告訴那些這次不能同步的客戶端,可以等待下次 BGSAVE 了。
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // 這些是本次可以同步的客戶端

            struct redis_stat buf;

            // 如果 BGSAVE 失敗,釋放 slave 節點
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // 開啟 .rdb 檔案
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                // 如果開啟失敗,釋放並清除
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            // 偏移量
            slave->repldboff = 0;
            // 資料庫大小(.rdb 檔案的大小)
            slave->repldbsize = buf.st_size;
            // 狀態
            slave->replstate = REDIS_REPL_SEND_BULK;
            // 清除 slave->fd 的寫事件
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // 建立一個將 .rdb 檔案內容傳送到附屬節點的寫事件
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
......
}
這個函式會每個slave建立一個可寫的事件,並從rdb檔案中把資料讀出來,通過sendBulkToSlave傳送給slave。master傳送完後,slave接受資料並進行處理,上面已經看到slave給讀事件設定了handler(readSyncBulkPayload)

以上就是定時器實現主從同步,第二種實現主從同步的情況比較簡單。

每次master接收到客戶端指令都會呼叫call這個函式:

void call(redisClient *c, int flags) {
......
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
            flags |= REDIS_PROPAGATE_REPL;

        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
......
}
propagate就是實現第二種主從同步。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
從函式程式碼中可以看出,reiplicationFeedSlaves就是真正實現主從同步第二種方式的地方,這個函式也比較簡單,這裡就列舉出來啦。

這裡要提出一個問題:master服務的命令會同步給slave,但是如果slave服務發生變化,master並不會得到同步,這種情況怎麼辦?還是slave只允許讀操作,而不進行寫操作,但是slave服務也可能是別的redis服務的master服務,這樣就感覺不合理了。為什麼要讓slave服務又稱為master服務呢?