PostgreSQL 原始碼 - walreceiver 程序
walreceiver 程序啟動後會執行一個函式,該函式就是 walreceiver 程序的主入口:
WalReceiverMain()
程式碼位置:
src/backend/replication/walreceiver.c
下面我們來看這個主入口函式(以下為節錄,「......」表示文章省略的程式碼):
/*
 * Main entry point for walreceiver process.
 *
 * NOTE: this listing is an excerpt; "......" marks code elided by the
 * article (local declarations, some switch-case bodies) and is not valid C.
 *
 * Outline of what the visible code does:
 *   1. Marks this walreceiver as running in the shared-memory state pointed
 *      to by walrcv, and copies out the conninfo, slot name, start point and
 *      start timeline.
 *   2. Installs signal handlers, loads libpqwalreceiver and connects to the
 *      primary.
 *   3. Outer loop: runs IDENTIFY_SYSTEM, verifies the system identifier and
 *      timeline, fetches timeline history files, and starts streaming.
 *   4. Inner loop: receives and processes messages, sends replies and hot
 *      standby feedback, and waits on the latch/socket for up to
 *      NAPTIME_PER_CYCLE.
 *   5. When streaming ends, closes the current WAL segment, waits for a new
 *      start position (WalRcvWaitForStartPosition) and goes back to step 3.
 *
 * The outer loop has no exit path in the visible code ("not reached" at the
 * end). The process leaves through the error reports, elog(PANIC) or
 * exit(1) below.
 */
void
WalReceiverMain(void)
{
......
/* walrcv (the walreceiver's shared-memory state) must already be set up */
Assert(walrcv != NULL);
now = GetCurrentTimestamp();
/*
 * Mark walreceiver as running in shared memory.
 *
 * Do this as early as possible, so that if we fail later on, the state is
 * set to STOPPED. If we die before this point, the startup process keeps
 * waiting for us to start up.
 */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->pid == 0);
switch (walrcv->walRcvState)
{
case WALRCV_STOPPING:
......
case WALRCV_STOPPED:
......
case WALRCV_STARTING:
......
/*
 * Any of the states below means a walreceiver is already running according
 * to shared memory (or the state is unknown). Release the spinlock before
 * reporting the PANIC.
 */
case WALRCV_WAITING:
case WALRCV_STREAMING:
case WALRCV_RESTARTING:
default:
/* Shouldn't happen */
SpinLockRelease(&walrcv->mutex);
elog(PANIC, "walreceiver still running according to shared memory state");
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_STREAMING;
/*
 * Fetch information required to start streaming. ready_to_display stays
 * false until the user-visible conninfo has been stored further below.
 */
walrcv->ready_to_display = false;
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
startpoint = walrcv->receiveStart;
startpointTLI = walrcv->receiveStartTLI;
/* Initialise the message timestamps to a sane(ish) value: now */
walrcv->lastMsgSendTime =
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
/* Report the latch to use to awaken this process */
walrcv->latch = &MyProc->procLatch;
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit (via WalRcvDie) */
on_shmem_exit(WalRcvDie, 0);
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
pqsignal(SIGUSR2, SIG_IGN);
/* Reset some signals that are accepted by postmaster but not here */
pqsignal(SIGCHLD, SIG_DFL);
pqsignal(SIGTTIN, SIG_DFL);
pqsignal(SIGTTOU, SIG_DFL);
pqsignal(SIGCONT, SIG_DFL);
pqsignal(SIGWINCH, SIG_DFL);
/* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT);
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
 * Create a resource owner to keep track of our resources (not clear that
 * we need this, but may as well have one).
 */
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
/*
 * Establish the connection to the primary for XLOG streaming. Immediate
 * exit is enabled only around the connection attempt (presumably so that a
 * shutdown request can interrupt it; confirm against
 * EnableWalRcvImmediateExit).
 */
EnableWalRcvImmediateExit();
wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err)));
DisableWalRcvImmediateExit();
/*
 * Save the user-visible connection string. This clobbers the original
 * conninfo in shared memory, for security. Also save the host and port of
 * the sender server this walreceiver is connected to, and only then mark
 * the information as ready to display.
 */
