Postgresql - 原始碼 - WAL Sender process
介紹:
WAL sender process 是9.0的新功能。需要從主伺服器傳送XLOG到單個recipient(備機)。注意可以同時存在多個walsender程序。當備用伺服器的walreceiver連線到主伺服器並請求XLOG streaming replication的時候,由postmaster啟動walsender process。
walsender類似於常規後端,連線和walsender process是一對一的關係,但它是一組特殊的複製模式命令,而不是處理SQL查詢。START_REPLICATION 命令開始向客戶端傳送WAL。當流傳輸時,walsender保持從磁碟讀取XLOG記錄,並通過COPY 協議將他們傳送到備用伺服器,知道兩端通過退出COPY模式結束複製或直接關閉連線。
SIGTERM是正常終止,它只是walsender在下一個適當的時候關閉連線並正常退出。SIGQUIT是緊急終止,就像其他後端一樣,walsender將簡單的終止並退出。連線關閉和FATAL error不會被看作崩潰,而是近似正常的終止。walsender將快速退出而不再發送XLOG記錄。
如果伺服器關閉,檢查指標(checkpointer)在所有常規後端退出之後向我們傳送 PROCSIG_WALSND_INIT_STOPPING。如果後端空閒或在執行SQL,將導致後端關閉。如果正在進行羅支付至,則所有現有的WAL記錄都經過處理,然後關閉。否則會導致walsender切換到停止狀態。在停止狀態下,walsender將拒絕任何複製命令。一旦所有walsenders被確認停止,檢查指標開始關閉檢查點。當關閉檢查點結束時,postmaster給我們傳送SIGUSR2。指示walsender傳送任何未完成的WAL,包括關閉檢查點記錄,等待它被複制到備機,然後退出。
函式呼叫關係:
PostgresMain ( src/backend/tcop/postgres.c )
--> exec_replication_command ( src/backend/replication/walsender.c )
--> StartReplication 或 StartLogicalReplication ( src/backend/replication/walsender.c )
static void
StartReplication(StartReplicationCmd *cmd)
{
......
/* 我們假設我們在WAR中記錄了足夠的日誌傳輸資訊,因為這是在PostmasterMain()中檢查的。*/
if (cmd->slotname)
{
......
}
/* 選擇時間線。如果它是由客戶端顯式給出的,那麼使用。否則使用上次儲存在ThisTimeLineID中的重放記錄的時間線。*/
if (am_cascading_walsender)
{
/* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr();
}
else
FlushPtr = GetFlushRecPtr();
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
if (sendTimeLine == ThisTimeLineID)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
}
else
{
List *timeLineHistory;
sendTimeLineIsHistoric = true;
/* 檢查客戶端請求的時間線是否存在,請求的起始位置在該時間線上。 */
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/* 在歷史中找到請求的時間線。檢查請求的起始點是否在我們歷史上的時間線上。
* 這是故意的。我們只檢查在切換點之前沒有fork 好請求的時間線。我們不檢查我們在要求的起始點之前切換。這是因為客戶機可以合法地請求從包含交換點的WAL段的開頭開始複製,但是在新的時間線上,這樣就不會以部分段結束。如果你要求太老的起點,你會得到一個錯誤,當我們找不到請求的WAL段在pg_wal。 */
if (!XLogRecPtrIsInvalid(switchpoint) &&
switchpoint < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
cmd->timeline),
errdetail("This server's history forked from timeline %u at %X/%X.",
cmd->timeline,
(uint32) (switchpoint >> 32),
(uint32) (switchpoint))));
}
sendTimeLineValidUpto = switchpoint;
}
}
else
{
sendTimeLine = ThisTimeLineID;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
streamingDoneSending = streamingDoneReceiving = false;
/* 如果沒有內容需要stream,不要進入複製模式 */
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/* 當我們第一次啟動複製時,備機將跟隨在主伺服器後面。對於一些應用程式,例如同步複製,對於這個初始catchup模式有一個清晰的狀態很重要,因此當我們稍後改變流狀態時可以觸發動作。我們可能會呆在這個狀態很長一段時間,這正是我們想要監視我們是否還在同步的原因。 */
WalSndSetState(WALSNDSTATE_CATCHUP);
/* 傳送 CopyBothResponse 資訊, 並且開始 streaming */
pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
pq_flush();
/* 不允許請求一個在WAL中的未來的點去stream,WAL還沒有被重新整理到磁碟。 */
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
(uint32) (cmd->startpoint >> 32),
(uint32) (cmd->startpoint),
(uint32) (FlushPtr >> 32),
(uint32) (FlushPtr))));
}
/* 從請求點開始streaming */
sentPtr = cmd->startpoint;
/* 初始化共享記憶體狀態 */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* walsender的主迴圈 */
replication_active = true;
WalSndLoop(XLogSendPhysical);
replication_active = false;
if (got_STOPPING)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
Assert(streamingDoneSending && streamingDoneReceiving);
}
if (cmd->slotname)
ReplicationSlotRelease();
/* 複製完成了。傳送指示下一個時間線的單行結果集。 */
if (sendTimeLineIsHistoric)
{
char startpos_str[8 + 1 + 8 + 1];
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Datum values[2];
bool nulls[2];
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
/* 需要一個表示兩個列的元組描述符。int8看起來是一個令人驚訝的資料型別,但是理論上int4不夠寬,因為TimeLineID是無符號的。*/
tupdesc = CreateTemplateTupleDesc(2, false);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
TEXTOID, -1, 0);
/* prepare for projection of tuple */
tstate = begin_tup_output_tupdesc(dest, tupdesc);
values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
values[1] = CStringGetTextDatum(startpos_str);
/* send it to dest */
do_tup_output(tstate, values, nulls);
end_tup_output(tstate);
}
/* 傳送 CommandComplete (完成)訊息 */
pq_puttextmessage('C', "START_STREAMING");
}
******************************************************************************************
下面我們看一下主要的walsndloop程式碼。
這是walsender 的主迴圈。
/* walsender process 的主迴圈,將WAL複製到複製訊息上。*/
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
/* 初始化最後一個答覆時間戳。這樣就可以實現 timeout 處理。 */
last_reply_timestamp = GetCurrentTimestamp();
waiting_for_ping_response = false;
/* 迴圈,直到我們到達這個時間線的末端,或者客戶端請求停止streaming。 */
for (;;)
{
TimestampTz now;
/* 當postmaster程序死掉,將緊急處理。避免對所有postmaster的子程序進行手工處理。 */
if (!PostmasterIsAlive())
exit(1);
/* 清除任何 */
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* 處理最近收到的任何請求或訊號 */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
SyncRepInitConfig();
}
/* 檢查客戶的輸入 */
ProcessRepliesIfAny();
/* 如果我們從客戶機接收到CopyDone,我們自己傳送CopyDone,並且輸出緩衝區是空的,那麼就該退出streaming。 */
if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
break;
/* 如果在輸出緩衝區中沒有任何掛起的資料,嘗試傳送更多。如果有的話,我們不必再呼叫 send_data 資料,直到我們重新整理它…但我們最好假設我們沒有趕上。 */
if (!pq_is_send_pending())
send_data();
else
WalSndCaughtUp = false;
/* 嘗試將未決輸出重新整理到客戶端 */
if (pq_flush_if_writable() != 0)
WalSndShutdown();
/* 如果現在沒有什麼東西需要傳送 ... */
if (WalSndCaughtUp && !pq_is_send_pending())
{
/* 如果我們處於追趕狀態,移動到streaming。對於使用者來說,這是一個需要了解的重要狀態更改,因為在此之前,如果主伺服器宕機,並且需要向備用伺服器進行故障轉移,則可能會發生資料丟失。狀態更改對於同步複製也很重要,因為在該點開始等待的提交可能等待一段時間。 */
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
{
ereport(DEBUG1,
(errmsg("\"%s\" has now caught up with upstream server",
application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
}
/* 當SIGUSR2到達,我們將任何未完成的日誌傳送到關機檢查點記錄(即最新記錄),等待它們複製到待機狀態,然後退出。這可能是一個正常的終止在關機,或推廣,walsender 不確定是哪個。 */
if (got_SIGUSR2)
WalSndDone(send_data);
}
now = GetCurrentTimestamp();
/* 檢查 replication 超時. */
WalSndCheckTimeOut(now);
/* 如果時間到了,傳送keepalive */
WalSndKeepaliveIfNecessary(now);
/* 如果不敢上,不會阻塞,除非有未傳送的資料等待,在這種情況下,我們最好阻塞,直到套接字寫就緒為止。這個測試只適用於 send_data 回撥處理了可用資料的子集,但是 pq_flush_if_writable 重新整理了所有資料的情況——我們應該立即嘗試傳送更多資料。 */
if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
{
long sleeptime;
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
WL_SOCKET_READABLE;
sleeptime = WalSndComputeSleeptime(now);
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep直到某事發生或 timeout */
WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_MAIN);
}
}
return;
}
******************************************************************************************
/* 將WAL以其正常的物理/儲存形式傳送出去。
* 讀取已經重新整理到磁碟但尚未傳送到客戶端的WAL的MAX_SEND_SIZE位元組,並將其緩衝到libpq輸出緩衝區中。
* 如果沒有剩餘的未傳送WAL,WalSndCaughtUp 設定為true,否則 WalSndCaughtUp 設定為false。*/
static void
XLogSendPhysical(void)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
/* 如果請求,將WAL sender 切換到stopping狀態. */
if (got_STOPPING)
WalSndSetState(WALSNDSTATE_STOPPING);
if (streamingDoneSending)
{
WalSndCaughtUp = true;
return;
}
/* Figure out how far we can safely send the WAL. */
if (sendTimeLineIsHistoric)
{
/* 將舊的時間線 streaming 到這個伺服器的歷史中,但不是我們當前插入或重放的那個時間線。它可以 streaming 到我們關掉時間線的那一點。 */
SendRqstPtr = sendTimeLineValidUpto;
}
else if (am_cascading_walsender)
{
/* 在備機中 streaming 最新的時間線。
* 嘗試傳送所有已經重放的WAL,這樣我們就知道它是有效的。如果我們通過流複製接收WAL,傳送任何已接收但未重放的WAL也可以。
* 我們正在恢復的時間線可以改變,或者我們可以被提升。在任何一種情況下,當前的時間線都是歷史性的。我們需要檢測這一點,這樣我們就不會試圖流過我們切換到另一個時間線的那一點。我們在計算 FlushPtr 之後檢查升級或時間線切換,以避免出現競爭條件:如果時間線在我們檢查它仍然是當前之後就變得具有歷史意義,那麼仍然可以把它streaming 到 FlushPtr r上,而FlushPtr是在它變得具有歷史意義之前計算的。 */
bool becameHistoric = false;
SendRqstPtr = GetStandbyFlushRecPtr();
if (!RecoveryInProgress())
{
/* RecoveryInProgress() 更新 ThisTimeLineID 成為當前時間線 */
am_cascading_walsender = false;
becameHistoric = true;
}
else
{
/* 仍然是級聯備機。但我們是否仍在恢復時間線?ThisTimeLineID通過GetStandbyFlushRecPtr() 呼叫被更新 */
if (sendTimeLine != ThisTimeLineID)
becameHistoric = true;
}
if (becameHistoric)
{
/* 我們傳送的時間線已經成為歷史。讀取新的時間線的時間線歷史檔案,以檢視我們從傳送的時間線中準確地分叉的位置。 */
List *history;
history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto;
}
}
else
{
/* 將當前時間線傳輸到主機上。
* 嘗試傳送所有已經寫好的資料,並將其同步到磁碟。對於當前實現的 XLogRead() ,我們不能再做什麼了。在任何情況下,傳送不安全的WAL到主伺服器上的磁碟是不安全的:如果主伺服器隨後崩潰並重新啟動,備用伺服器一定沒有應用任何在主伺服器上丟失的WAL。 */
SendRqstPtr = GetFlushRecPtr();
}
/* 記錄當前的系統時間作為寫這個WAL位置用於滯後跟蹤的近似時間。
* 理論上,無論何時重新整理WAL,我們都可以讓XLogFlush() 在 shmem 中記錄一個時間,並且當我們呼叫上面的GetFlushRecPtr() 時(同樣對於級聯備機),我們可以獲得該時間以及LSN,但是與將任何新程式碼放入熱WAL路徑相比,它似乎足夠好抓住這裡的時間。我們應該在XLogFlush()執行WalSndWakeupProcessRequ.()之後達到這個目的,儘管這可能需要一些時間,但是我們讀取WAL重新整理指標,並在這裡非常接近地花費時間,以便如果它仍在移動,我們將得到一個稍後的位置。
* 因為LagTrackerWriter 在LSN尚未升級時忽略了示例,因此這為這個LSN提供了WAL重新整理時間的廉價近似值
* 注意,LSN並不一定是包含在本訊息中的資料的LSN;它是WAL的末尾,它可能更進一步。所有滯後跟蹤機器關心的是找出任意的LSN最終何時被報告為寫入、重新整理和應用,以便它可以測量經過的時間。 */
LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
/* 如果這是一個歷史的時間線,我們已經到達了下一個時間線的轉折點,停止streaming。
* 注意:我們可能已經發送了WAL > sendTimeLineValidUpto 。啟動過程通常會在啟動之前重放從主伺服器接收的所有WAL,但是如果WAL streaming 終止於WAL頁的邊界,則時間線的有效部分可能終止於WAL記錄的中間。我們可能已經將部分WAL記錄的前半部分發送到級聯備用,因此sentPtr > sendTimeLineValidUpto。沒關係,級聯待機也不能重放部分WAL記錄,所以它仍然可以遵循我們的時間線開關。*/
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
if (sendFile >= 0)
close(sendFile);
sendFile = -1;
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
(uint32) (sentPtr >> 32), (uint32) sentPtr);
return;
}
/* Do we have any work to do? */
Assert(sentPtr <= SendRqstPtr);
if (SendRqstPtr <= sentPtr)
{
WalSndCaughtUp = true;
return;
}
/* 計算一個訊息傳送多少。如果傳送的位元組不超過MAX_SEND_SIZE 位元組,則傳送所有內容。否則傳送MAX_SEND_SIZE 大小位元組,但返回到日誌檔案或頁邊界。
* Figure out how much to send in one message. If there's no more than
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
* MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
* 舍入不僅僅是出於效能原因。Walreceiver 依賴於我們從不分割WAL記錄兩個訊息的事實。由於長的WAL記錄在頁面邊界被分割成連續記錄,所以頁面邊界始終是安全的截止點。我們還假設 SendRqstPtr 從來沒有指向WAL記錄的中間。 */
startptr = sentPtr;
endptr = startptr;
endptr += MAX_SEND_SIZE;
/* 如果我們超越了 SendRqstPtr, 回退 */
if (SendRqstPtr <= endptr)
{
endptr = SendRqstPtr;
if (sendTimeLineIsHistoric)
WalSndCaughtUp = false;
else
WalSndCaughtUp = true;
}
else
{
/* round down to page boundary. */
endptr -= (endptr % XLOG_BLCKSZ);
WalSndCaughtUp = false;
}
nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/* 可以讀取和傳送的切片。 */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'w');
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
/* 將日誌直接讀入輸出緩衝區,以避免額外的 memcpy 呼叫。 */
enlargeStringInfo(&output_message, nbytes);
XLogRead(&output_message.data[output_message.len], startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
/* 最後填寫傳送時間戳,以使其儘可能晚。 */
resetStringInfo(&tmpbuf);
pq_sendint64(&tmpbuf, GetCurrentTimestamp());
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
pq_putmessage_noblock('d', output_message.data, output_message.len);
sentPtr = endptr;
/* 更新共享記憶體狀態 */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
(uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
return;
}