diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index cb95aa3..d216b2e 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -37,6 +37,8 @@ #include "access/transam.h" #include "miscadmin.h" #include "pg_trace.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" /* * Defines for CLOG page sizes. A page is the same BLCKSZ as is used @@ -708,8 +710,10 @@ WriteZeroPageXlogRec(int pageno) /* * Write a TRUNCATE xlog record * - * We must flush the xlog record to disk before returning --- see notes - * in TruncateCLOG(). + * Before returning we must flush the xlog record to disk + * and if synchronous transfer is requested wait for failback + * safe standby to receive WAL up to recptr. + * --- see notes in TruncateCLOG(). */ static void WriteTruncateXlogRec(int pageno) @@ -723,6 +727,12 @@ WriteTruncateXlogRec(int pageno) rdata.next = NULL; recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata); XLogFlush(recptr); + + /* + * Wait for failback safe standby. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(recptr, true, true); } /* diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 5e53593..edaee83 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -54,6 +54,8 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xlog.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/shmem.h" #include "miscadmin.h" @@ -744,6 +746,12 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata) START_CRIT_SECTION(); XLogFlush(max_lsn); END_CRIT_SECTION(); + + /* If synchronous transfer is requested, wait for failback safe + * standby to receive WAL up to max_lsn. 
+ */ + if (SyncTransRequested()) + SyncRepWaitForLSN(max_lsn, true, true); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e975f8d..38a9e9c 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1091,12 +1091,12 @@ EndPrepare(GlobalTransaction gxact) END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for synchronous/synchronous failback safe standby, if required. * * Note that at this stage we have marked the prepare, but still show as * running in the procarray (twice!) and continue to hold locks. */ - SyncRepWaitForLSN(gxact->prepare_lsn); + SyncRepWaitForLSN(gxact->prepare_lsn, false, true); records.tail = records.head = NULL; } @@ -2058,12 +2058,12 @@ RecordTransactionCommitPrepared(TransactionId xid, END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for synchronous/synchronous failback safe standby, if required. * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, false, true); } /* @@ -2138,10 +2138,10 @@ RecordTransactionAbortPrepared(TransactionId xid, END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for synchronous/synchronous failback safe standby, if required. * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr); + SyncRepWaitForLSN(recptr, false, true); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0591f3f..25210df 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1189,13 +1189,13 @@ RecordTransactionCommit(void) latestXid = TransactionIdLatest(xid, nchildren, children); /* - * Wait for synchronous replication, if required. 
+ * Wait for synchronous/synchronous failback safe standby, if required. * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ if (wrote_xlog) - SyncRepWaitForLSN(XactLastRecEnd); + SyncRepWaitForLSN(XactLastRecEnd, false, true); /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; @@ -4690,8 +4690,17 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * for any user that requested ForceSyncCommit(). */ if (XactCompletionForceSyncCommit(xinfo)) + { XLogFlush(lsn); + /* + * If synchronous transfer is requested, wait for failback safe + * standby to receive WAL up to lsn. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(lsn, true, true); + + } } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index dc47c47..e8e118c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -39,8 +39,9 @@ #include "pgstat.h" #include "postmaster/bgwriter.h" #include "postmaster/startup.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/barrier.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -8278,6 +8279,18 @@ CreateCheckPoint(int flags) END_CRIT_SECTION(); /* + * If synchronous transfer is requested, wait for the failback safe standby + * to receive WAL up to the checkpoint record. Otherwise, a failure before + * the standby receives the checkpoint record would leave the control files + * of master and standby inconsistent: the master would restart from a + * location that is unknown to the standby at the time of fail-over. + * + * There is no need to wait for shutdown CHECKPOINT. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(recptr, true, !shutdown); + + /* * Let smgr do post-checkpoint cleanup (eg, deleting old files). 
*/ smgrpostckpt(); diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index 971a149..050a6ba 100644 --- a/src/backend/catalog/storage.c +++ b/src/backend/catalog/storage.c @@ -25,6 +25,8 @@ #include "catalog/catalog.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" #include "storage/freespace.h" #include "storage/smgr.h" #include "utils/memutils.h" @@ -288,6 +290,14 @@ RelationTruncate(Relation rel, BlockNumber nblocks) */ if (fsm || vm) XLogFlush(lsn); + + /* + * If synchronous transfer is requested, wait for failback safe standby + * to receive WAL up to lsn. Otherwise, we may have a situation where + * the heap is truncated, but the action never replayed on the standby. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(lsn, true, true); } /* Do the real work */ @@ -521,6 +531,13 @@ smgr_redo(XLogRecPtr lsn, XLogRecord *record) */ XLogFlush(lsn); + /* + * If synchronous transfer is requested, wait for failback safe standby + * to receive WAL up to lsn. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(lsn, true, true); + smgrtruncate(reln, MAIN_FORKNUM, xlrec->blkno); /* Also tell xlogutils.c about it */ diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 8cf1346..f5cc21c 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -66,6 +66,8 @@ char *SyncRepStandbyNames; static bool announce_next_takeover = true; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; +static int SyncTransferMode = SYNC_REP_NO_WAIT; +int synchronous_transfer = SYNCHRONOUS_TRANSFER_COMMIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); @@ -83,20 +85,30 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); */ /* - * Wait for synchronous replication, if requested by user. + * Wait for synchronous/failback safe standby, if requested by user. 
* * Initially backends start in state SYNC_REP_NOT_WAITING and then - * change that state to SYNC_REP_WAITING before adding ourselves - * to the wait queue. During SyncRepWakeQueue() a WALSender changes - * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. - * This backend then resets its state to SYNC_REP_NOT_WAITING. + * change that state to SYNC_REP_WAITING + * before adding ourselves to the wait queue. During SyncRepWakeQueue() a + * WALSender changes the state to SYNC_REP_WAIT_COMPLETE once replication is + * confirmed. This backend then resets its state to SYNC_REP_NOT_WAITING. + * + * ForDataFlush - if TRUE, we wait before flushing a data page. + * Otherwise we wait for the sync standby. + * + * Wait - if FALSE, we don't actually wait, but tell the caller whether or not + * the standby has already made progress up to the given XactCommitLSN. + * + * Return TRUE if either the synchronous standby/failback safe standby is not + * configured/turned off OR the standby has made enough progress. */ -void -SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +bool +SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool ForDataFlush, bool Wait) { char *new_status = NULL; const char *old_status; - int mode = SyncRepWaitMode; + int mode = !ForDataFlush ? SyncRepWaitMode : SyncTransferMode; + bool ret; /* * Fast exit if user has not requested sync replication, or there are no @@ -104,7 +116,26 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * need to be connected. */ if (!SyncRepRequested() || !SyncStandbysDefined()) - return; + return true; + + /* + * If the caller has specified ForDataFlush, but synchronous transfer + * is not specified or it is turned off, exit. + * + * We would like to allow the failback safe mechanism even for cascaded + * standbys as well. 
But we can't really wait for the standby to catch + * up until we reach a consistent state since the standbys won't be + * even able to connect without us reaching in that state (XXX Confirm) + */ + if ((!SyncTransRequested()) && ForDataFlush) + return true; + + /* + * If the caller has not specified ForDataFlush, but synchronous commit + * is skipped by values of synchronous_transfer, exit. + */ + if (IsSyncRepSkipped() && !ForDataFlush) + return true; Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(WalSndCtl != NULL); @@ -120,11 +151,20 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * condition but we'll be fetching that cache line anyway so it's likely to * be a low cost check. */ - if (!WalSndCtl->sync_standbys_defined || + if ((!ForDataFlush && !WalSndCtl->sync_standbys_defined) || XactCommitLSN <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); - return; + return true; + } + + /* + * Exit if we are told not to block on the standby. + */ + if (!Wait) + { + LWLockRelease(SyncRepLock); + return false; } /* @@ -151,6 +191,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) new_status[len] = '\0'; /* truncate off " waiting ..." */ } + ret = false; + /* * Wait for specified LSN to be confirmed. 
* @@ -187,7 +229,10 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) LWLockRelease(SyncRepLock); } if (syncRepState == SYNC_REP_WAIT_COMPLETE) + { + ret = true; break; + } /* * If a wait for synchronous replication is pending, we can neither @@ -264,6 +309,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) set_ps_display(new_status, false); pfree(new_status); } + + return ret; } /* @@ -371,6 +418,7 @@ SyncRepReleaseWaiters(void) volatile WalSnd *syncWalSnd = NULL; int numwrite = 0; int numflush = 0; + int numdataflush = 0; int priority = 0; int i; @@ -438,13 +486,20 @@ SyncRepReleaseWaiters(void) { walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); + + } + if (walsndctl->lsn[SYNC_REP_WAIT_DATA_FLUSH] < MyWalSnd->flush) + { + walsndctl->lsn[SYNC_REP_WAIT_DATA_FLUSH] = MyWalSnd->flush; + numdataflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_DATA_FLUSH); + } LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); /* * If we are managing the highest priority standby, though we weren't @@ -710,3 +765,18 @@ assign_synchronous_commit(int newval, void *extra) break; } } + +void +assign_synchronous_transfer(int newval, void *extra) +{ + switch (newval) + { + case SYNCHRONOUS_TRANSFER_ALL: + case SYNCHRONOUS_TRANSFER_DATA_FLUSH: + SyncTransferMode = SYNC_REP_WAIT_DATA_FLUSH; + break; + default: + SyncTransferMode = SYNC_REP_NO_WAIT; + break; + } +} 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f848391..7a2e285 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -41,6 +41,8 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -1975,8 +1977,14 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln) * skip the flush if the buffer isn't permanent. */ if (buf->flags & BM_PERMANENT) + { XLogFlush(recptr); - + /* If synchronous transfer is requested, wait for failback safe standby + * to receive WAL up to recptr. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(recptr, true, true); + } /* * Now it's safe to write buffer to disk. 
Note that no one else should * have been able to write it while we were busy with log flushing because diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c index 18f0342..e92b607 100644 --- a/src/backend/utils/cache/relmapper.c +++ b/src/backend/utils/cache/relmapper.c @@ -48,6 +48,8 @@ #include "catalog/pg_tablespace.h" #include "catalog/storage.h" #include "miscadmin.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/lwlock.h" #include "utils/inval.h" @@ -711,6 +713,7 @@ write_relmap_file(bool shared, RelMapFile *newmap, int fd; RelMapFile *realmap; char mapfilename[MAXPGPATH]; + XLogRecPtr lsn=InvalidXLogRecPtr; /* * Fill in the overhead fields and update CRC. @@ -753,7 +756,6 @@ write_relmap_file(bool shared, RelMapFile *newmap, { xl_relmap_update xlrec; XLogRecData rdata[2]; - XLogRecPtr lsn; /* now errors are fatal ... */ START_CRIT_SECTION(); @@ -775,6 +777,7 @@ write_relmap_file(bool shared, RelMapFile *newmap, /* As always, WAL must hit the disk before the data update does */ XLogFlush(lsn); + } errno = 0; @@ -849,6 +852,13 @@ write_relmap_file(bool shared, RelMapFile *newmap, /* Critical section done */ if (write_wal) END_CRIT_SECTION(); + + /* + * If synchronous transfer is requested, wait for failback safe + * standby to receive WAL up to lsn. + */ + if (SyncTransRequested()) + SyncRepWaitForLSN(lsn, true, true); } /* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 7d297bc..7e226a5 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -381,6 +381,18 @@ static const struct config_enum_entry synchronous_commit_options[] = { }; /* + * Although only "all", "data_flush", and "commit" are documented, we + * also accept "0" as an alias for "commit". 
+ */ +static const struct config_enum_entry synchronous_transfer_options[] = { + {"all", SYNCHRONOUS_TRANSFER_ALL, false}, + {"data_flush", SYNCHRONOUS_TRANSFER_DATA_FLUSH, false}, + {"commit", SYNCHRONOUS_TRANSFER_COMMIT, false}, + {"0", SYNCHRONOUS_TRANSFER_COMMIT, true}, + {NULL, 0, false} +}; + +/* * Options for enum values stored in other modules */ extern const struct config_enum_entry wal_level_options[]; @@ -3288,6 +3300,16 @@ static struct config_enum ConfigureNamesEnum[] = }, { + {"synchronous_transfer", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Sets the data flush synchronization level"), + NULL + }, + &synchronous_transfer, + SYNCHRONOUS_TRANSFER_COMMIT, synchronous_transfer_options, + NULL, assign_synchronous_transfer, NULL + }, + + { {"trace_recovery_messages", PGC_SIGHUP, DEVELOPER_OPTIONS, gettext_noop("Enables logging of recovery-related debugging information."), gettext_noop("Each level includes all the levels that follow it. The later" diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d69a02b..d6603c2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -220,6 +220,8 @@ #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all +#synchronous_transfer = commit # data page synchronization level + # commit, data_flush or all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index ed66c49..6cf3f26 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -60,6 +60,8 @@ #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" +#include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/procarray.h" #include 
"utils/tqual.h" @@ -115,6 +117,18 @@ SetHintBits(HeapTupleHeader tuple, Buffer buffer, if (XLogNeedsFlush(commitLSN) && BufferIsPermanent(buffer)) return; /* not flushed yet, so don't set hint */ + + /* + * If synchronous transfer is requested, we check if the commit WAL record + * has made it to the standby before allowing hint bit updates. We should not + * wait for the standby to receive the WAL since it's OK to delay hint bit + * updates. + */ + if (SyncTransRequested()) + { + if(!SyncRepWaitForLSN(commitLSN, true, false)) + return; + } } tuple->t_infomask |= infomask; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index ac23ea6..4540625 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -19,23 +19,42 @@ #define SyncRepRequested() \ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) +#define SyncTransRequested() \ + (max_wal_senders > 0 && synchronous_transfer > SYNCHRONOUS_TRANSFER_COMMIT) + +#define IsSyncRepSkipped() \ + (max_wal_senders > 0 && synchronous_transfer == SYNCHRONOUS_TRANSFER_DATA_FLUSH) + /* SyncRepWaitMode */ -#define SYNC_REP_NO_WAIT -1 -#define SYNC_REP_WAIT_WRITE 0 -#define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_NO_WAIT -1 +#define SYNC_REP_WAIT_WRITE 0 +#define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_DATA_FLUSH 2 -#define NUM_SYNC_REP_WAIT_MODE 2 +#define NUM_SYNC_REP_WAIT_MODE 3 /* syncRepState */ -#define SYNC_REP_NOT_WAITING 0 -#define SYNC_REP_WAITING 1 -#define SYNC_REP_WAIT_COMPLETE 2 +#define SYNC_REP_NOT_WAITING 0 +#define SYNC_REP_WAITING 1 +#define SYNC_REP_WAIT_COMPLETE 2 + +typedef enum +{ + SYNCHRONOUS_TRANSFER_COMMIT, /* no wait for data page flush */ + SYNCHRONOUS_TRANSFER_DATA_FLUSH, /* wait on data page flush only, + * not on commit */ + SYNCHRONOUS_TRANSFER_ALL /* wait on commit and data page flush */ +} SynchronousTransferLevel; /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; +/* 
user-settable parameters for failback safe replication */ +extern int synchronous_transfer; + /* called by user backend */ -extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); +extern bool SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, + bool ForDataFlush, bool Wait); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); @@ -52,5 +71,6 @@ extern int SyncRepWakeQueue(bool all, int mode); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra); +extern void assign_synchronous_transfer(int newval, void *extra); #endif /* _SYNCREP_H */