From b7abfc772ce1b0ce66a0854241a290cf3beb91eb Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 15 Feb 2022 14:56:33 +0000 Subject: [PATCH v1] Allow async standbys wait for sync replication --- doc/src/sgml/config.sgml | 24 +++ doc/src/sgml/monitoring.sgml | 4 + src/backend/replication/walsender.c | 155 ++++++++++++++++++ src/backend/utils/activity/wait_event.c | 3 + src/backend/utils/misc/guc.c | 9 + src/backend/utils/misc/postgresql.conf.sample | 3 + src/include/replication/walsender.h | 1 + src/include/utils/wait_event.h | 3 +- 8 files changed, 201 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 53b361e7a9..63e8c226cb 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4245,6 +4245,30 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + async_standbys_wait_for_sync_replication (boolean) + + async_standbys_wait_for_sync_replication configuration parameter + + + + + When set, the primary will never let asynchronous standbys fall ahead + of the synchronous standbys. In other words, the asynchronous standbys + will wait until the synchronous replication i.e., after at least the + standbys (either in quorum or priority based) receive the WAL, flush + and acknowledge the primary with the flush LSN. This behvaiour is + particularly important as it avoids extra manual steps required on the + asynchronous standbys which are ahead of the synchronous standbys in + the event of failover. + + + This parameter can only be set in the postgresql.conf + file or on the server command line. + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 62f2a3332b..52d58d79c4 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1170,6 +1170,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + AsyncWalSenderWaitForSyncReplication + Asynchronous standby waiting in WAL sender, before sending WAL, for synchronous standbys to flush the WAL. + ClientRead Waiting to read data from the client. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 655760fee3..79920f0212 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -124,6 +124,8 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL * data message */ bool log_replication_commands = false; +bool async_standbys_wait_for_sync_replication = true; + /* * State for WalSndWakeupRequest */ @@ -257,6 +259,9 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); +static bool AmAsyncStandbyWALSender(void); +static bool ShouldWaitForSyncRepl(void); +static void WaitForSyncRepl(XLogRecPtr sendRqstPtr); /* Initialize walsender process before entering the main command loop */ void @@ -2836,6 +2841,8 @@ XLogSendPhysical(void) return; } + WaitForSyncRepl(SendRqstPtr); + /* * Figure out how much to send in one message. If there's no more than * MAX_SEND_SIZE bytes to send, send everything. Otherwise send @@ -2994,6 +3001,22 @@ XLogSendLogical(void) if (record != NULL) { + /* + * At this point, we do not know whether the current LSN (ReadRecPtr) + * is required by any of the logical decoding output plugins which is + * only known at the plugin level. If we were to decide whether to wait + * or not for the synchronous standbys flush LSN at the plugin level, + * we might have to pass extra information to it which doesn't sound an + * elegant way. + * + * Another way the output plugins can wait there before sending the WAL + * is by reading the flush LSN from the logical replication slots. + * + * Waiting here i.e. before even the logical decoding kicks in, makes + * the code clean. + */ + WaitForSyncRepl(logical_decoding_ctx->reader->ReadRecPtr); + /* * Note the lack of any call to LagTrackerWrite() which is handled by * WalSndUpdateProgress which is called by output plugin through @@ -3816,3 +3839,135 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } + +/* + * Check if the WAL sender is serving an asynchronous standby at the moment. + */ +static bool +AmAsyncStandbyWALSender(void) +{ + int priority; + + SpinLockAcquire(&MyWalSnd->mutex); + priority = MyWalSnd->sync_standby_priority; + SpinLockRelease(&MyWalSnd->mutex); + + Assert(priority >= 0); + + if (priority > 0) + ereport(DEBUG3, + (errmsg("WAL sender is serving a sync standby at the moment with priority %d", + priority))); + else if (priority == 0) + ereport(DEBUG3, + (errmsg("WAL sender is serving an async standby at the moment with priority %d", + priority))); + + return priority == 0 ? true : false; +} + +/* + * Check if the WAL sender should wait for synchronous replication. + */ +static bool +ShouldWaitForSyncRepl(void) +{ + /* GUC is set to off, so no need to wait */ + if (!async_standbys_wait_for_sync_replication) + return false; + + /* Synchronous replication is not requested, so no need to wait */ + if (SyncRepStandbyNames == NULL || + SyncRepStandbyNames[0] == '\0' || + SyncRepConfig == NULL) + return false; + + /* + * WAL sender is serving a sync standby at the moment, so no need to wait. + */ + if (!AmAsyncStandbyWALSender()) + return false; + + /* + * WAL sender is serving an async standby at the moment, so it should wait. + */ + return true; +} + +/* + * Wait until the sendRqstPtr is at least the flush LSN of all synchronous + * standbys. + */ +static void +WaitForSyncRepl(XLogRecPtr sendRqstPtr) +{ + XLogRecPtr flushLSN; + volatile WalSndCtlData *walsndctl = WalSndCtl; + + Assert(walsndctl != NULL); + Assert(!XLogRecPtrIsInvalid(sendRqstPtr)); + + if (!ShouldWaitForSyncRepl()) + return; + + LWLockAcquire(SyncRepLock, LW_SHARED); + flushLSN = walsndctl->lsn[SYNC_REP_WAIT_FLUSH]; + LWLockRelease(SyncRepLock); + + /* + * This WAL sender is allowed to send the WAL as its request LSN is equal + * or behind the sync standbys flush LSN. + */ + if (sendRqstPtr <= flushLSN) + { + ereport(DEBUG3, + (errmsg("async standby WAL sender is allowed to send WAL as its request LSN %X/%X is equal or behind sync standbys flush LSN %X/%X", + LSN_FORMAT_ARGS(sendRqstPtr), LSN_FORMAT_ARGS(flushLSN)), + errhidestmt(true))); + + return; + } + + ereport(DEBUG3, + (errmsg("async standby WAL sender with request LSN %X/%X is waiting as sync standbys are ahead with flush LSN %X/%X", + LSN_FORMAT_ARGS(flushLSN), LSN_FORMAT_ARGS(sendRqstPtr)), + errhidestmt(true))); + + for (;;) + { + /* + * It is enough to wait until the flush LSN of sync standbys as it + * guarantees local durable commit, standby durable commit after both + * server and OS crash, thus no transaction losses. + * + * XXX: we could go further to make this optional with + * synchronous_commit setting. That is, the async standbys can be made + * to wait either for remote write or flush or apply LSN. But waiting + * for remote flush LSN suffices to give maximum protection for us. + */ + LWLockAcquire(SyncRepLock, LW_SHARED); + flushLSN = walsndctl->lsn[SYNC_REP_WAIT_FLUSH]; + LWLockRelease(SyncRepLock); + + if (sendRqstPtr <= flushLSN) + break; + + /* + * Wait for a brief moment i.e. 10 msec before trying again. + * Intentionally chose a shorter wait time as we don't want to wait + * longer durations and be aggressive in reading the flush LSN. + */ + WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, 10L, + WAIT_EVENT_ASYNC_WAL_SENDER_WAIT_FOR_SYNC_REPLICATION); + + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } + + ereport(DEBUG3, + (errmsg("async standby WAL sender with request LSN %X/%X can now send WAL up to sync standbys flush LSN %X/%X", + LSN_FORMAT_ARGS(sendRqstPtr), LSN_FORMAT_ARGS(flushLSN)), + errhidestmt(true))); +} diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 60972c3a75..745c3976a8 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -267,6 +267,9 @@ pgstat_get_wait_client(WaitEventClient w) switch (w) { + case WAIT_EVENT_ASYNC_WAL_SENDER_WAIT_FOR_SYNC_REPLICATION: + event_name = "AsyncWalSenderWaitForSyncReplication"; + break; case WAIT_EVENT_CLIENT_READ: event_name = "ClientRead"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index e2fe219aa8..a4050ba8d5 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1381,6 +1381,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"async_standbys_wait_for_sync_replication", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets whether asynchronous standbys should wait until synchronous standbys receive and flush WAL."), + NULL + }, + &async_standbys_wait_for_sync_replication, + true, + NULL, NULL, NULL + }, { {"debug_assertions", PGC_INTERNAL, PRESET_OPTIONS, gettext_noop("Shows whether the running server has assertion checks enabled."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 56d0bee6d9..f9de74d4e8 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -308,6 +308,9 @@ #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) +#async_standbys_wait_for_sync_replication = on # asynchronous standbys wait + # until synchronous standbys receive, flush WAL and + # confirm primary # - Primary Server - diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index b1892e9e4b..61b113ae48 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -34,6 +34,7 @@ extern bool wake_wal_senders; extern int max_wal_senders; extern int wal_sender_timeout; extern bool log_replication_commands; +extern bool async_standbys_wait_for_sync_replication; extern void InitWalSender(void); extern bool exec_replication_command(const char *query_string); diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 395d325c5f..fbe2b66368 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -60,7 +60,8 @@ typedef enum */ typedef enum { - WAIT_EVENT_CLIENT_READ = PG_WAIT_CLIENT, + WAIT_EVENT_ASYNC_WAL_SENDER_WAIT_FOR_SYNC_REPLICATION = PG_WAIT_CLIENT, + WAIT_EVENT_CLIENT_READ, WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_GSS_OPEN_SERVER, WAIT_EVENT_LIBPQWALRECEIVER_CONNECT, -- 2.25.1