程式人生 > PostgreSQL - 原始碼 - walreceiver process

PostgreSQL - 原始碼 - walreceiver process

walreceiver 程序啟動時會執行下面這個函式,它是 walreceiver 程序的主入口。

WalReceiverMain()

 

程式碼位置:

src/backend/replication/walreceiver.c

 

下面我們看一下主入口函式

 

/*
 * Main entry point for walreceiver process.
 *
 * NOTE: this is an excerpt; lines shown as "......" stand for code that was
 * elided (local declarations and some switch-case bodies).
 *
 * Visible flow:
 *   1. Under walrcv->mutex, check the shared walRcvState, advertise our PID,
 *      switch the state to WALRCV_STREAMING and copy out the connection
 *      string, slot name and start position/timeline.
 *   2. Register WalRcvDie as an exit callback, install signal handlers, load
 *      "libpqwalreceiver" and connect to the primary.
 *   3. Loop forever: identify the primary, fetch timeline history files,
 *      start streaming, receive and process messages until end of WAL, end
 *      streaming, close the current segment file and wait for a new start
 *      position.
 *
 * The outer loop has no exit, so the function does not return normally.
 */

void

WalReceiverMain(void)

{

    ......

 

    /* walrcv is expected to have been set up already; verify that. */

    Assert(walrcv != NULL);

 

    now = GetCurrentTimestamp();

 

    /*
     * Mark walreceiver as running in shared memory.
     *
     * Do this as early as possible, so that if we fail later on, the state
     * gets set to STOPPED. If this process dies before getting here, the
     * startup process will keep waiting for us to start up.
     */

    SpinLockAcquire(&walrcv->mutex);

    Assert(walrcv->pid == 0);

    switch (walrcv->walRcvState)

    {

        case WALRCV_STOPPING:

            ......

        case WALRCV_STOPPED:

            ......

        case WALRCV_STARTING:

            ......

        case WALRCV_WAITING:

        case WALRCV_STREAMING:

        case WALRCV_RESTARTING:

        default:

            /* Shouldn't happen */

            SpinLockRelease(&walrcv->mutex);

            elog(PANIC, "walreceiver still running according to shared memory state");

    }

    /* Advertise our PID so that the startup process can kill us */

    walrcv->pid = MyProcPid;

    walrcv->walRcvState = WALRCV_STREAMING;

 

    /* Fetch the information required to start streaming */

    walrcv->ready_to_display = false;

    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);

    strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);

    startpoint = walrcv->receiveStart;

    startpointTLI = walrcv->receiveStartTLI;

 

    /* Initialise the message timestamps to a sane value (the current time) */

    walrcv->lastMsgSendTime =

        walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;

 

    /* Publish the latch that is to be used to wake up this process */

    walrcv->latch = &MyProc->procLatch;

 

    SpinLockRelease(&walrcv->mutex);

 

    /* Arrange for WalRcvDie to run as cleanup when walreceiver exits */

    on_shmem_exit(WalRcvDie, 0);

 

    /* Properly accept or ignore the signals the postmaster might send us */

    pqsignal(SIGHUP, WalRcvSigHupHandler);  /* set flag to read config file */

    pqsignal(SIGINT, SIG_IGN);

    pqsignal(SIGTERM, WalRcvShutdownHandler);   /* request shutdown */

    pqsignal(SIGQUIT, WalRcvQuickDieHandler);   /* hard crash time */

    pqsignal(SIGALRM, SIG_IGN);

    pqsignal(SIGPIPE, SIG_IGN);

    pqsignal(SIGUSR1, WalRcvSigUsr1Handler);

    pqsignal(SIGUSR2, SIG_IGN);

 

    /* Reset to default handling the signals that the postmaster accepts */

    pqsignal(SIGCHLD, SIG_DFL);

    pqsignal(SIGTTIN, SIG_DFL);

    pqsignal(SIGTTOU, SIG_DFL);

    pqsignal(SIGCONT, SIG_DFL);

    pqsignal(SIGWINCH, SIG_DFL);

 

    /* We allow SIGQUIT (quickdie) at all times */

    sigdelset(&BlockSig, SIGQUIT);

 

    /* Load the libpq-specific functions */

    load_file("libpqwalreceiver", false);

    if (WalReceiverFunctions == NULL)

        elog(ERROR, "libpqwalreceiver didn't initialize correctly");

 

    /*
     * Create a resource owner to keep track of our resources (not clear that
     * we need this, but may as well have one).
     */

    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

 

    /* Unblock signals (they were blocked when the postmaster forked us) */

    PG_SETMASK(&UnBlockSig);

 

    /* Establish the connection to the primary for XLOG streaming */

    EnableWalRcvImmediateExit();

    wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);

    if (!wrconn)

        ereport(ERROR,

                (errmsg("could not connect to the primary server: %s", err)));

    DisableWalRcvImmediateExit();

 

    /*
     * Save the user-visible connection string. This overwrites the original
     * conninfo in shared memory, for security. Also save the host and port
     * of the sender server this walreceiver is connected to. The shared
     * fields are zeroed first and only filled in when a value was returned.
     */

    tmp_conninfo = walrcv_get_conninfo(wrconn);

    walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);

    SpinLockAcquire(&walrcv->mutex);

    memset(walrcv->conninfo, 0, MAXCONNINFO);

    if (tmp_conninfo)

        strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);

 

    memset(walrcv->sender_host, 0, NI_MAXHOST);

    if (sender_host)

        strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);

 

    walrcv->sender_port = sender_port;

    walrcv->ready_to_display = true;

    SpinLockRelease(&walrcv->mutex);

 

    /* The temporary copies are no longer needed once published above */

    if (tmp_conninfo)

        pfree(tmp_conninfo);

 

    if (sender_host)

        pfree(sender_host);

 

    /* Only used to choose between the "started" and "restarted" log message */

    first_stream = true;

    for (;;)

    {

        char     *primary_sysid;

        char        standby_sysid[32];

        int         server_version;

        WalRcvStreamOptions options;

 

        /*
         * Check that we are connected to a valid server, using the
         * IDENTIFY_SYSTEM replication command: the system identifier it
         * reports must match our own.
         */

        EnableWalRcvImmediateExit();

        primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,

                                             &server_version);

 

        snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,

                 GetSystemIdentifier());

        if (strcmp(primary_sysid, standby_sysid) != 0)

        {

            ereport(ERROR,

                    (errmsg("database system identifier differs between the primary and standby"),

                     errdetail("The primary's identifier is %s, the standby's identifier is %s.",

                             primary_sysid, standby_sysid)));

        }

        DisableWalRcvImmediateExit();

 

        /*
         * Confirm that the current timeline of the primary is the same as
         * ours or ahead of it.
         */

        if (primaryTLI < startpointTLI)

            ereport(ERROR,

                    (errmsg("highest timeline %u of the primary is behind recovery timeline %u",

                            primaryTLI, startpointTLI)));

 

        /*
         * Fetch any missing timeline history files. We always do this, even
         * when we are not interested in that timeline, so that if we are
         * promoted to master later on, we do not pick a timeline ID that is
         * already in use on the current master. This is not foolproof: if you
         * need unique timeline IDs in every case, some external software is
         * needed to manage the cluster. But let's avoid the confusion of
         * timeline ID collisions where we can.
         */

        WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

 

        /*
         * Start streaming.
         *
         * We try to start at the requested starting point and timeline, even
         * if that differs from the server's latest timeline. If we have
         * already reached the end of the old timeline, the server will finish
         * streaming immediately and we will go back to waiting for the
         * startup process. If recovery_target_timeline is 'latest', the
         * startup process will scan pg_wal, find the new history file, move
         * the recovery target timeline forward, and ask us to restart on the
         * new timeline.
         */

        options.logical = false;

        options.startpoint = startpoint;

        options.slotname = slotname[0] != '\0' ? slotname : NULL;

        options.proto.physical.startpointTLI = startpointTLI;

        ThisTimeLineID = startpointTLI;

        if (walrcv_startstreaming(wrconn, &options))

        {

            if (first_stream)

                ereport(LOG,

                        (errmsg("started streaming WAL from primary at %X/%X on timeline %u",

                                (uint32) (startpoint >> 32), (uint32) startpoint,

                                startpointTLI)));

            else

                ereport(LOG,

                        (errmsg("restarted WAL streaming at %X/%X on timeline %u",

                                (uint32) (startpoint >> 32), (uint32) startpoint,

                                startpointTLI)));

            first_stream = false;

 

            /* Initialize LogstreamResult and the buffers used for processing messages */

            LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);

            initStringInfo(&reply_message);

            initStringInfo(&incoming_message);

 

            /* Initialize the last recv timestamp */

            last_recv_timestamp = GetCurrentTimestamp();

            ping_sent = false;

 

            /* Loop until streaming ends or an error occurs */

            for (;;)

            {

                char     *buf;

                int         len;

                bool        endofwal = false;

                pgsocket    wait_fd = PGINVALID_SOCKET;

                int         rc;

 

                /*
                 * Exit walreceiver if we are not in recovery. This should
                 * not happen, but cross-check the status here.
                 */

                if (!RecoveryInProgress())

                    ereport(FATAL,

                            (errmsg("cannot continue WAL streaming, recovery has already ended")));

 

                /* Process any requests or signals received recently */

                ProcessWalRcvInterrupts();

 

                /* On SIGHUP: re-read the config file, then send hot standby feedback */

                if (got_SIGHUP)

                {

                    got_SIGHUP = false;

                    ProcessConfigFile(PGC_SIGHUP);

                    XLogWalRcvSendHSFeedback(true);

                }

 

                /* See if we can read data immediately */

                len = walrcv_receive(wrconn, &buf, &wait_fd);

                if (len != 0)

                {

                    /*
                     * Process the received data, and any subsequent data we
                     * can read without blocking. In this loop len > 0 means
                     * a message was received, len == 0 means nothing more is
                     * available right now, and len < 0 is treated as end of
                     * WAL on this timeline.
                     */

                    for (;;)

                    {

                        if (len > 0)

                        {

                            /* Something was received from the master, so reset the timeout */

                            last_recv_timestamp = GetCurrentTimestamp();

                            ping_sent = false;

                            /* First byte is passed as the message type, the rest as its payload */
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);

                        }

                        else if (len == 0)

                            break;

                        else if (len < 0)

                        {

                            ereport(LOG,

                                    (errmsg("replication terminated by primary server"),

                                     errdetail("End of WAL reached on timeline %u at %X/%X.",

                                             startpointTLI,

                                             (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));

                            endofwal = true;

                            break;

                        }

                        len = walrcv_receive(wrconn, &buf, &wait_fd);

                    }

 

                    /* Let the master know that we received some data. */

                    XLogWalRcvSendReply(false, false);

 

                    /*
                     * If we have written some records, flush them to disk
                     * and let the startup process and the primary server
                     * know about them.
                     */

                    XLogWalRcvFlush(false);

                }

 

                /* Check if we need to exit the streaming loop. */

                if (endofwal)

                    break;

 

                /*
                 * Ideally we would reuse a WaitEventSet object here, to
                 * avoid the overhead of WaitLatchOrSocket on epoll systems.
                 * But we cannot be sure that libpq still has the same socket:
                 * even if the fd number is the same, it may have been closed
                 * and reopened since last time. In future, if there were a
                 * function for removing sockets from a WaitEventSet, we could
                 * add and remove just the socket each time, potentially
                 * avoiding some system calls.
                 */

                Assert(wait_fd != PGINVALID_SOCKET);

                rc = WaitLatchOrSocket(walrcv->latch,

                                     WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |

                                     WL_TIMEOUT | WL_LATCH_SET,

                                     wait_fd,

                                     NAPTIME_PER_CYCLE,

                                     WAIT_EVENT_WAL_RECEIVER_MAIN);

                if (rc & WL_LATCH_SET)

                {

                    ResetLatch(walrcv->latch);

                    if (walrcv->force_reply)

                    {

                        /*
                         * The recovery process has asked us to send apply
                         * feedback now. Make sure the flag is set to false in
                         * shared memory before sending the reply, so that we
                         * do not miss a new request for a reply.
                         */

                        walrcv->force_reply = false;

                        pg_memory_barrier();

                        XLogWalRcvSendReply(true, false);

                    }

                }

                if (rc & WL_POSTMASTER_DEATH)

                {

                    /*
                     * Emergency bailout if the postmaster has died. This
                     * avoids the need for manual cleanup of all postmaster
                     * children.
                     */

                    exit(1);

                }

                if (rc & WL_TIMEOUT)

                {

                    /*
                     * We did not receive anything new. If we have not heard
                     * anything from the server for more than
                     * wal_receiver_timeout / 2, ping the server. Also, if it
                     * has been longer than wal_receiver_status_interval since
                     * the last update we sent, send a status update to the
                     * master anyway, to report any progress in applying WAL.
                     */

                    bool        requestReply = false;

 

                    /*
                     * Check if the time since the last receive from the
                     * primary has reached the configured limit
                     * (wal_receiver_timeout; a value <= 0 skips the check).
                     */

                    if (wal_receiver_timeout > 0)

                    {

                        TimestampTz now = GetCurrentTimestamp();

                        TimestampTz timeout;

 

                        timeout =

                            TimestampTzPlusMilliseconds(last_recv_timestamp,

                                                        wal_receiver_timeout);

 

                        if (now >= timeout)

                            ereport(ERROR,

                                    (errmsg("terminating walreceiver due to timeout")));

 

                        /*
                         * We have not received anything new for half of
                         * wal_receiver_timeout: ping the server. ping_sent
                         * ensures the request is made only once until new
                         * data arrives.
                         */

                        if (!ping_sent)

                        {

                            timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,

                                                                 (wal_receiver_timeout / 2));

                            if (now >= timeout)

                            {

                                requestReply = true;

                                ping_sent = true;

                            }

                        }

                    }

 

                    XLogWalRcvSendReply(requestReply, requestReply);

                    XLogWalRcvSendHSFeedback(false);

                }

            }

 

            /*
             * The backend finished streaming. Exit streaming COPY-mode from
             * our side.
             */

            EnableWalRcvImmediateExit();

            walrcv_endstreaming(wrconn, &primaryTLI);

            DisableWalRcvImmediateExit();

 

            /*
             * If the server switched to a new timeline that we did not know
             * about when we began streaming, fetch its timeline history file
             * now.
             */

            WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

        }

        else

            ereport(LOG,

                    (errmsg("primary server contains no more WAL on requested timeline %u",

                            startpointTLI)));

 

        /*
         * End of WAL reached on the requested timeline. Close the last
         * segment, and wait for new orders from the startup process.
         */

        if (recvFile >= 0)

        {

            char        xlogfname[MAXFNAMELEN];

 

            XLogWalRcvFlush(false);

            if (close(recvFile) != 0)

                ereport(PANIC,

                        (errcode_for_file_access(),

                         errmsg("could not close log segment %s: %m",

                                XLogFileNameP(recvFileTLI, recvSegNo))));

 

            /*
             * Unless archive mode is ARCHIVE_MODE_ALWAYS, forcibly create
             * the .done file so that the streamed segment is not archived
             * later; in ARCHIVE_MODE_ALWAYS, XLogArchiveNotify is called
             * for the segment instead.
             */

            XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);

            if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)

                XLogArchiveForceDone(xlogfname);

            else

                XLogArchiveNotify(xlogfname);

        }

        /* Mark that no segment file is open any more */
        recvFile = -1;

 

        elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");

        /* Updates startpoint and startpointTLI for the next loop iteration */
        WalRcvWaitForStartPosition(&startpoint, &startpointTLI);

    }

    /* not reached */

}