diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index cb95aa3..a5bfb9a 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -723,6 +723,8 @@ WriteTruncateXlogRec(int pageno) rdata.next = NULL; recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata); XLogFlush(recptr); + + /* XXX Do we need wait for the failback safe standby ? */ } /* diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 5a8f654..cb0ddea 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -700,6 +700,12 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata) START_CRIT_SECTION(); XLogFlush(max_lsn); END_CRIT_SECTION(); + + /* + * Also wait for the failback safe standby to receive WAL upto + * max_lsn. + */ + SyncRepWaitForLSN(max_lsn, true, true); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e975f8d..f7f95d7 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1096,7 +1096,7 @@ EndPrepare(GlobalTransaction gxact) * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. */ - SyncRepWaitForLSN(gxact->prepare_lsn); + SyncRepWaitForLSN(gxact->prepare_lsn, false, true); records.tail = records.head = NULL; } @@ -2063,7 +2063,7 @@ RecordTransactionCommitPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, false, true); } /* @@ -2143,5 +2143,5 @@ RecordTransactionAbortPrepared(TransactionId xid, * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. 
*/ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, false, true); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 31e868d..359d9c9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1195,7 +1195,7 @@ RecordTransactionCommit(void) * in the procarray and continue to hold locks. */ if (wrote_xlog) - SyncRepWaitForLSN(XactLastRecEnd); + SyncRepWaitForLSN(XactLastRecEnd, false, true); /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 40b780c..69dda0d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7061,6 +7061,13 @@ CreateCheckPoint(int flags) XLogFlush(recptr); /* + * At this point, ensure that the failback safe standby has received the + * checkpoint WAL. Otherwise failure after the control file update will + * cause the master to start from a location not known to the standby + */ + SyncRepWaitForLSN(recptr, true, true); + + /* * We mustn't write any new WAL after a shutdown checkpoint, or it will be * overwritten at next startup. No-one should even try, this just allows * sanity-checking. In the case of an end-of-recovery checkpoint, we want diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index 971a149..2eea92d 100644 --- a/src/backend/catalog/storage.c +++ b/src/backend/catalog/storage.c @@ -288,6 +288,13 @@ RelationTruncate(Relation rel, BlockNumber nblocks) */ if (fsm || vm) XLogFlush(lsn); + + /* + * Also ensure that the WAL is received by the failback safe standby. 
+ * Otherwise, we may have a situation where the heap is truncated, but + * the action never replayed on the standby + */ + SyncRepWaitForLSN(lsn, true, true); } /* Do the real work */ diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 5424281..727d107 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -59,17 +59,27 @@ /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; +/* User-settable parameter for failback safe standby */ +char *FailbackSafeStandbyName; + #define SyncStandbysDefined() \ (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') +#define FailbackSafeStandbyDefined() \ + (FailbackSafeStandbyName != NULL && FailbackSafeStandbyName[0] != '\0') + static bool announce_next_takeover = true; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; +static int FailbackSafeRepWaitMode = SYNC_REP_NO_WAIT; +int failback_safety = FAILBACK_SAFETY_OFF; + static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepGetStandbyPriority(void); +static bool SyncRepCheckIfFailbackSafe(void); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); @@ -82,28 +92,52 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); */ /* - * Wait for synchronous replication, if requested by user. + * Wait for synchronous/failback_safe replication, if requested by user. * * Initially backends start in state SYNC_REP_NOT_WAITING and then - * change that state to SYNC_REP_WAITING before adding ourselves - * to the wait queue. During SyncRepWakeQueue() a WALSender changes - * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. - * This backend then resets its state to SYNC_REP_NOT_WAITING. + * change that state to SYNC_REP_WAITING/SYNC_REP_WAITING_FOR_FAILBACK_SAFETY + * before adding ourselves to the wait queue. 
During SyncRepWakeQueue() a + * WALSender changes the state to SYNC_REP_WAIT_COMPLETE once replication is + * confirmed. This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * ForFailbackSafety - if TRUE, we wait for the failback safe standby. + * Otherwise wait for the sync standby + * + * Wait - if FALSE, we don't actually wait, but tell the caller whether or not + * the standby has already progressed up to the given XactCommitLSN + * + * Return TRUE if either the sync/failback_safe standby is not + * configured/turned off OR the standby has made enough progress */ -void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +bool +SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool ForFailbackSafety, bool Wait) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode = !ForFailbackSafety ? SyncRepWaitMode : FailbackSafeRepWaitMode; + bool ret; /* * Fast exit if user has not requested sync replication, or there are no * sync replication standby names defined. Note that those standbys don't * need to be connected. */ - if (!SyncRepRequested() || !SyncStandbysDefined()) - return; + if ((!SyncRepRequested() || !SyncStandbysDefined()) && + !ForFailbackSafety) + return true; + + /* + * If the caller has specified ForFailbackSafety, but failback_safe_standby_name + * is not set or failback safety is turned off, exit. + * + * We would like to allow the failback safe mechanism even for cascaded + * standbys as well. 
But we can't really wait for the standby to catch + * up until we reach a consistent state, since the standbys won't even + * be able to connect before we have reached that state (XXX Confirm) + */ + if ((!FailbackSafeRepRequested() || !FailbackSafeStandbyDefined() || + !reachedConsistency) && ForFailbackSafety) + return true; Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(WalSndCtl != NULL); @@ -119,19 +153,32 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * condition but we'll be fetching that cache line anyway so its likely to * be a low cost check. */ - if (!WalSndCtl->sync_standbys_defined || + if ((!ForFailbackSafety && !WalSndCtl->sync_standbys_defined) || + (ForFailbackSafety && !WalSndCtl->failback_safe_standby_defined) || XactCommitLSN <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); - return; + return true; } /* + * Exit if we are told not to block on the standby. SyncRepLock is still + * held at this point, so release it before returning. + */ + if (!Wait) + { + LWLockRelease(SyncRepLock); + return false; + } + + /* * Set our waitLSN so WALSender will know when to wake us, and add * ourselves to the queue. */ MyProc->waitLSN = XactCommitLSN; - MyProc->syncRepState = SYNC_REP_WAITING; + MyProc->syncRepState = !ForFailbackSafety ? + SYNC_REP_WAITING : + SYNC_REP_WAITING_FOR_FAILBACK_SAFETY; SyncRepQueueInsert(mode); Assert(SyncRepQueueIsOrderedByLSN(mode)); LWLockRelease(SyncRepLock); @@ -150,6 +193,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status[len] = '\0'; /* truncate off " waiting ..." */ } + ret = false; /* * Wait for specified LSN to be confirmed. * @@ -179,14 +223,18 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * contained memory barriers. 
*/ syncRepState = MyProc->syncRepState; - if (syncRepState == SYNC_REP_WAITING) + if (syncRepState == SYNC_REP_WAITING || + syncRepState == SYNC_REP_WAITING_FOR_FAILBACK_SAFETY) { LWLockAcquire(SyncRepLock, LW_SHARED); syncRepState = MyProc->syncRepState; LWLockRelease(SyncRepLock); } if (syncRepState == SYNC_REP_WAIT_COMPLETE) + { + ret = true; break; + } /* * If a wait for synchronous replication is pending, we can neither @@ -263,6 +311,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) set_ps_display(new_status, false); pfree(new_status); } + + return ret; } /* @@ -339,7 +389,7 @@ void SyncRepInitConfig(void) { int priority; - + bool is_failback_safe; /* * Determine if we are a potential sync standby and remember the result * for handling replies from standby. @@ -354,6 +404,18 @@ SyncRepInitConfig(void) (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); } + + is_failback_safe = SyncRepCheckIfFailbackSafe(); + if (MyWalSnd->is_failback_safe != is_failback_safe) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + MyWalSnd->is_failback_safe = is_failback_safe; + LWLockRelease(SyncRepLock); + ereport(DEBUG1, + (errmsg("standby \"%s\" is a failback safe standby", + application_name))); + + } } /* @@ -368,8 +430,11 @@ SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; volatile WalSnd *syncWalSnd = NULL; + volatile WalSnd *failbackSafeWalSnd = NULL; int numwrite = 0; int numflush = 0; + int numwrite_fbs = 0; + int numflush_fbs = 0; int priority = 0; int i; @@ -379,7 +444,7 @@ SyncRepReleaseWaiters(void) * up, still running base backup or the current flush position is still * invalid, then leave quickly also. 
*/ - if (MyWalSnd->sync_standby_priority == 0 || + if ((MyWalSnd->sync_standby_priority == 0 && !MyWalSnd->is_failback_safe) || MyWalSnd->state < WALSNDSTATE_STREAMING || XLogRecPtrIsInvalid(MyWalSnd->flush)) return; @@ -398,58 +463,90 @@ SyncRepReleaseWaiters(void) volatile WalSnd *walsnd = &walsndctl->walsnds[i]; if (walsnd->pid != 0 && - walsnd->state == WALSNDSTATE_STREAMING && - walsnd->sync_standby_priority > 0 && - (priority == 0 || - priority > walsnd->sync_standby_priority) && - !XLogRecPtrIsInvalid(walsnd->flush)) + walsnd->state == WALSNDSTATE_STREAMING) { - priority = walsnd->sync_standby_priority; - syncWalSnd = walsnd; + if (walsnd->sync_standby_priority > 0 && + (priority == 0 || + priority > walsnd->sync_standby_priority) && + !XLogRecPtrIsInvalid(walsnd->flush)) + { + priority = walsnd->sync_standby_priority; + syncWalSnd = walsnd; + } + + if (walsnd->is_failback_safe) + failbackSafeWalSnd = walsnd; } } /* * We should have found ourselves at least. */ - Assert(syncWalSnd); + Assert(syncWalSnd || failbackSafeWalSnd); /* * If we aren't managing the highest priority standby then just leave. */ if (syncWalSnd != MyWalSnd) { - LWLockRelease(SyncRepLock); announce_next_takeover = true; - return; + if (!failbackSafeWalSnd) + { + LWLockRelease(SyncRepLock); + return; + } } - /* * Set the lsn first so that when we wake backends they will release up to * this location. 
*/ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write && + syncWalSnd == MyWalSnd) { walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush && + syncWalSnd == MyWalSnd) { walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } + if (walsndctl->lsn[SYNC_REP_WAIT_FAILBACK_SAFE_WRITE] < MyWalSnd->write && + failbackSafeWalSnd == MyWalSnd) + { + walsndctl->lsn[SYNC_REP_WAIT_FAILBACK_SAFE_WRITE] = MyWalSnd->write; + numwrite_fbs = SyncRepWakeQueue(false, SYNC_REP_WAIT_FAILBACK_SAFE_WRITE); + } + if (walsndctl->lsn[SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH] < MyWalSnd->flush && + failbackSafeWalSnd == MyWalSnd) + { + walsndctl->lsn[SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH] = MyWalSnd->flush; + numflush_fbs = SyncRepWakeQueue(false, SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH); + } LWLockRelease(SyncRepLock); - elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", - numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + if (syncWalSnd == MyWalSnd) + { + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", + numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + } + + if (failbackSafeWalSnd == MyWalSnd) + { + elog(DEBUG3, "released %d procs up to write for failback safety %X/%X," + " %d procs up to flush for failback safety %X/%X", + numwrite_fbs, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, + numflush_fbs, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + } /* * If we are managing the highest priority standby, though we weren't * prior to 
this, then announce we are now the sync standby. */ - if (announce_next_takeover) + if ((announce_next_takeover) && (syncWalSnd == MyWalSnd)) { announce_next_takeover = false; ereport(LOG, @@ -515,6 +612,22 @@ SyncRepGetStandbyPriority(void) return (found ? priority : 0); } + +/* + * Check if we are a failback safe standby + * + * Compare the parameter FailbackSafeStandbyName against the application_name + * for this WALSender, or allow any name if we find a wildcard "*". + */ +static bool +SyncRepCheckIfFailbackSafe(void) +{ + if (pg_strcasecmp(FailbackSafeStandbyName, application_name) == 0 || + pg_strcasecmp(FailbackSafeStandbyName, "*") == 0) + return true; + else + return false; +} /* * Walk the specified queue from head. Set the state of any backends that * need to be woken, remove them from the queue, and then wake them. @@ -588,8 +701,10 @@ void SyncRepUpdateSyncStandbysDefined(void) { bool sync_standbys_defined = SyncStandbysDefined(); + bool failback_safe_standby_defined = FailbackSafeStandbyDefined(); - if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) + if ((sync_standbys_defined != WalSndCtl->sync_standbys_defined) || + (failback_safe_standby_defined != WalSndCtl->failback_safe_standby_defined)) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); @@ -600,10 +715,14 @@ SyncRepUpdateSyncStandbysDefined(void) */ if (!sync_standbys_defined) { - int i; + SyncRepWakeQueue(true, SYNC_REP_WAIT_WRITE); + SyncRepWakeQueue(true, SYNC_REP_WAIT_FLUSH); + } - for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + if (!failback_safe_standby_defined) + { + SyncRepWakeQueue(true, SYNC_REP_WAIT_FAILBACK_SAFE_WRITE); + SyncRepWakeQueue(true, SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH); } /* @@ -614,6 +733,7 @@ SyncRepUpdateSyncStandbysDefined(void) * the queue (and never wake up). This prevents that. 
*/ WalSndCtl->sync_standbys_defined = sync_standbys_defined; + WalSndCtl->failback_safe_standby_defined = failback_safe_standby_defined; LWLockRelease(SyncRepLock); } @@ -709,3 +829,20 @@ assign_synchronous_commit(int newval, void *extra) break; } } + +void +assign_failback_safety(int newval, void *extra) +{ + switch (newval) + { + case FAILBACK_SAFETY_REMOTE_WRITE: + FailbackSafeRepWaitMode = SYNC_REP_WAIT_FAILBACK_SAFE_WRITE; + break; + case FAILBACK_SAFETY_REMOTE_FLUSH: + FailbackSafeRepWaitMode = SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH; + break; + default: + FailbackSafeRepWaitMode = SYNC_REP_NO_WAIT; + break; + } +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 43eb7d5..8fa4aed 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -41,6 +41,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "replication/syncrep.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -1978,6 +1979,13 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) XLogFlush(recptr); /* + * If failback safe standby is defined, also ensure that the WAL is + * received by the standby before we write to the disk + */ + if (buf->flags & BM_PERMANENT) + SyncRepWaitForLSN(recptr, true, true); + + /* * Now it's safe to write buffer to disk. Note that no one else should * have been able to write it while we were busy with log flushing because * we have the io_in_progress lock. diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c index 2c7d9f3..be961fd 100644 --- a/src/backend/utils/cache/relmapper.c +++ b/src/backend/utils/cache/relmapper.c @@ -722,6 +722,11 @@ write_relmap_file(bool shared, RelMapFile *newmap, /* As always, WAL must hit the disk before the data update does */ XLogFlush(lsn); + + /* + * XXX Should we also wait for the failback safe standby to receive the + * WAL ? 
+ */ } errno = 0; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ea16c64..612a6a2 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -379,6 +379,20 @@ static const struct config_enum_entry synchronous_commit_options[] = { }; /* + * Although only "off", "remote_write", and "remote_flush" are documented, we + * accept all the likely variants of "off". + */ +static const struct config_enum_entry failback_safety_options[] = { + {"remote_write", FAILBACK_SAFETY_REMOTE_WRITE, false}, + {"remote_flush", FAILBACK_SAFETY_REMOTE_FLUSH, false}, + {"off", FAILBACK_SAFETY_OFF, false}, + {"false", FAILBACK_SAFETY_OFF, true}, + {"no", FAILBACK_SAFETY_OFF, true}, + {"0", FAILBACK_SAFETY_OFF, true}, + {NULL, 0, false} +}; + +/* * Options for enum values stored in other modules */ extern const struct config_enum_entry wal_level_options[]; @@ -3067,6 +3081,16 @@ static struct config_string ConfigureNamesString[] = }, { + {"failback_safe_standby_name", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Name of potential failback safe standby."), + NULL + }, + &FailbackSafeStandbyName, + "", + NULL, NULL, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL @@ -3252,6 +3276,16 @@ static struct config_enum ConfigureNamesEnum[] = }, { + {"failback_safe_standby_mode", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Turns failback safety on/off and sets the level"), + NULL + }, + &failback_safety, + FAILBACK_SAFETY_OFF, failback_safety_options, + NULL, assign_failback_safety, NULL + }, + + { {"trace_recovery_messages", PGC_SIGHUP, DEVELOPER_OPTIONS, gettext_noop("Enables logging of recovery-related debugging information."), gettext_noop("Each level includes all the levels that follow it. 
The later" diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 0303ac7..c1def9e 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -212,6 +212,11 @@ #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables +#failback_safe_standby_mode = off # failback safety level + # off, remote_write or remote_flush +#failback_safe_standby_name = '' # standby server that is guaranteed to be + # failback safe + # - Master Server - # These settings are ignored on a standby server. @@ -219,6 +224,7 @@ #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all + #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index ab4020a..14ccfe5 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -62,6 +62,7 @@ #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/procarray.h" #include "utils/tqual.h" @@ -118,6 +119,15 @@ SetHintBits(HeapTupleHeader tuple, Buffer buffer, if (XLogNeedsFlush(commitLSN) && BufferIsPermanent(buffer)) return; /* not flushed yet, so don't set hint */ + + /* + * If failback safe standby is configured, we should also check + * if the commit WAL record has made to the standby before allowing + * hint bit updates. 
We should not wait for the standby to receive the + * WAL since its OK to delay hint bit updates + */ + if (!SyncRepWaitForLSN(commitLSN, true, false)) + return; } tuple->t_infomask |= infomask; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index ac23ea6..35a5212 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -19,23 +19,41 @@ #define SyncRepRequested() \ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) +#define FailbackSafeRepRequested() \ + (max_wal_senders > 0 && failback_safety > FAILBACK_SAFETY_OFF) + /* SyncRepWaitMode */ -#define SYNC_REP_NO_WAIT -1 -#define SYNC_REP_WAIT_WRITE 0 -#define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_NO_WAIT -1 +#define SYNC_REP_WAIT_WRITE 0 +#define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_FAILBACK_SAFE_WRITE 2 +#define SYNC_REP_WAIT_FAILBACK_SAFE_FLUSH 3 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 4 /* syncRepState */ -#define SYNC_REP_NOT_WAITING 0 -#define SYNC_REP_WAITING 1 -#define SYNC_REP_WAIT_COMPLETE 2 +#define SYNC_REP_NOT_WAITING 0 +#define SYNC_REP_WAITING 1 +#define SYNC_REP_WAITING_FOR_FAILBACK_SAFETY 2 +#define SYNC_REP_WAIT_COMPLETE 3 + +typedef enum +{ + FAILBACK_SAFETY_OFF, /* no failback safety */ + FAILBACK_SAFETY_REMOTE_WRITE, /* wait for remote write only */ + FAILBACK_SAFETY_REMOTE_FLUSH /* wait for remote flush */ +} FailbackSafetyLevel; /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; +/* user-settable parameters for failback safe replication */ +extern char *FailbackSafeStandbyName; +extern int failback_safety; + /* called by user backend */ -extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); +extern bool SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, + bool ForFailbackSafety, bool Wait); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); @@ -52,5 +70,6 @@ extern int SyncRepWakeQueue(bool all, int mode); 
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra); +extern void assign_failback_safety(int newval, void *extra); #endif /* _SYNCREP_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7eaa21b..0142c0f 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -62,6 +62,9 @@ typedef struct WalSnd * SyncRepLock. */ int sync_standby_priority; + + /* Track if we are serving a failback safe standby */ + bool is_failback_safe; } WalSnd; extern WalSnd *MyWalSnd; @@ -88,6 +91,13 @@ typedef struct */ bool sync_standbys_defined; + /* + * Is any failback safe standby defined? Waiting backends can't reload the + * config file safely, so checkpointer updates this value as needed. + * Protected by SyncRepLock. + */ + bool failback_safe_standby_defined; + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData;