1. 程式人生 > >Postgresql - 原始碼 - WAL Sender process

Postgresql - 原始碼 - WAL Sender process

介紹:

WAL sender process 是9.0的新功能。需要從主伺服器傳送XLOG到單個recipient(備機)。注意可以同時存在多個walsender程序。當備用伺服器的walreceiver連線到主伺服器並請求XLOG streaming replication的時候,由postmaster啟動walsender process。

walsender類似於常規後端,連線和walsender process是一對一的關係,但它是一組特殊的複製模式命令,而不是處理SQL查詢。START_REPLICATION 命令開始向客戶端傳送WAL。當流傳輸時,walsender保持從磁碟讀取XLOG記錄,並通過COPY 協議將他們傳送到備用伺服器,知道兩端通過退出COPY模式結束複製或直接關閉連線。

SIGTERM是正常終止,它只是walsender在下一個適當的時候關閉連線並正常退出。SIGQUIT是緊急終止,就像其他後端一樣,walsender將簡單的終止並退出。連線關閉和FATAL error不會被看作崩潰,而是近似正常的終止。walsender將快速退出而不再發送XLOG記錄。

如果伺服器關閉,檢查指標(checkpointer)在所有常規後端退出之後向我們傳送 PROCSIG_WALSND_INIT_STOPPING。如果後端空閒或在執行SQL,將導致後端關閉。如果正在進行羅支付至,則所有現有的WAL記錄都經過處理,然後關閉。否則會導致walsender切換到停止狀態。在停止狀態下,walsender將拒絕任何複製命令。一旦所有walsenders被確認停止,檢查指標開始關閉檢查點。當關閉檢查點結束時,postmaster給我們傳送SIGUSR2。指示walsender傳送任何未完成的WAL,包括關閉檢查點記錄,等待它被複制到備機,然後退出。

 

函式呼叫關係:

PostgresMain ( src/backend/tcop/postgres.c )

--> exec_replication_command ( src/backend/replication/walsender.c )

--> StartReplication 或 StartLogicalReplication ( src/backend/replication/walsender.c )

 

 

static void

StartReplication(StartReplicationCmd *cmd)

