From 579c1587a5574b790d257ea790fe42a5531bbb31 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Thu, 11 Feb 2016 10:44:14 +0800 Subject: [PATCH 1/3] Allow logical slots to follow timeline switches Make logical replication slots timeline-aware, so replay can continue from a historical timeline onto the server's current timeline. This is required to make failover slots possible and may also be used by extensions that CreateReplicationSlot on a standby and replay from that slot once the replica is promoted. This does NOT add support for replaying from a logical slot on a standby or for syncing slots to replicas. --- src/backend/access/transam/xlogreader.c | 43 ++++- src/backend/access/transam/xlogutils.c | 240 +++++++++++++++++++++++-- src/backend/replication/logical/logicalfuncs.c | 38 +++- src/include/access/xlogreader.h | 35 +++- src/include/access/xlogutils.h | 2 + 5 files changed, 323 insertions(+), 35 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index fcb0872..5899f44 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -10,6 +10,9 @@ * * NOTES * See xlogreader.h for more notes on this facility. + * + * The xlogreader is compiled as both front-end and backend code so + * it may not use elog, server-defined static variables, etc. *------------------------------------------------------------------------- */ @@ -116,6 +119,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) return NULL; } + /* Will be loaded on first read */ + state->timelineHistory = NULL; + return state; } @@ -135,6 +141,13 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); +#ifdef FRONTEND + /* FE code doesn't use this and we can't list_free_deep on FE */ + Assert(state->timelineHistory == NULL); +#else + if (state->timelineHistory) + list_free_deep(state->timelineHistory); +#endif pfree(state->readBuf); pfree(state); } @@ -208,9 +221,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) if (RecPtr == InvalidXLogRecPtr) { + /* No explicit start point, read the record after the one we just read */ RecPtr = state->EndRecPtr; if (state->ReadRecPtr == InvalidXLogRecPtr) + /* allow readPageTLI to go backward */ randAccess = true; /* @@ -223,6 +238,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) else { /* + * Caller supplied a position to start at. + * * In this case, the passed-in record pointer should already be * pointing to a valid record starting position. */ @@ -309,8 +326,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) /* XXX: more validation should be done here */ if (total_len < SizeOfXLogRecord) { - report_invalid_record(state, "invalid record length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + report_invalid_record(state, "invalid record length at %X/%X: wanted %lu, got %u", + (uint32) (RecPtr >> 32), (uint32) RecPtr, + SizeOfXLogRecord, total_len); goto err; } gotheader = false; @@ -466,9 +484,7 @@ err: * Invalidate the xlog page we've cached. We might read from a different * source after failure. */ - state->readSegNo = 0; - state->readOff = 0; - state->readLen = 0; + XLogReaderInvalCache(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -599,9 +615,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, { if (record->xl_tot_len < SizeOfXLogRecord) { - report_invalid_record(state, - "invalid record length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + report_invalid_record(state, "invalid record length at %X/%X: wanted %lu, got %u", + (uint32) (RecPtr >> 32), (uint32) RecPtr, + SizeOfXLogRecord, record->xl_tot_len); return false; } if (record->xl_rmid > RM_MAX_ID) @@ -1337,3 +1353,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) return true; } + +/* + * Invalidate the xlog reader's cached page to force a re-read + */ +void +XLogReaderInvalCache(XLogReaderState *state) +{ + state->readSegNo = 0; + state->readOff = 0; + state->readLen = 0; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 444e218..21f2030 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -7,6 +7,9 @@ * This file contains support routines that are used by XLOG replay functions. * None of this code is used during normal system operation. * + * Unlike xlogreader.c this is only compiled for the backend so it may use + * elog, etc. + * * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -21,6 +24,7 @@ #include "miscadmin.h" +#include "access/timeline.h" #include "access/xlog.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" @@ -651,6 +655,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) static int sendFile = -1; static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; + /* So we notice if asked for the same seg on a new tli: */ + static TimeLineID lastTLI = 0; p = buf; recptr = startptr; @@ -664,11 +670,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) startoff = recptr % XLogSegSize; - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) + /* Do we need to switch to a new xlog segment? */ + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || lastTLI != tli) { char path[MAXPGPATH]; - /* Switch to another logfile segment */ if (sendFile >= 0) close(sendFile); @@ -692,6 +698,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) path))); } sendOff = 0; + lastTLI = tli; } /* Need to seek in the file? */ @@ -759,28 +766,62 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int count; loc = targetPagePtr + reqLen; + + /* Make sure enough xlog is available... */ while (1) { /* - * TODO: we're going to have to do something more intelligent about - * timelines on standbys. Use readTimeLineHistory() and - * tliOfPointInHistory() to get the proper LSN? For now we'll catch - * that case earlier, but the code and TODO is left in here for when - * that changes. + * Check which timeline to get the record from. + * + * We have to do it after each loop because if we're in + * recovery as a cascading standby the current timeline + * might've become historical. */ - if (!RecoveryInProgress()) + XLogReadDetermineTimeline(state); + + if (state->currTLI == ThisTimeLineID) { - *pageTLI = ThisTimeLineID; - flushptr = GetFlushRecPtr(); + /* + * We're reading from the current timeline so we might + * have to wait for the desired record to be generated + * (or, for a standby, received & replayed) + */ + if (!RecoveryInProgress()) + { + *pageTLI = ThisTimeLineID; + flushptr = GetFlushRecPtr(); + } + else + flushptr = GetXLogReplayRecPtr(pageTLI); + + if (loc <= flushptr) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000L); } else - flushptr = GetXLogReplayRecPtr(pageTLI); - - if (loc <= flushptr) + { + /* + * We're on a historical timeline, limit reading to the + * switch point where we moved to the next timeline. + */ + flushptr = state->currTLIValidUntil; + + /* + * FIXME: Setting pageTLI to the TLI the *record* we + * want is on can be slightly wrong; the page might + * begin on an older timeline if it contains a timeline + * switch, since its xlog segment will've been copied + * from the prior timeline. We should really read the + * page header. It's pretty harmless though as nothing + * cares so long as the timeline doesn't go backwards. + */ + *pageTLI = state->currTLI; + + /* No need to wait on a historical timeline */ break; - - CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + } } /* more than one block available */ @@ -793,7 +834,172 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else count = flushptr - targetPagePtr; - XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); + XLogRead(cur_page, *pageTLI, targetPagePtr, count); return count; } + +/* + * Figure out what timeline to look on for the record the xlogreader + * is being asked asked to read, in currRecPtr. This may be used + * to determine which xlog segment file to open, etc. + * + * It depends on: + * + * - Whether we're reading a record immediately following one we read + * before or doing a random read. We can only use the cached + * timeline info if we're reading sequentially. + * + * - Whether the timeline of the prior record read was historical or + * the current timeline and, if historical, on where it's valid up + * to. On a historical timeline we need to avoid reading past the + * timeline switch point. The records after it are probably invalid, + * but worse, they might be valid but *different*. + * + * - If the current timeline became historical since the last record + * we read. We need to make sure we don't read past the switch + * point. + * + * None of this has any effect unless callbacks use currTLI to + * determine which timeline to read from and optionally use the + * validity limit to avoid reading past the valid end of a page. + * + * We need to switch to an xlog segment from the new timeline + * eagerly when on a historical timeline, as soon as we reach the + * start of the xlog segment containing the timeline switch. The + * server copied the segment to the new timeline so all the data up + * to the switch point is the same but there's no guarantee the old + * segment will still exist. It may have been deleted or renamed + * with a .partial suffix so we can't necessarily keep reading from + * the old TLI even though tliSwitchPoint says it's OK. + * + * An xlog segment may contain data from an older timeline + * if it was copied during a timeline switch. Callers may NOT assume + * that currTLI is the timeline that will be in a given page's + * xlp_tli; the page may begin on older timeline or we might be + * reading from historical timeline data on a segment that's + * been copied to a new timeline. + */ +void +XLogReadDetermineTimeline(XLogReaderState *state) +{ + if (state->timelineHistory == NULL) + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + if (state->currRecPtr != state->EndRecPtr) + { + /* + * Not reading the immediately following record so + * invalidate cached timeline info. + */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + if (state->currTLIValidUntil == InvalidXLogRecPtr && + state->currTLI != ThisTimeLineID && + state->currTLI != 0) + { + /* + * We were reading what was the current timeline but it became + * historical. Either we were replaying as a replica and got + * promoted or we're replaying as a cascading replica from a + * parent that got promoted. + * + * Force a re-read of the timeline history. + */ + list_free_deep(state->timelineHistory); + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); + + elog(DEBUG2, "timeline %u became historical during decoding", + state->currTLI); + + /* then invalidate the timeline info so we read again */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + if (state->currRecPtr == state->EndRecPtr && + state->currTLI != 0 && + state->currTLIValidUntil != InvalidXLogRecPtr && + state->currRecPtr >= state->currTLIValidUntil) + { + /* + * We're reading the immedately following record but we're + * at a timeline boundary (or on a segment containing one) + * and must read the next record from the new TLI. + */ + elog(DEBUG2, "Requested record %X/%X is on segment containing end of TLI %u " + "valid until %X/%X, switching to next timeline", + (uint32)(state->currRecPtr >> 32), + (uint32)state->currRecPtr, + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil)); + + /* Invalidate TLI info so we look up the next TLI */ + state->currTLI = 0; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + + if (state->currTLI == 0) + { + /* + * Something changed. We're not reading the record immediately + * after the one we just read, the previous record was at + * timeline boundary or we didn't yet determine the timeline + * to read from. + * + * Work out what timeline this record is on. We might read + * it from the segment on this TLI or, if the segment + * contains newer timelines, the copy from a newer TLI. + */ + state->currTLI = tliOfPointInHistory(state->currRecPtr, + state->timelineHistory); + + /* + * Look for the most recent timeline that's on the same xlog + * segment as this record, since that's the only one we can + * assume is still readable. + */ + while (state->currTLI != ThisTimeLineID && + state->currTLIValidUntil == InvalidXLogRecPtr) + { + XLogRecPtr tliSwitch; + TimeLineID nextTLI; + + tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory, + &nextTLI); + + state->currTLIValidUntil = ((tliSwitch / XLogSegSize) * XLogSegSize); + + if (state->currRecPtr >= state->currTLIValidUntil) + { + /* + * The new currTLI ends on this WAL segment so + * check the next TLI to see if it's the last + * one on the segment. + * + * If that's the current TLI we'll stop + * searching. + */ + state->currTLI = nextTLI; + state->currTLIValidUntil = InvalidXLogRecPtr; + } + } + + /* + * We're now either reading from the first xlog seg in the + * current server's timeline or the most recent historical + * timeline that exists on the target segment. + */ + elog(DEBUG2, "XLog read ptr %X/%X is on seg with tli %u valid until %X/%X, server current tli is %u", + (uint32)(state->currRecPtr >> 32), + (uint32)state->currRecPtr, + state->currTLI, + (uint32)(state->currTLIValidUntil >> 32), + (uint32)(state->currTLIValidUntil), + ThisTimeLineID); + } +} + diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index f789fc1..f29fca3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin rsinfo->setResult = p->tupstore; rsinfo->setDesc = p->tupdesc; - /* compute the current end-of-wal */ - if (!RecoveryInProgress()) - end_of_wal = GetFlushRecPtr(); - else - end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -263,6 +257,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx->output_writer_private = p; + /* + * We start reading xlog from the restart lsn, even though in + * CreateDecodingContext we set the snapshot builder up using the + * slot's candidate_restart_lsn. This means we might read xlog we don't + * actually decode rows from, but the snapshot builder might need it to + * get to a consistent point. The point we start returning data to + * *users* at is the candidate restart lsn from the decoding context. + */ startptr = MyReplicationSlot->data.restart_lsn; CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding"); @@ -270,8 +272,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* invalidate non-timetravel entries */ InvalidateSystemCaches(); + if (!RecoveryInProgress()) + end_of_wal = GetFlushRecPtr(); + else + end_of_wal = GetXLogReplayRecPtr(NULL); + + /* Decode until we run out of records */ while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || - (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal)) + (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { XLogRecord *record; char *errm = NULL; @@ -280,6 +288,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (errm) elog(ERROR, "%s", errm); + /* + * Now that we've set up the xlog reader state subsequent calls + * pass InvalidXLogRecPtr to say "continue from last record" + */ startptr = InvalidXLogRecPtr; /* @@ -299,6 +311,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin CHECK_FOR_INTERRUPTS(); } + /* Make sure timeline lookups use the start of the next record */ + startptr = ctx->reader->EndRecPtr; + + /* + * The XLogReader will read a page past the valid end of WAL + * because it doesn't know about timelines. When we switch + * timelines and ask it for the first page on the new timeline it + * will think it has it cached, but it'll have the old partial + * page and say it can't find the next record. So flush the cache. + */ + XLogReaderInvalCache(ctx->reader); + tuplestore_donestoring(tupstore); CurrentResourceOwner = old_resowner; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 7553cc4..20e4bca 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -20,12 +20,16 @@ * with the XLogRec* macros and functions. You can also decode a * record that's already constructed in memory, without reading from * disk, by calling the DecodeXLogRecord() function. + * + * The xlogreader is compiled as both front-end and backend code so + * it may not use elog, server-defined static variables, etc. *------------------------------------------------------------------------- */ #ifndef XLOGREADER_H #define XLOGREADER_H #include "access/xlogrecord.h" +#include "nodes/pg_list.h" typedef struct XLogReaderState XLogReaderState; @@ -139,26 +143,48 @@ struct XLogReaderState * ---------------------------------------- */ - /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ + /* + * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to + * at least readLen bytes) + */ char *readBuf; - /* last read segment, segment offset, read length, TLI */ + /* + * last read segment, segment offset, read length, TLI for + * data currently in readBuf. + */ XLogSegNo readSegNo; uint32 readOff; uint32 readLen; TimeLineID readPageTLI; - /* beginning of last page read, and its TLI */ + /* + * beginning of prior page read, and its TLI. Doesn't + * necessarily correspond to what's in readBuf, used for + * timeline sanity checks. + */ XLogRecPtr latestPagePtr; TimeLineID latestPageTLI; /* beginning of the WAL record being read. */ XLogRecPtr currRecPtr; + /* timeline to read it from, 0 if a lookup is required */ + TimeLineID currTLI; + /* + * Pointer to the end of the last whole segment on the timeline in currTLI + * if it's historical or InvalidXLogRecPtr if currTLI is the current + * timeline. This is *not* the tliSwitchPoint but it's guaranteed safe + * to read up to this point from currTLI. + */ + XLogRecPtr currTLIValidUntil; /* Buffer for current ReadRecord result (expandable) */ char *readRecordBuf; uint32 readRecordBufSize; + /* cached timeline history */ + List *timelineHistory; + /* Buffer to hold error message */ char *errormsg_buf; }; @@ -174,6 +200,9 @@ extern void XLogReaderFree(XLogReaderState *state); extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr recptr, char **errormsg); +/* Flush any cached page */ +extern void XLogReaderInvalCache(XLogReaderState *state); + #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 1b9abce..86df8cf 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,4 +50,6 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); +extern void XLogReadDetermineTimeline(XLogReaderState *state); + #endif -- 2.1.0