diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a09ceb2..b21b4c0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2091,7 +2091,7 @@ include_dir 'conf.d' Specifies whether transaction commit will wait for WAL records to be written to disk before the command returns a success indication to the client. Valid values are on, - remote_write, local, and off. + remote_write, remote_apply, local, and off. The default, and safe, setting is on. When off, there can be a delay between when success is reported to the client and when the transaction is @@ -2125,6 +2125,10 @@ include_dir 'conf.d' ensure data preservation even if the standby instance of PostgreSQL were to crash, but not if the standby suffers an operating-system-level crash. + When set to remote_apply, commits will wait until a reply + from the current synchronous standby indicates it has received the + commit record of the transaction and applied it, so that it has become + visible to queries. When synchronous diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 6cb690c..2600fba 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. + In the case that synchronous_commit is set to + remote_apply, the standby sends reply messages when the commit + record is replayed, making the transaction visible. 
If the standby is the first matching standby, as specified in synchronous_standby_names on the primary, the reply messages from that standby will be used to wake users waiting for @@ -1107,6 +1110,14 @@ primary_slot_name = 'node_a_slot' + Setting synchronous_commit to remote_apply will + cause each commit to wait until the current synchronous standby reports + that it has replayed the transaction, making it visible to user queries. + In simple cases, this allows for load balancing with causal consistency + on a single hot standby. + + + Users will stop waiting if a fast shutdown is requested. However, as when using asynchronous replication, the server will not fully shutdown until all outstanding WAL records are transferred to the currently @@ -1160,9 +1171,10 @@ primary_slot_name = 'node_a_slot' Planning for High Availability - Commits made when synchronous_commit is set to on - or remote_write will wait until the synchronous standby responds. The response - may never occur if the last, or only, standby should crash. + Commits made when synchronous_commit is set to on, + remote_write or remote_apply will wait until the + synchronous standby responds. The response may never occur if the last, or + only, standby should crash. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b491735..5e8cc3d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5121,6 +5121,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; /* + * Check if the caller would like to ask standbys for immediate feedback + * once this commit is applied. + */ + if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) + xl_xinfo.xinfo |= XACT_COMPLETION_SYNC_APPLY_FEEDBACK; + + /* * Relcache invalidations requires information about the current database * and so does logical decoding. 
*/ @@ -5456,6 +5463,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); + /* + * If asked by the primary (because someone is waiting for a synchronous + * commit = remote_apply), we will need to ask walreceiver to send a + * reply immediately. + */ + if (XactCompletionSyncApplyFeedback(parsed->xinfo)) + XLogRequestWalReceiverReply(); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 00f139a..0519357 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -346,6 +346,12 @@ static XLogRecPtr RedoRecPtr; static bool doPageWrites; /* + * doRequestWalReceiverReply is used by recovery code to ask the main recovery + * loop to trigger a walreceiver reply. + */ +static bool doRequestWalReceiverReply; + +/* * RedoStartLSN points to the checkpoint's REDO location which is specified * in a backup label file, backup history file or control file. In standby * mode, XLOG streaming usually starts from the position where an invalid @@ -6909,6 +6915,19 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * If rm_redo reported that it applied a commit record that + * the master is waiting for by calling + * XLogRequestWalReceiverReply, then we wake up the receiver + * so that it notices the updated lastReplayedEndRecPtr and + * sends a reply to the master. + */ + if (doRequestWalReceiverReply) + { + doRequestWalReceiverReply = false; + WalRcvWakeup(); + } + /* Remember this record as the last-applied one */ LastRec = ReadRecPtr; @@ -11630,3 +11649,13 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Called by redo code to indicate that the xlog replay loop should wake up + * the walreceiver process so that a reply can be sent to the primary. 
+ */ +void +XLogRequestWalReceiverReply(void) +{ + doRequestWalReceiverReply = true; +} diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 92faf4e..4565348 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -416,6 +416,7 @@ SyncRepReleaseWaiters(void) WalSnd *syncWalSnd; int numwrite = 0; int numflush = 0; + int numapply = 0; /* * If this WALSender is serving a standby that is not on the list of @@ -462,12 +463,18 @@ SyncRepReleaseWaiters(void) walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply) + { + walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply; + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush, + numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply); /* * If we are managing the highest priority standby, though we weren't @@ -728,6 +735,9 @@ assign_synchronous_commit(int newval, void *extra) case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; break; + case SYNCHRONOUS_COMMIT_REMOTE_APPLY: + SyncRepWaitMode = SYNC_REP_WAIT_APPLY; + break; default: SyncRepWaitMode = SYNC_REP_NO_WAIT; break; diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7b36e02..c19842e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -101,6 +101,7 @@ static uint32 recvOff = 0; */ static volatile sig_atomic_t 
got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGUSR2 = false; /* * LogstreamResult indicates the byte positions that we have already @@ -150,9 +151,27 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); static void WalRcvSigUsr1Handler(SIGNAL_ARGS); +static void WalRcvSigUsr2Handler(SIGNAL_ARGS); static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); +static void WalRcvBlockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_BLOCK, &mask, NULL); +} + +static void WalRcvUnblockSigUsr2(void) +{ + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_UNBLOCK, &mask, NULL); +} static void ProcessWalRcvInterrupts(void) @@ -200,6 +219,7 @@ WalReceiverMain(void) WalRcvData *walrcv = WalRcv; TimestampTz last_recv_timestamp; bool ping_sent; + bool forceReply = false; /* set when recovery asks for an immediate reply */ /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -268,7 +288,7 @@ WalReceiverMain(void) pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalRcvSigUsr1Handler); - pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGUSR2, WalRcvSigUsr2Handler); /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD, SIG_DFL); @@ -299,6 +319,10 @@ WalReceiverMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* Block SIGUSR2 (we unblock it only during network waits). 
*/ + WalRcvBlockSigUsr2(); + got_SIGUSR2 = false; + /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo); @@ -408,7 +432,9 @@ WalReceiverMain(void) } /* Wait a while for data to arrive */ + WalRcvUnblockSigUsr2(); len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); + WalRcvBlockSigUsr2(); if (len != 0) { /* @@ -439,11 +465,21 @@ WalReceiverMain(void) endofwal = true; break; } + WalRcvUnblockSigUsr2(); len = walrcv_receive(0, &buf); + WalRcvBlockSigUsr2(); + } + + if (got_SIGUSR2) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR2 = false; + forceReply = true; } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(forceReply, false); + forceReply = false; /* * If we've written some records, flush them to disk and @@ -498,7 +534,14 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + if (got_SIGUSR2) + { + /* The recovery process asked us to force a reply. */ + got_SIGUSR2 = false; + forceReply = true; + } + XLogWalRcvSendReply(requestReply || forceReply, requestReply); + forceReply = false; XLogWalRcvSendHSFeedback(false); } } @@ -740,6 +783,13 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) errno = save_errno; } +/* SIGUSR2: used to receive wakeups from recovery */ +static void +WalRcvSigUsr2Handler(SIGNAL_ARGS) +{ + got_SIGUSR2 = true; +} + /* SIGTERM: set flag for main loop, or shutdown immediately if safe */ static void WalRcvShutdownHandler(SIGNAL_ARGS) @@ -1222,6 +1272,22 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } /* + * Wake up the walreceiver if it happens to be blocked in walrcv_receive, + * and tell it that a commit record has been applied. 
+ * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the master which may be waiting in a COMMIT with + * synchronous_commit = remote_apply. + */ +void +WalRcvWakeup(void) +{ + if (WalRcv->pid != 0) + kill(WalRcv->pid, SIGUSR2); +} + +/* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ea5a09a..a8eaa5f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = { }; /* - * Although only "on", "off", "remote_write", and "local" are documented, we - * accept all the likely variants of "on" and "off". + * Although only "on", "off", "remote_apply", "remote_write", and "local" are + * documented, we accept all the likely variants of "on" and "off". 
*/ static const struct config_enum_entry synchronous_commit_options[] = { {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, + {"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false}, {"on", SYNCHRONOUS_COMMIT_ON, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false}, {"true", SYNCHRONOUS_COMMIT_ON, true}, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ee3d378..085099c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -177,7 +177,7 @@ # (change requires restart) #fsync = on # turns forced synchronization on or off #synchronous_commit = on # synchronization level; - # off, local, remote_write, or on + # off, local, remote_write, remote_apply, or on #wal_sync_method = fsync # the default is the first option # supported by the operating system: # open_datasync diff --git a/src/include/access/xact.h b/src/include/access/xact.h index ebeb582..ed8d22c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -60,7 +60,9 @@ typedef enum SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote * write */ - SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */ + SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote + * apply */ } SyncCommitLevel; /* Define the default setting for synchonous_commit */ @@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, * EOXact... routines which run at the end of the original transaction * completion. 
*/ +#define XACT_COMPLETION_SYNC_APPLY_FEEDBACK (1U << 29) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) /* Access macros for above flags */ +#define XactCompletionSyncApplyFeedback(xinfo) \ + (!!(xinfo & XACT_COMPLETION_SYNC_APPLY_FEEDBACK)) #define XactCompletionRelcacheInitFileInval(xinfo) \ (!!(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)) #define XactCompletionForceSyncCommit(xinfo) \ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ecd30ce..68e20e4 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -268,6 +268,8 @@ extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); extern void SetWalWriterSleeping(bool sleeping); +extern void XLogRequestWalReceiverReply(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 96e059b..28b68f6 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -23,8 +23,9 @@ #define SYNC_REP_NO_WAIT -1 #define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 3 /* syncRepState */ #define SYNC_REP_NOT_WAITING 0 diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6eacb09..3294df9 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -162,5 +162,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); +extern void WalRcvWakeup(void); #endif /* _WALRECEIVER_H */