{

    ......

    /* 我們假設我們在WAR中記錄了足夠的日誌傳輸資訊,因為這是在PostmasterMain()中檢查的。*/

 

    if (cmd->slotname)

    {

        ......

    }

 

    /* 選擇時間線。如果它是由客戶端顯式給出的,那麼使用。否則使用上次儲存在ThisTimeLineID中的重放記錄的時間線。*/

    if (am_cascading_walsender)

    {

        /* this also updates ThisTimeLineID */

        FlushPtr = GetStandbyFlushRecPtr();

    }

    else

        FlushPtr = GetFlushRecPtr();

 

    if (cmd->timeline != 0)

    {

        XLogRecPtr  switchpoint;

 

        sendTimeLine = cmd->timeline;

        if (sendTimeLine == ThisTimeLineID)

        {

            sendTimeLineIsHistoric = false;

            sendTimeLineValidUpto = InvalidXLogRecPtr;

        }

        else

        {

            List     *timeLineHistory;

 

            sendTimeLineIsHistoric = true;

 

            /* 檢查客戶端請求的時間線是否存在,請求的起始位置在該時間線上。 */

            timeLineHistory = readTimeLineHistory(ThisTimeLineID);

            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,

                                         &sendTimeLineNextTLI);

            list_free_deep(timeLineHistory);

 

            /* 在歷史中找到請求的時間線。檢查請求的起始點是否在我們歷史上的時間線上。

* 這是故意的。我們只檢查在切換點之前沒有fork 好請求的時間線。我們不檢查我們在要求的起始點之前切換。這是因為客戶機可以合法地請求從包含交換點的WAL段的開頭開始複製,但是在新的時間線上,這樣就不會以部分段結束。如果你要求太老的起點,你會得到一個錯誤,當我們找不到請求的WAL段在pg_wal。 */

            if (!XLogRecPtrIsInvalid(switchpoint) &&

                switchpoint < cmd->startpoint)

            {

                ereport(ERROR,

                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",

                                (uint32) (cmd->startpoint >> 32),

                                (uint32) (cmd->startpoint),

                                cmd->timeline),

                         errdetail("This server's history forked from timeline %u at %X/%X.",

                                 cmd->timeline,

                                 (uint32) (switchpoint >> 32),

                                 (uint32) (switchpoint))));

            }

            sendTimeLineValidUpto = switchpoint;

        }

    }

    else

    {

        sendTimeLine = ThisTimeLineID;

        sendTimeLineValidUpto = InvalidXLogRecPtr;

        sendTimeLineIsHistoric = false;

    }

 

    streamingDoneSending = streamingDoneReceiving = false;

 

    /* 如果沒有內容需要stream,不要進入複製模式 */

    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)

    {

        /* 當我們第一次啟動複製時,備機將跟隨在主伺服器後面。對於一些應用程式,例如同步複製,對於這個初始catchup模式有一個清晰的狀態很重要,因此當我們稍後改變流狀態時可以觸發動作。我們可能會呆在這個狀態很長一段時間,這正是我們想要監視我們是否還在同步的原因。 */

        WalSndSetState(WALSNDSTATE_CATCHUP);

 

        /* 傳送 CopyBothResponse 資訊, 並且開始 streaming */

        pq_beginmessage(&buf, 'W');

        pq_sendbyte(&buf, 0);

        pq_sendint16(&buf, 0);

        pq_endmessage(&buf);

        pq_flush();

 

        /* 不允許請求一個在WAL中的未來的點去stream,WAL還沒有被重新整理到磁碟。 */

        if (FlushPtr < cmd->startpoint)

        {

            ereport(ERROR,

                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",

                            (uint32) (cmd->startpoint >> 32),

                            (uint32) (cmd->startpoint),

                            (uint32) (FlushPtr >> 32),

                            (uint32) (FlushPtr))));

        }

 

        /* 從請求點開始streaming */

        sentPtr = cmd->startpoint;

 

        /* 初始化共享記憶體狀態 */

        SpinLockAcquire(&MyWalSnd->mutex);

        MyWalSnd->sentPtr = sentPtr;

        SpinLockRelease(&MyWalSnd->mutex);

 

        SyncRepInitConfig();

 

        /* walsender的主迴圈 */

        replication_active = true;

 

        WalSndLoop(XLogSendPhysical);

 

        replication_active = false;

        if (got_STOPPING)

            proc_exit(0);

        WalSndSetState(WALSNDSTATE_STARTUP);

 

        Assert(streamingDoneSending && streamingDoneReceiving);

    }

 

    if (cmd->slotname)

        ReplicationSlotRelease();

 

    /* 複製完成了。傳送指示下一個時間線的單行結果集。 */

    if (sendTimeLineIsHistoric)

    {

        char        startpos_str[8 + 1 + 8 + 1];

        DestReceiver *dest;

        TupOutputState *tstate;

        TupleDesc   tupdesc;

        Datum       values[2];

        bool        nulls[2];

 

        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",

                 (uint32) (sendTimeLineValidUpto >> 32),

                 (uint32) sendTimeLineValidUpto);

 

        dest = CreateDestReceiver(DestRemoteSimple);

        MemSet(nulls, false, sizeof(nulls));

 

        /* 需要一個表示兩個列的元組描述符。int8看起來是一個令人驚訝的資料型別,但是理論上int4不夠寬,因為TimeLineID是無符號的。*/

        tupdesc = CreateTemplateTupleDesc(2, false);

        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",

                                 INT8OID, -1, 0);

        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",

                                 TEXTOID, -1, 0);

 

        /* prepare for projection of tuple */

        tstate = begin_tup_output_tupdesc(dest, tupdesc);

 

        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);

        values[1] = CStringGetTextDatum(startpos_str);

 

        /* send it to dest */

        do_tup_output(tstate, values, nulls);

 

        end_tup_output(tstate);

    }

 

    /* 傳送 CommandComplete (完成)訊息 */

    pq_puttextmessage('C', "START_STREAMING");

}

 

******************************************************************************************

下面我們看一下主要的walsndloop程式碼。

這是walsender 的主迴圈。

 

/* walsender process 的主迴圈,將WAL複製到複製訊息上。*/

static void

WalSndLoop(WalSndSendDataCallback send_data)

