1. 程式人生 > >Postgresql - 原始碼 - checkpointer process

Postgresql - 原始碼 - checkpointer process

程式碼位置:

src/backend/postmaster/checkpointer.c

 

程式碼中的註釋:

檢查指標是Postgres 9.2 的新功能。它處理所有checkpoint。checkpoint在從上一個checkpoint經過一定時間之後自動排程,並且還可以用訊號通知它執行所請求的checkpoint。(每隔這麼多WAL段指定一個checkpoint的GUC引數通過在它們填充WAL段時具有後端訊號來實現;checkpoint本身不監視環境。)

一旦啟動子程序結束,postmaster 就啟動檢查指標,或者如果我們正在進行歸檔恢復,或是隻要恢復開始,檢查指標就啟動。它仍然活著,postmaster 命令它終止。正常終止是 SIGUSR2,它指示檢查指標執行關機checkpoint,然後exit(0) 。(所有的後端必須在 SIGUSR2 釋出之前停止!)SIGQUIT 是緊急終止;與任何後端一樣,檢查指標將簡單地中止並退出SIGQUIT。

如果檢查指標意外地退出,則 postmaster 將此視為後端崩潰:共享記憶體可能損壞,因此剩餘的後端應該被SIGQUIT殺死,然後開始恢復迴圈。(即使共享記憶體沒有損壞,我們也丟失了關於在下一個checkpoint 需要對哪些檔案進行fsync的資訊,因此需要強制重新啟動系統。)

 

用於檢查指標和後端之間通訊的共享記憶體區域

CKPT計數器允許後端監視他們傳送的checkpoint請求的完成。這就是它的運作方式:

  • 在checkpoint開始時,檢查指標讀取(清除)請求標誌並增加 ckpt_started,同時保持 ckpt_lck。
  • 在checkpoint完成後,檢查指標將 ckpt_done 設定為等於 ckpt_started。
  • 在checkpoint失敗時,檢查指標遞增ckpt_failed ,並將 ckpt_done 設定為等於 ckpt_started。

 

後端的演算法是:

1. 在保持ckpt_lck的時候,記錄 ckpt_failed 和ckpt_started 的值,並設定請求標誌。

2. 傳送訊號請求checkpoint。

3. sleep直到 ckpt_started 變化。現在,知道自啟動此演算法以來checkpoint已經開始(儘管*不*它是由您的訊號專門發起的),並且它正在使用標誌。

4. 記錄ckpt_started 的新值。

5. sleep 直到ckpt_done >= 被儲存的 ckpt_started 。(這裡使用模演算法,以防出現計數器。)現在知道checkpoint已經啟動和完成,但不知道它是否成功。

6. 如果 ckpt_failed 與最初儲存的值不同,則假定請求失敗,否則它肯定是成功的。

 

ckpt_flags 儲存自上一個checkpoint開始以來所有請求後端傳送的checkpoint請求標誌的OR。選擇了標誌,以便 OR 是組合多個請求的正確方式。

 

num_backend_writes 用於計算由使用者後端程序執行的緩衝區寫入數。這個計數器應該足夠寬,在一個處理週期內不能溢位。 num_backend_fsync 那些也必須執行它們自己的fsync的寫入的子集進行計數,因為檢查指標無法吸收它們的請求。

 

請求陣列儲存後端傳送的 fsync 請求,而未被檢查指標吸收。

 

與checkpoint欄位不同, num_backend_writes,num_backend_fsync 和請求欄位由 CheckpointerCommLock 保護。

 

下面看一下checkpointer程序的主入口

/* checkpointer程序的主入口,這是從 AuxiliaryProcessMain 呼叫的,它已經建立了基本的執行環境,但還沒有啟用訊號。 */

void

CheckpointerMain(void)

