diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a7762ea..c00b277 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -407,7 +407,6 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ TimeLineID ThisTimeLineID; - TimeLineID RecoveryTargetTLI; /* * archiveCleanupCommand is read from recovery.conf but needs to be in @@ -456,6 +455,8 @@ typedef struct XLogCtlData XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* current effective recovery target timeline. */ + TimeLineID RecoveryTargetTLI; /* * timestamp of when we started replaying the current chunk of WAL data, @@ -4470,12 +4471,17 @@ rescanLatestTimeLine(void) ThisTimeLineID))); else { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + /* Switch target */ recoveryTargetTLI = newtarget; list_free(expectedTLIs); expectedTLIs = newExpectedTLIs; - XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RecoveryTargetTLI = recoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); ereport(LOG, (errmsg("new target timeline is %u", @@ -7518,8 +7524,15 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) TimeLineID GetRecoveryTargetTLI(void) { - /* RecoveryTargetTLI doesn't change so we need no lock to copy it */ - return XLogCtl->RecoveryTargetTLI; + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + TimeLineID result; + + SpinLockAcquire(&xlogctl->info_lck); + result = xlogctl->RecoveryTargetTLI; + SpinLockRelease(&xlogctl->info_lck); + + return result; } /* @@ -8309,7 +8322,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetStandbyFlushRecPtr(); + endptr = GetStandbyFlushRecPtr(NULL); KeepLogSeg(endptr, &_logSegNo); _logSegNo--; @@ -9819,13 +9832,14 @@ do_pg_abort_backup(void) * Get latest redo apply position. * * Optionally, returns the end byte position of the last restored - * WAL segment. Callers not interested in that value may pass - * NULL for restoreLastRecPtr. + * WAL segment, and the current recovery target timeline. Callers not + * interested in those values may pass NULL for restoreLastRecPtr and + * targetTLI. * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr -GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) +GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr, TimeLineID *targetTli) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -9835,6 +9849,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) recptr = xlogctl->recoveryLastRecPtr; if (restoreLastRecPtr) *restoreLastRecPtr = xlogctl->restoreLastRecPtr; + if (targetTli) + *targetTli = xlogctl->RecoveryTargetTLI; SpinLockRelease(&xlogctl->info_lck); return recptr; @@ -9843,16 +9859,18 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) /* * Get current standby flush position, ie, the last WAL position * known to be fsync'd to disk in standby. + * + * If 'tli' is not NULL, it's set to the current recovery target timeline. */ XLogRecPtr -GetStandbyFlushRecPtr(void) +GetStandbyFlushRecPtr(TimeLineID *targetTLI) { XLogRecPtr receivePtr; XLogRecPtr replayPtr; XLogRecPtr restorePtr; receivePtr = GetWalRcvWriteRecPtr(NULL); - replayPtr = GetXLogReplayRecPtr(&restorePtr); + replayPtr = GetXLogReplayRecPtr(&restorePtr, targetTLI); if (XLByteLT(receivePtr, replayPtr)) return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index d345761..b7c864e 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -247,7 +247,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetXLogReplayRecPtr(NULL); + recptr = GetXLogReplayRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8694570..3ce9eb7 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -278,7 +278,8 @@ WalReceiverMain(void) DisableWalRcvImmediateExit(); /* Initialize LogstreamResult, reply_message and feedback_message */ - LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); + LogstreamResult.Write = LogstreamResult.Flush = + GetXLogReplayRecPtr(NULL, NULL); MemSet(&reply_message, 0, sizeof(reply_message)); MemSet(&feedback_message, 0, sizeof(feedback_message)); @@ -654,7 +655,7 @@ XLogWalRcvSendReply(void) /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; - reply_message.apply = GetXLogReplayRecPtr(NULL); + reply_message.apply = GetXLogReplayRecPtr(NULL, NULL); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 9eba180..68d0d6f 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -258,7 +258,7 @@ GetReplicationApplyDelay(void) receivePtr = walrcv->receivedUpto; SpinLockRelease(&walrcv->mutex); - replayPtr = GetXLogReplayRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(NULL, NULL); if (XLByteEQ(receivePtr, replayPtr)) return 0; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 38f7a3f..ae508da 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -303,7 +303,7 @@ IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); + logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); @@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ - SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); + if (am_cascading_walsender) + { + TimeLineID currentTargetTLI; + SendRqstPtr = GetStandbyFlushRecPtr(¤tTargetTLI); + + /* + * If the recovery target timeline changed, bail out. It's a bit + * unfortunate that we have to just disconnect, but there is no way + * to tell the client that the timeline changed. We also don't know + * exactly where the switch happened, so we cannot safely try to send + * up to the switchover point before disconnecting. + */ + if (currentTargetTLI != ThisTimeLineID) + { + if (!walsender_ready_to_stop) + ereport(LOG, + (errmsg("terminating walsender process to force cascaded standby " + "to update timeline and reconnect"))); + walsender_ready_to_stop = true; + *caughtup = true; + return; + } + } + else + GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index ec79870..771fac9 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -285,8 +285,9 @@ extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); -extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); -extern XLogRecPtr GetStandbyFlushRecPtr(void); +extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr, + TimeLineID *targetTli); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); extern bool RecoveryIsPaused(void);