{

    /* 初始化最後一個答覆時間戳。這樣就可以實現 timeout 處理。 */

    last_reply_timestamp = GetCurrentTimestamp();

    waiting_for_ping_response = false;

 

    /* 迴圈,直到我們到達這個時間線的末端,或者客戶端請求停止streaming。 */

    for (;;)

    {

        TimestampTz now;

 

        /* 當postmaster程序死掉,將緊急處理。避免對所有postmaster的子程序進行手工處理。 */

        if (!PostmasterIsAlive())

            exit(1);

 

        /* 清除任何 */

        ResetLatch(MyLatch);

 

        CHECK_FOR_INTERRUPTS();

 

        /* 處理最近收到的任何請求或訊號 */

        if (ConfigReloadPending)

        {

            ConfigReloadPending = false;

            ProcessConfigFile(PGC_SIGHUP);

            SyncRepInitConfig();

        }

 

        /* 檢查客戶的輸入 */

        ProcessRepliesIfAny();

 

        /* 如果我們從客戶機接收到CopyDone,我們自己傳送CopyDone,並且輸出緩衝區是空的,那麼就該退出streaming。 */

        if (streamingDoneReceiving && streamingDoneSending &&

            !pq_is_send_pending())

            break;

 

        /* 如果在輸出緩衝區中沒有任何掛起的資料,嘗試傳送更多。如果有的話,我們不必再呼叫 send_data 資料,直到我們重新整理它…但我們最好假設我們沒有趕上。 */

        if (!pq_is_send_pending())

            send_data();

        else

            WalSndCaughtUp = false;

 

        /* 嘗試將未決輸出重新整理到客戶端 */

        if (pq_flush_if_writable() != 0)

            WalSndShutdown();

 

        /* 如果現在沒有什麼東西需要傳送 ... */

        if (WalSndCaughtUp && !pq_is_send_pending())

        {

            /* 如果我們處於追趕狀態,移動到streaming。對於使用者來說,這是一個需要了解的重要狀態更改,因為在此之前,如果主伺服器宕機,並且需要向備用伺服器進行故障轉移,則可能會發生資料丟失。狀態更改對於同步複製也很重要,因為在該點開始等待的提交可能等待一段時間。 */

            if (MyWalSnd->state == WALSNDSTATE_CATCHUP)

            {

                ereport(DEBUG1,

                        (errmsg("\"%s\" has now caught up with upstream server",

                                application_name)));

                WalSndSetState(WALSNDSTATE_STREAMING);

            }

 

            /* 當SIGUSR2到達,我們將任何未完成的日誌傳送到關機檢查點記錄(即最新記錄),等待它們複製到待機狀態,然後退出。這可能是一個正常的終止在關機,或推廣,walsender 不確定是哪個。 */

            if (got_SIGUSR2)

                WalSndDone(send_data);

        }

 

        now = GetCurrentTimestamp();

 

        /* 檢查 replication 超時. */

        WalSndCheckTimeOut(now);

 

        /* 如果時間到了,傳送keepalive */

        WalSndKeepaliveIfNecessary(now);

 

        /* 如果不敢上,不會阻塞,除非有未傳送的資料等待,在這種情況下,我們最好阻塞,直到套接字寫就緒為止。這個測試只適用於 send_data 回撥處理了可用資料的子集,但是 pq_flush_if_writable 重新整理了所有資料的情況——我們應該立即嘗試傳送更多資料。 */

        if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())

        {

            long        sleeptime;

            int         wakeEvents;

 

            wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |

                WL_SOCKET_READABLE;

 

            sleeptime = WalSndComputeSleeptime(now);

 

            if (pq_is_send_pending())

                wakeEvents |= WL_SOCKET_WRITEABLE;

 

            /* Sleep直到某事發生或 timeout */

            WaitLatchOrSocket(MyLatch, wakeEvents,

                             MyProcPort->sock, sleeptime,

                             WAIT_EVENT_WAL_SENDER_MAIN);

        }

    }

    return;

}

 

******************************************************************************************

 

/* 將WAL以其正常的物理/儲存形式傳送出去。

* 讀取已經重新整理到磁碟但尚未傳送到客戶端的WAL的MAX_SEND_SIZE位元組,並將其緩衝到libpq輸出緩衝區中。

* 如果沒有剩餘的未傳送WAL,WalSndCaughtUp 設定為true,否則 WalSndCaughtUp 設定為false。*/

static void

XLogSendPhysical(void)

