From c56a32b5ace8a48908da366e5f778fa98a125740 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Thu, 14 Apr 2016 15:43:20 +0800 Subject: [PATCH] Enable logical timeline following in the walsender --- src/backend/access/transam/xlogutils.c | 7 +++---- src/backend/replication/walsender.c | 11 +++++++---- src/include/access/xlogreader.h | 3 +++ src/include/access/xlogutils.h | 2 ++ 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index c3213ac..c3b0e5c 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -773,7 +773,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) * might be reading from historical timeline data on a segment that's been * copied to a new timeline. */ -static void +void XLogReadDetermineTimeline(XLogReaderState *state) { /* Read the history on first time through */ @@ -856,12 +856,11 @@ XLogReadDetermineTimeline(XLogReaderState *state) state->currTLIValidUntil == InvalidXLogRecPtr) { XLogRecPtr tliSwitch; - TimeLineID nextTLI; CHECK_FOR_INTERRUPTS(); tliSwitch = tliSwitchPoint(state->currTLI, state->timelineHistory, - &nextTLI); + &state->nextTLI); /* round ValidUntil down to start of seg containing the switch */ state->currTLIValidUntil = @@ -875,7 +874,7 @@ XLogReadDetermineTimeline(XLogReaderState *state) * * If that's the current TLI we'll stop searching. */ - state->currTLI = nextTLI; + state->currTLI = state->nextTLI; state->currTLIValidUntil = InvalidXLogRecPtr; } } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e4a0119..495bff2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -47,6 +47,7 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -756,6 +757,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLogRecPtr flushptr; int count; + XLogReadDetermineTimeline(state); + sendTimeLineIsHistoric = state->currTLI == ThisTimeLineID; + sendTimeLine = state->currTLI; + sendTimeLineValidUpto = state->currTLIValidUntil; + sendTimeLineNextTLI = state->nextTLI; + /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -984,10 +991,6 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* setup state for XLogReadPage */ - sendTimeLineIsHistoric = false; - sendTimeLine = ThisTimeLineID; - /* * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 300747d..bbee552 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -166,6 +166,9 @@ struct XLogReaderState XLogRecPtr currRecPtr; /* timeline to read it from, 0 if a lookup is required */ TimeLineID currTLI; + /* timeline that follows currTLI */ + TimeLineID nextTLI; + /* * Safe point to read to in currTLI. If currTLI is historical, then this * is set to the end of the last whole segment that contains that TLI; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index d027ea1..9a32ab7 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -52,4 +52,6 @@ extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI); +extern void XLogReadDetermineTimeline(XLogReaderState *state); + #endif -- 2.1.0