From bfa242f68ebdbab65c3ff196b3285f643b0cb96a Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Sun, 11 Jan 2026 15:57:16 +0800 Subject: [PATCH v4] Add WALRCV_CONNECTING and WALRCV_CONNECTED states to walreceiver Previously, walreceiver set status='streaming' early in startup before receiving any WAL, making it unreliable for health monitoring. Introduce two intermediate states: - WALRCV_CONNECTING: Walreceiver enters CONNECTING on startup - WALRCV_CONNECTED: Walreceiver transitions from CONNECTING to CONNECTED after START_REPLICATION succeeded, awaiting first WAL record The final transition from CONNECTED to STREAMING occurs when startup successfully applies the first record, confirming end-to-end data flow. This allows monitoring tools to distinguish connection establishment from active WAL streaming. --- src/backend/access/transam/xlogrecovery.c | 18 ++++++++++++++ src/backend/replication/walreceiver.c | 20 ++++++++++++--- src/backend/replication/walreceiverfuncs.c | 29 +++++++++++++++++++++- src/include/replication/walreceiver.h | 4 +++ 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0b5f871abe7..e56585f3f29 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -250,6 +250,9 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; +/* Guard to update walreceiver state only once per streaming session. */ +static bool walrcv_streaming_set = false; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -1842,6 +1845,18 @@ PerformWalRecovery(void) */ ApplyWalRecord(xlogreader, record, &replayTLI); + /* + * If we are reading from the stream and successfully applied + * the first WAL record, transition to STREAMING state. This + * confirms end-to-end data flow: the record was received, + * parsed, and applied without error. + */ + if (!walrcv_streaming_set && readSource == XLOG_FROM_STREAM) + { + if (WalRcvSetStreaming()) + walrcv_streaming_set = true; + } + /* Exit loop if we reached inclusive recovery target */ if (recoveryStopsAfter(xlogreader)) { @@ -3702,6 +3717,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * one can hope... */ + /* Reset our "streaming active" guard flag */ + walrcv_streaming_set = false; + /* * We should be able to move to XLOG_FROM_STREAM only in * standby mode. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a41453530a1..b97dafa4a44 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -206,6 +206,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) break; case WALRCV_WAITING: + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_STREAMING: case WALRCV_RESTARTING: default: @@ -215,7 +217,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ walrcv->ready_to_display = false; @@ -395,6 +397,11 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LSN_FORMAT_ARGS(startpoint), startpointTLI)); first_stream = false; + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTING) + walrcv->walRcvState = WALRCV_CONNECTED; + SpinLockRelease(&walrcv->mutex); + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); @@ -650,7 +657,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) SpinLockAcquire(&walrcv->mutex); state = walrcv->walRcvState; - if (state != WALRCV_STREAMING) + if (state != WALRCV_STREAMING && state != WALRCV_CONNECTED && + state != WALRCV_CONNECTING) { SpinLockRelease(&walrcv->mutex); if (state == WALRCV_STOPPING) @@ -689,7 +697,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); break; } @@ -792,6 +800,8 @@ WalRcvDie(int code, Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_CONNECTING || + walrcv->walRcvState == WALRCV_CONNECTED || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -1391,6 +1401,10 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; + case WALRCV_CONNECTED: + return "connected"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index da8794cba7c..c9b6ed6e874 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,12 +179,37 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_RESTARTING || state == WALRCV_CONNECTING || + state == WALRCV_CONNECTED) return true; else return false; } +/* + * Transition from CONNECTED to STREAMING state. + * + * This is called by the startup process after the first WAL record from + * the walreceiver is successfully applied, confirming end-to-end data + * flow: the record was received, parsed, and applied without error. + */ +bool +WalRcvSetStreaming(void) +{ + WalRcvData *walrcv = WalRcv; + bool set = false; + + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTED) + { + walrcv->walRcvState = WALRCV_STREAMING; + set = true; + } + SpinLockRelease(&walrcv->mutex); + + return set; +} + /* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. @@ -211,6 +236,8 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f3ad00fb6f3..70159ceb032 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,9 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connection starting, but not established yet */ + WALRCV_CONNECTED, /* replication connection established, but no WAL + * streamed yet */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */ @@ -492,6 +495,7 @@ extern void WalRcvForceReply(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); +extern bool WalRcvSetStreaming(void); extern void ShutdownWalRcv(void); extern bool WalRcvStreaming(void); extern bool WalRcvRunning(void); -- 2.51.0