{

    XLogRecPtr  SendRqstPtr;

    XLogRecPtr  startptr;

    XLogRecPtr  endptr;

    Size        nbytes;

 

    /* 如果請求,將WAL sender 切換到stopping狀態. */

    if (got_STOPPING)

        WalSndSetState(WALSNDSTATE_STOPPING);

 

    if (streamingDoneSending)

    {

        WalSndCaughtUp = true;

        return;

    }

 

    /* Figure out how far we can safely send the WAL. */

    if (sendTimeLineIsHistoric)

    {

        /* 將舊的時間線 streaming 到這個伺服器的歷史中,但不是我們當前插入或重放的那個時間線。它可以 streaming 到我們關掉時間線的那一點。 */

        SendRqstPtr = sendTimeLineValidUpto;

    }

    else if (am_cascading_walsender)

    {

        /* 在備機中 streaming 最新的時間線。

         * 嘗試傳送所有已經重放的WAL,這樣我們就知道它是有效的。如果我們通過流複製接收WAL,傳送任何已接收但未重放的WAL也可以。

         * 我們正在恢復的時間線可以改變,或者我們可以被提升。在任何一種情況下,當前的時間線都是歷史性的。我們需要檢測這一點,這樣我們就不會試圖流過我們切換到另一個時間線的那一點。我們在計算 FlushPtr 之後檢查升級或時間線切換,以避免出現競爭條件:如果時間線在我們檢查它仍然是當前之後就變得具有歷史意義,那麼仍然可以把它streaming 到 FlushPtr r上,而FlushPtr是在它變得具有歷史意義之前計算的。 */

        bool        becameHistoric = false;

 

        SendRqstPtr = GetStandbyFlushRecPtr();

 

        if (!RecoveryInProgress())

        {

            /* RecoveryInProgress() 更新 ThisTimeLineID 成為當前時間線 */

            am_cascading_walsender = false;

            becameHistoric = true;

        }

        else

        {

            /* 仍然是級聯備機。但我們是否仍在恢復時間線?ThisTimeLineID通過GetStandbyFlushRecPtr() 呼叫被更新 */

            if (sendTimeLine != ThisTimeLineID)

                becameHistoric = true;

        }

 

        if (becameHistoric)

        {

            /* 我們傳送的時間線已經成為歷史。讀取新的時間線的時間線歷史檔案,以檢視我們從傳送的時間線中準確地分叉的位置。 */

            List     *history;

 

            history = readTimeLineHistory(ThisTimeLineID);

            sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

 

            Assert(sendTimeLine < sendTimeLineNextTLI);

            list_free_deep(history);

 

            sendTimeLineIsHistoric = true;

 

            SendRqstPtr = sendTimeLineValidUpto;

        }

    }

    else

    {

        /* 將當前時間線傳輸到主機上。

         * 嘗試傳送所有已經寫好的資料,並將其同步到磁碟。對於當前實現的 XLogRead() ,我們不能再做什麼了。在任何情況下,傳送不安全的WAL到主伺服器上的磁碟是不安全的:如果主伺服器隨後崩潰並重新啟動,備用伺服器一定沒有應用任何在主伺服器上丟失的WAL。 */

        SendRqstPtr = GetFlushRecPtr();

    }

 

    /* 記錄當前的系統時間作為寫這個WAL位置用於滯後跟蹤的近似時間。

     * 理論上,無論何時重新整理WAL,我們都可以讓XLogFlush() 在 shmem 中記錄一個時間,並且當我們呼叫上面的GetFlushRecPtr() 時(同樣對於級聯備機),我們可以獲得該時間以及LSN,但是與將任何新程式碼放入熱WAL路徑相比,它似乎足夠好抓住這裡的時間。我們應該在XLogFlush()執行WalSndWakeupProcessRequ.()之後達到這個目的,儘管這可能需要一些時間,但是我們讀取WAL重新整理指標,並在這裡非常接近地花費時間,以便如果它仍在移動,我們將得到一個稍後的位置。

     * 因為LagTrackerWriter 在LSN尚未升級時忽略了示例,因此這為這個LSN提供了WAL重新整理時間的廉價近似值

     * 注意,LSN並不一定是包含在本訊息中的資料的LSN;它是WAL的末尾,它可能更進一步。所有滯後跟蹤機器關心的是找出任意的LSN最終何時被報告為寫入、重新整理和應用,以便它可以測量經過的時間。 */

    LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

 

    /* 如果這是一個歷史的時間線,我們已經到達了下一個時間線的轉折點,停止streaming。

     * 注意:我們可能已經發送了WAL > sendTimeLineValidUpto 。啟動過程通常會在啟動之前重放從主伺服器接收的所有WAL,但是如果WAL streaming 終止於WAL頁的邊界,則時間線的有效部分可能終止於WAL記錄的中間。我們可能已經將部分WAL記錄的前半部分發送到級聯備用,因此sentPtr > sendTimeLineValidUpto。沒關係,級聯待機也不能重放部分WAL記錄,所以它仍然可以遵循我們的時間線開關。*/

    if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)

    {

        /* close the current file. */

        if (sendFile >= 0)

            close(sendFile);

        sendFile = -1;

 

        /* Send CopyDone */

        pq_putmessage_noblock('c', NULL, 0);

        streamingDoneSending = true;

 

        WalSndCaughtUp = true;

 

        elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",

             (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,

             (uint32) (sentPtr >> 32), (uint32) sentPtr);

        return;

    }

 

    /* Do we have any work to do? */

    Assert(sentPtr <= SendRqstPtr);

    if (SendRqstPtr <= sentPtr)

    {

        WalSndCaughtUp = true;

        return;

    }

 

    /* 計算一個訊息傳送多少。如果傳送的位元組不超過MAX_SEND_SIZE 位元組,則傳送所有內容。否則傳送MAX_SEND_SIZE 大小位元組,但返回到日誌檔案或頁邊界。

     * Figure out how much to send in one message. If there's no more than

     * MAX_SEND_SIZE bytes to send, send everything. Otherwise send

     * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.

     * 舍入不僅僅是出於效能原因。Walreceiver 依賴於我們從不分割WAL記錄兩個訊息的事實。由於長的WAL記錄在頁面邊界被分割成連續記錄,所以頁面邊界始終是安全的截止點。我們還假設 SendRqstPtr 從來沒有指向WAL記錄的中間。 */

    startptr = sentPtr;

    endptr = startptr;

    endptr += MAX_SEND_SIZE;

 

    /* 如果我們超越了 SendRqstPtr, 回退 */

    if (SendRqstPtr <= endptr)

    {

        endptr = SendRqstPtr;

        if (sendTimeLineIsHistoric)

            WalSndCaughtUp = false;

        else

            WalSndCaughtUp = true;

    }

    else

    {

        /* round down to page boundary. */

        endptr -= (endptr % XLOG_BLCKSZ);

        WalSndCaughtUp = false;

    }

 

    nbytes = endptr - startptr;

    Assert(nbytes <= MAX_SEND_SIZE);

 

    /* 可以讀取和傳送的切片。 */

    resetStringInfo(&output_message);

    pq_sendbyte(&output_message, 'w');

 

    pq_sendint64(&output_message, startptr);    /* dataStart */

    pq_sendint64(&output_message, SendRqstPtr); /* walEnd */

    pq_sendint64(&output_message, 0);   /* sendtime, filled in last */

 

    /* 將日誌直接讀入輸出緩衝區,以避免額外的 memcpy 呼叫。 */

    enlargeStringInfo(&output_message, nbytes);

    XLogRead(&output_message.data[output_message.len], startptr, nbytes);

    output_message.len += nbytes;

    output_message.data[output_message.len] = '\0';

 

    /* 最後填寫傳送時間戳,以使其儘可能晚。 */

    resetStringInfo(&tmpbuf);

    pq_sendint64(&tmpbuf, GetCurrentTimestamp());

    memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],

         tmpbuf.data, sizeof(int64));

 

    pq_putmessage_noblock('d', output_message.data, output_message.len);

 

    sentPtr = endptr;

 

    /* 更新共享記憶體狀態 */

    {

        WalSnd   *walsnd = MyWalSnd;

 

        SpinLockAcquire(&walsnd->mutex);

        walsnd->sentPtr = sentPtr;

        SpinLockRelease(&walsnd->mutex);

    }

 

    /* Report progress of XLOG streaming in PS display */

    if (update_process_title)

    {

        char        activitymsg[50];

 

        snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",

                 (uint32) (sentPtr >> 32), (uint32) sentPtr);

        set_ps_display(activitymsg, false);

    }

 

    return;

}