tmp_conninfo = walrcv_get_conninfo(wrconn);
walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
SpinLockAcquire(&walrcv->mutex);
memset(walrcv->conninfo, 0, MAXCONNINFO);
if (tmp_conninfo)
strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
memset(walrcv->sender_host, 0, NI_MAXHOST);
if (sender_host)
strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
walrcv->sender_port = sender_port;
walrcv->ready_to_display = true;
SpinLockRelease(&walrcv->mutex);
if (tmp_conninfo)
pfree(tmp_conninfo);
if (sender_host)
pfree(sender_host);
/* first_stream selects the "started" vs. "restarted" log message below */
first_stream = true;
for (;;)
{
char *primary_sysid;
char standby_sysid[32];
int server_version;
WalRcvStreamOptions options;
/*
 * Check that we're connected to a valid server using the IDENTIFY_SYSTEM
 * replication command, and that its system identifier matches ours.
 */
EnableWalRcvImmediateExit();
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
&server_version);
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
if (strcmp(primary_sysid, standby_sysid) != 0)
{
ereport(ERROR,
(errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
DisableWalRcvImmediateExit();
/*
 * Confirm that the current timeline of the primary is the same as or
 * ahead of ours.
 */
if (primaryTLI < startpointTLI)
ereport(ERROR,
(errmsg("highest timeline %u of the primary is behind recovery timeline %u",
primaryTLI, startpointTLI)));
/*
 * Get any missing history files. We always do this, even if we're not
 * interested in that timeline, so that if we're promoted to become the
 * master later on, we don't select the same timeline that was already
 * used in the current master. This isn't bullet-proof: if you need to
 * ensure that a unique timeline id is chosen in every case, you'll need
 * some external software to manage your cluster. But let's avoid the
 * confusion of timeline id collisions where we can.
 */
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
/*
 * Start streaming. We'll try to start at the requested starting point and
 * timeline, even if it's different from the server's latest timeline. If
 * we've already reached the end of the old timeline, the server will
 * finish the streaming immediately, and we will go back to await orders
 * from the startup process. If recovery_target_timeline is 'latest', the
 * startup process will scan pg_wal, find the new history file, bump the
 * recovery target timeline, and ask us to restart on the new timeline.
 *
 * An empty slot name means "no replication slot" (slotname is passed as
 * NULL).
 */
options.logical = false;
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
options.proto.physical.startpointTLI = startpointTLI;
ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
(uint32) (startpoint >> 32), (uint32) startpoint,
startpointTLI)));
else
ereport(LOG,
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
(uint32) (startpoint >> 32), (uint32) startpoint,
startpointTLI)));
first_stream = false;
/* Initialize LogstreamResult and the buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
/* Loop until end-of-streaming or error */
for (;;)
{
char *buf;
int len;
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
/*
 * Exit walreceiver if we're not in recovery. This should not happen,
 * but cross-check the status here.
 */
if (!RecoveryInProgress())
ereport(FATAL,
(errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
/* On SIGHUP, reload the configuration file and send hot standby feedback */
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
XLogWalRcvSendHSFeedback(true);
}
/* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
 * Process the received data, and any subsequent data we can read
 * without blocking:
 *   len > 0  -> a message. buf[0] is passed separately from the rest
 *               (presumably the message type byte).
 *   len == 0 -> nothing more available right now.
 *   len < 0  -> the primary terminated replication (end of WAL).
 */
for (;;)
{
if (len > 0)
{
/* Something was received from master, so reset the timeout */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
}
else if (len == 0)
break;
else if (len < 0)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
errdetail("End of WAL reached on timeline %u at %X/%X.",
startpointTLI,
(uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
endofwal = true;
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/* Let the master know that we received some data. */
XLogWalRcvSendReply(false, false);
/*
 * If we've written some records, flush them to disk and let the
 * startup process and primary server know about them.
 */
XLogWalRcvFlush(false);
}
/* Check if we need to exit the streaming loop. */
if (endofwal)
break;
/*
 * Ideally we would reuse a WaitEventSet object repeatedly here, to avoid
 * the overhead of WaitLatchOrSocket on epoll systems. However, we can't
 * be sure that libpq still has the same socket: even if the fd is the
 * same number, it may have been closed and reopened since the last time.
 * If a function for removing sockets from a WaitEventSet is added in the
 * future, we could add and remove just the socket each time, potentially
 * avoiding some system calls.
 */
Assert(wait_fd != PGINVALID_SOCKET);
rc = WaitLatchOrSocket(walrcv->latch,
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
NAPTIME_PER_CYCLE,
WAIT_EVENT_WAL_RECEIVER_MAIN);
if (rc & WL_LATCH_SET)
{
ResetLatch(walrcv->latch);
if (walrcv->force_reply)
{
/*
 * The recovery process has asked us to send apply feedback now.
 * Make sure the flag is really set to false in shared memory
 * before sending the reply, so we don't miss a new request for a
 * reply.
 */
walrcv->force_reply = false;
pg_memory_barrier();
XLogWalRcvSendReply(true, false);
}
}
if (rc & WL_POSTMASTER_DEATH)
{
/*
 * Emergency bailout if the postmaster has died. This avoids the need
 * for manual cleanup of all postmaster children.
 */
exit(1);
}
if (rc & WL_TIMEOUT)
{
/*
 * We didn't receive anything new. If we haven't heard anything from
 * the server for more than wal_receiver_timeout / 2, ping the server.
 * Also, if it's been longer than wal_receiver_status_interval since
 * the last update we sent, send a status update to the master anyway,
 * to report any progress in applying WAL.
 */
bool requestReply = false;
/*
 * Check if the time since the last receive from the primary has
 * reached the configured limit. last_recv_timestamp is reset
 * whenever data arrives from the primary.
 */
if (wal_receiver_timeout > 0)
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz timeout;
timeout =
TimestampTzPlusMilliseconds(last_recv_timestamp,
wal_receiver_timeout);
if (now >= timeout)
ereport(ERROR,
(errmsg("terminating walreceiver due to timeout")));
/*
 * We didn't receive anything new for half of the receiver
 * replication timeout, so ping the server. ping_sent ensures we
 * ping only once until something is received again.
 */
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
(wal_receiver_timeout / 2));
if (now >= timeout)
{
requestReply = true;
ping_sent = true;
}
}
}
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);
}
}
/*
 * The backend finished streaming. Exit streaming COPY-mode from our side,
 * too.
 */
EnableWalRcvImmediateExit();
walrcv_endstreaming(wrconn, &primaryTLI);
DisableWalRcvImmediateExit();
/*
 * If the server had switched to a new timeline that we didn't know about
 * when we began streaming, fetch its timeline history file now.
 */
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
}
else
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u",
startpointTLI)));
/*
 * End of WAL reached on the requested timeline. Close the last segment,
 * and await new orders from the startup process.
 */
if (recvFile >= 0)
{
char xlogfname[MAXFNAMELEN];
XLogWalRcvFlush(false);
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
XLogFileNameP(recvFileTLI, recvSegNo))));
/*
 * Unless XLogArchiveMode is ARCHIVE_MODE_ALWAYS, forcibly create the
 * .done file so that the streamed segment is not archived later. With
 * ARCHIVE_MODE_ALWAYS, XLogArchiveNotify is called for the segment
 * instead.
 */
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
XLogArchiveNotify(xlogfname);
}
recvFile = -1;
elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
/*
 * Wait for a new start position and timeline (written into startpoint /
 * startpointTLI), then loop back to IDENTIFY_SYSTEM and restart streaming.
 */
WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
}
/* not reached */
}