From ab8839138521af35f6b00f530a39d10ef8ead555 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Fri, 21 Jul 2023 05:34:21 +0000 Subject: [PATCH 1/3] Send shutdown checkpoint record to subscriber --- src/backend/replication/walsender.c | 30 +++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..fc1363ba76 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -187,6 +187,9 @@ static bool WalSndCaughtUp = false; static volatile sig_atomic_t got_SIGUSR2 = false; static volatile sig_atomic_t got_STOPPING = false; +/* Are all the WALs flushed? */ +static bool WalsAreFlushed = false; + /* * This is set while we are streaming. When not set * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, @@ -260,7 +263,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) @@ -1581,7 +1583,10 @@ WalSndWaitForWal(XLogRecPtr loc) * written, because walwriter has shut down already. */ if (got_STOPPING) + { XLogBackgroundFlush(); + WalsAreFlushed = true; + } /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) @@ -3100,12 +3105,20 @@ XLogSendLogical(void) WalSndCaughtUp = true; /* - * If we're caught up and have been requested to stop, have WalSndLoop() - * terminate the connection in an orderly manner, after writing out all - * the pending data. + * If we're caught up, have been requested to stop and there are no pending + * records to be sent, change to stopping mode. */ - if (WalSndCaughtUp && got_STOPPING) - got_SIGUSR2 = true; + if (WalSndCaughtUp && WalsAreFlushed && !pq_is_send_pending()) + { + /* + * Update the stats forcibly. 
pgstat_shutdown_hook reports any pending + * stats at the end of the process, but it would happen after the + * checkpointer exits so it would lead to an assertion failure. We must + * ensure all the stats are recorded before changing the state. + */ + pgstat_report_stat(true); + WalSndSetState(WALSNDSTATE_STOPPING); + } /* Update shared memory status */ { @@ -3142,6 +3155,7 @@ WalSndDone(WalSndSendDataCallback send_data) replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; + if (WalSndCaughtUp && sentPtr == replicatedPtr && !pq_is_send_pending()) { @@ -3152,6 +3166,10 @@ WalSndDone(WalSndSendDataCallback send_data) EndCommand(&qc, DestRemote, false); pq_flush(); + /* Mark the slot as dirty and save it to update the confirmed_flush. */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + proc_exit(0); } if (!waiting_for_ping_response) -- 2.34.1