{

    sigjmp_buf  local_sigjmp_buf;

    MemoryContext checkpointer_context;

 

    CheckpointerShmem->checkpointer_pid = MyProcPid;

 

    /* 正確地接受或忽略 postmaster 可能傳送給我們的訊號。注意:我們故意忽略SIGTERM, 因為在標準的Unix系統關閉週期中,init將同時對所有程序進行SIGTERM 。我們希望等待後端退出,因此postmaster 會告訴我們關閉(通過 SIGUSR2 )是可以的。 */

    pqsignal(SIGHUP, ChkptSigHupHandler);   /* set flag to read config file */

    pqsignal(SIGINT, ReqCheckpointHandler); /* request checkpoint */

    pqsignal(SIGTERM, SIG_IGN); /* ignore SIGTERM */

    pqsignal(SIGQUIT, chkpt_quickdie);  /* hard crash time */

    pqsignal(SIGALRM, SIG_IGN);

    pqsignal(SIGPIPE, SIG_IGN);

    pqsignal(SIGUSR1, chkpt_sigusr1_handler);

    pqsignal(SIGUSR2, ReqShutdownHandler);  /* request shutdown */

 

    /* 重置一些由 postmaster 接受但不在這裡接受的訊號 */

    pqsignal(SIGCHLD, SIG_DFL);

    pqsignal(SIGTTIN, SIG_DFL);

    pqsignal(SIGTTOU, SIG_DFL);

    pqsignal(SIGCONT, SIG_DFL);

    pqsignal(SIGWINCH, SIG_DFL);

 

    /* 我們允許在任何時候 SIGQUIT (quickdie) */

    sigdelset(&BlockSig, SIGQUIT);

 

    /* 初始化以使第一次驅動事件發生在正確的時間 */

    last_checkpoint_time = last_xlog_switch_time = (pg_time_t) time(NULL);

 

    /* 建立一個resource owner 來跟蹤我們的資源(目前僅是buffer pins) */

    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Checkpointer");

 

    /* 建立一個記憶體上下文,我們將完成所有的工作。我們這樣做,以便在錯誤恢復過程中可以重置上下文,從而避免可能的記憶體洩漏。以前,這段程式碼只是在TopMemoryContext 執行,但是重新設定這將是一個非常糟糕的想法。 */

    checkpointer_context = AllocSetContextCreate(TopMemoryContext,

                                                 "Checkpointer",

                                                 ALLOCSET_DEFAULT_SIZES);

    MemoryContextSwitchTo(checkpointer_context);

 

    /* 如果遇到異常,則在此恢復處理。 */

    if (sigsetjmp(local_sigjmp_buf, 1) != 0)

    {

        /* 由於不使用 PG_TRY,必須手動重置錯誤堆疊。 */

        error_context_stack = NULL;

 

        /* 清理時防止中斷 */

        HOLD_INTERRUPTS();

 

        /* 向伺服器日誌報告錯誤 */

        EmitErrorReport();

 

        /* 這些操作實際上只是 AbortTransaction() 的最小子集。我們在檢查指標中沒有太多的資源需要考慮,但是我們確實有LWLocks、buffers 和臨時檔案。 */

        LWLockReleaseAll();

        ConditionVariableCancelSleep();

        pgstat_report_wait_end();

        AbortBufferIO();

        UnlockBuffers();

        /* buffer pins 在這裡被釋放 */

        ResourceOwnerRelease(CurrentResourceOwner,

                             RESOURCE_RELEASE_BEFORE_LOCKS,

                             false, true);

        /* 我們不必擔心其他資源所有者的釋出階段 */

        AtEOXact_Buffers(false);

        AtEOXact_SMgr();

        AtEOXact_Files(false);

        AtEOXact_HashTables(false);

 

        /* 警告任何等待checkpoint失敗的後端。 */

        if (ckpt_active)

        {

            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);

            CheckpointerShmem->ckpt_failed++;

            CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;

            SpinLockRelease(&CheckpointerShmem->ckpt_lck);

 

            ckpt_active = false;

        }

 

        /* 現在返回正常的頂級上下文,下次清除錯誤上下文。 */

        MemoryContextSwitchTo(checkpointer_context);

        FlushErrorState();

 

        /* 在頂層上下文中清除任何洩漏的資料 */

        MemoryContextResetAndDeleteChildren(checkpointer_context);

 

        /* 現在我們可以再次中斷 */

        RESUME_INTERRUPTS();

 

        /* 在發生任何錯誤之後,sleep至少1秒。一個寫錯誤很可能會被重複,並且我們不想以儘可能快的速度填充錯誤日誌。 */

        pg_usleep(1000000L);

 

        /* 在發生任何錯誤之後關閉所有開啟的檔案。這在Windows上是有用的,儲存刪除的檔案會導致各種奇怪的錯誤。目前還不清楚我們在別處是否需要,但不應該傷害。 */

        smgrcloseall();

    }

 

    /* We can now handle ereport(ERROR) */

    PG_exception_stack = &local_sigjmp_buf;

 

    /* 解除封鎖訊號(當 postmaster forked 時,他們被封鎖) */

    PG_SETMASK(&UnBlockSig);

 

    /* 確保配置的所有共享記憶體值都正確設定。這樣做確保了其他併發更新程式中沒有競爭條件。 */

    UpdateSharedMemoryConfig();

 

    /* 通知我們, 鎖住喚醒正在休眠的後端。 */

    ProcGlobal->checkpointerLatch = &MyProc->procLatch;

 

    /*

     * Loop forever

     */

    for (;;)

    {

        bool        do_checkpoint = false;

        int         flags = 0;

        pg_time_t   now;

        int         elapsed_secs;

        int         cur_timeout;

        int         rc;

 

        /* 清除任何已掛起的喚醒 */

        ResetLatch(MyLatch);

 

        /* 處理最近收到的任何請求或訊號。 */

        AbsorbFsyncRequests();

 

        if (got_SIGHUP)

        {

            got_SIGHUP = false;

            ProcessConfigFile(PGC_SIGHUP);

 

            /* 檢查指標是最後一個要關閉的程序,因此我們要求它儲存一系列其他任務所需的金鑰,這些任務中的大多數與checkpoint沒有任何關係。由於各種原因,一些配置值可以動態更改,因此它們的主要副本儲存在共享記憶體中,以確保所有後端看到相同的值。我們讓檢查指標負責更新共享記憶體拷貝,如果引數設定因嘆息而改變。 */

            UpdateSharedMemoryConfig();

        }

        if (checkpoint_requested)

        {

            checkpoint_requested = false;

            do_checkpoint = true;

            BgWriterStats.m_requested_checkpoints++;

        }

        if (shutdown_requested)

        {

            /* 從這裡開始,elog(ERROR) 應該以exit(1) 結束,而不是將控制返回到上面的 sigsetjmp 塊。 */

            ExitOnAnyError = true;

            /* 關閉資料庫 */

            ShutdownXLOG(0, 0);

            /* 從檢查指標中正常退出 */

            proc_exit(0);       /* done */

        }

 

        /* 如果最後一個時間過多,則強制checkpoint 。注意,只有在沒有外部請求的情況下才對計時checkpoint進行統計,但是即使存在外部請求,我們也設定 CAUSE_TIME 標誌位。 */

        now = (pg_time_t) time(NULL);

        elapsed_secs = now - last_checkpoint_time;

        if (elapsed_secs >= CheckPointTimeout)

        {

            if (!do_checkpoint)

                BgWriterStats.m_timed_checkpoints++;

            do_checkpoint = true;

            flags |= CHECKPOINT_CAUSE_TIME;

        }

 

        /* 如果需要,做一次checkpoint */

        if (do_checkpoint)

        {

            bool        ckpt_performed = false;

            bool        do_restartpoint;

 

            /*

             * Check if we should perform a checkpoint or a restartpoint. As a

             * side-effect, RecoveryInProgress() initializes TimeLineID if

             * it's not set yet.

             */

            do_restartpoint = RecoveryInProgress();

 

            /*

             * Atomically fetch the request flags to figure out what kind of a

             * checkpoint we should perform, and increase the started-counter

             * to acknowledge that we've started a new checkpoint.

             */

            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);

            flags |= CheckpointerShmem->ckpt_flags;

            CheckpointerShmem->ckpt_flags = 0;

            CheckpointerShmem->ckpt_started++;

            SpinLockRelease(&CheckpointerShmem->ckpt_lck);

 

            /* 恢復checkpoint的結束是一個真正的checkpoint,我們還在恢復中執行。 */

            if (flags & CHECKPOINT_END_OF_RECOVERY)

                do_restartpoint = false;

 

            /* 如果(a)自上次checkpoint以來過早(無論什麼原因),我們將警告(b)有人自上次checkpoint 開始以來設定了CHECKPOINT_CAUSE_XLOG 標誌。特別注意,此實現不會生成警告是因為CheckPointTimeout < CheckPointWarning。 */

            if (!do_restartpoint &&

                (flags & CHECKPOINT_CAUSE_XLOG) &&

                elapsed_secs < CheckPointWarning)

                ereport(LOG,

                        (errmsg_plural("checkpoints are occurring too frequently (%d second apart)",

                                     "checkpoints are occurring too frequently (%d seconds apart)",

                                     elapsed_secs,

                                     elapsed_secs),

                         errhint("Consider increasing the configuration parameter \"max_wal_size\".")));

 

            /* 初始化checkpoint 中使用的檢查指標私有變數。 */

            ckpt_active = true;

            if (do_restartpoint)

                ckpt_start_recptr = GetXLogReplayRecPtr(NULL);

            else

                ckpt_start_recptr = GetInsertRecPtr();

            ckpt_start_time = now;

            ckpt_cached_elapsed = 0;

 

            /* 做一次 checkpoint */

            if (!do_restartpoint)

            {

                CreateCheckPoint(flags);

                ckpt_performed = true;

            }

            else

                ckpt_performed = CreateRestartPoint(flags);

 

            /* 在任何checkpoint之後,關閉所有 smgr 檔案。因此,我們不會無限期地掛起對刪除檔案的引用。 */

            smgrcloseall();

 

            /* 任何等待的後端指示checkpoint完成。 */

            SpinLockAcquire(&CheckpointerShmem->ckpt_lck);

            CheckpointerShmem->ckpt_done = CheckpointerShmem->ckpt_started;

            SpinLockRelease(&CheckpointerShmem->ckpt_lck);

 

            if (ckpt_performed)

            {

                /* 注意,我們將checkpoint開始時間不結束的時間記錄為last_checkpoint_time . 這使得time-driven 的checkpoint以可預測的間隔發生。 */

                last_checkpoint_time = now;

            }

            else

            {

                /* 我們無法重新執行啟動點(checkpoint 在錯誤的情況下丟擲錯誤)。最可能的是,自從上次重新啟動點以來,我們還沒有收到任何新的checkpoint WAL記錄。15秒再試一次。 */

                last_checkpoint_time = now - CheckPointTimeout + 15;

            }

 

            ckpt_active = false;

        }

 

        /* 如果需要,檢查archive_timeout 並切換 xlog 檔案。Check for archive_timeout and switch xlog files if necessary. */

        CheckArchiveTimeout();

 

        /* 向統計收集器傳送活動統計資訊。(我們重新使用bgwriter-related 程式碼的原因是bgwriter 和checkpointer 只不過是一個程序。將統計支援分為兩個獨立的統計資訊型別可能是不值得的。  */

        pgstat_send_bgwriter();

 

        /* sleep 直到我們發出訊號,或者是另一個checkpoint 或xlog 檔案切換。 */

        now = (pg_time_t) time(NULL);

        elapsed_secs = now - last_checkpoint_time;

        if (elapsed_secs >= CheckPointTimeout)

            continue;           /* no sleep for us ... */

        cur_timeout = CheckPointTimeout - elapsed_secs;

        if (XLogArchiveTimeout > 0 && !RecoveryInProgress())

        {

            elapsed_secs = now - last_xlog_switch_time;

            if (elapsed_secs >= XLogArchiveTimeout)

                continue;       /* no sleep for us ... */

            cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);

        }

 

        rc = WaitLatch(MyLatch,

                     WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,

                     cur_timeout * 1000L /* convert to ms */ ,

                     WAIT_EVENT_CHECKPOINTER_MAIN);

 

        /* 如果postmaster 程序死掉,將緊急救助。這是為了避免對所有postmaster 的子程序 進行手工清理。 */

        if (rc & WL_POSTMASTER_DEATH)

            exit(1);

    }

}