diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3b614b6..36e00e0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3282,6 +3282,26 @@ include_dir 'conf.d'
+     <varlistentry id="guc-replay-lag-sample-interval" xreflabel="replay_lag_sample_interval">
+      <term><varname>replay_lag_sample_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replay_lag_sample_interval</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Controls how often a standby should sample replay lag information to
+        send back to the primary or upstream standby while replaying WAL.  The
+        default is 1 second.  Units are milliseconds if not specified.  A
+        value of -1 disables the reporting of replay lag.  Estimated replay lag
+        can be seen in the
+        <structname>pg_stat_replication</> view of the upstream server.
+        This parameter can only be set
+        in the <filename>postgresql.conf</> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
hot_standby_feedback (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b58d2e..79d2d95 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1401,6 +1401,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
standby server
+     <entry><structfield>replay_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be replayed on this
+      standby server</entry>
+    </row>
+    <row>
+ sync_priority>integer>Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa9ee5a..739e45e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version;
#define PROMOTE_SIGNAL_FILE "promote"
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
+/* Size of the circular buffer of timestamped LSNs. */
+#define XLOG_TIMESTAMP_BUFFER_SIZE 8192
/* User-settable parameters */
int max_wal_size = 64; /* 1 GB */
@@ -520,6 +522,26 @@ typedef struct XLogCtlInsert
} XLogCtlInsert;
/*
+ * A sample associating a timestamp with a given xlog position.
+ *
+ * Samples are recorded by SetXLogReplayTimestampAtLsn() and consumed by
+ * CheckForReplayedTimestamps() once replay has reached 'lsn'.
+ */
+typedef struct XLogTimestamp
+{
+ TimestampTz timestamp; /* timestamp that was reported together with 'lsn' */
+ XLogRecPtr lsn; /* WAL position the timestamp is associated with */
+} XLogTimestamp;
+
+/*
+ * A circular buffer of LSNs and associated timestamps. The buffer is empty
+ * when read_head == write_head.
+ *
+ * One slot is always left unused: SetXLogReplayTimestampAtLsn() treats the
+ * buffer as full when advancing write_head would make it equal to read_head,
+ * so at most XLOG_TIMESTAMP_BUFFER_SIZE - 1 samples are queued at a time.
+ */
+typedef struct XLogTimestampBuffer
+{
+ uint32 read_head; /* next sample to consume; advanced by the startup process */
+ uint32 write_head; /* next slot to fill; advanced by walreceiver */
+ XLogTimestamp buffer[XLOG_TIMESTAMP_BUFFER_SIZE];
+} XLogTimestampBuffer;
+
+/*
* Total shared-memory state for XLOG.
*/
typedef struct XLogCtlData
@@ -637,6 +659,12 @@ typedef struct XLogCtlData
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* timestamp from the most recently applied record associated with a timestamp. */
+ TimestampTz lastReplayedTimestamp;
+
+ /* a buffer of upstream timestamps for WAL that is not yet applied. */
+ XLogTimestampBuffer timestamps;
+
/*
* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery
@@ -5976,6 +6004,44 @@ CheckRequiredParameterValues(void)
}
/*
+ * Consume buffered LSN/timestamp samples that replay has now passed.
+ *
+ * Runs in the startup process once WAL up to 'lsn' has been replayed.  Every
+ * queued sample whose LSN is <= 'lsn' is stepped over, and the timestamp of
+ * the last such sample is stored in '*timestamp'; '*timestamp' is left
+ * untouched when no sample qualifies.  The return value is the advanced read
+ * head, which the caller should store into XLogCtl->timestamps.read_head
+ * while holding info_lck.
+ */
+static uint32
+CheckForReplayedTimestamps(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	uint32		pos;
+
+	Assert(AmStartupProcess());
+
+	/*
+	 * read_head can be fetched without any kind of synchronization because
+	 * this process is the only one that writes it.
+	 *
+	 * write_head is also fetched without interlocking, on every iteration:
+	 * it is an aligned 32 bit value, so we rely on reading some recent,
+	 * untorn value on all supported platforms.  The value seen is expected
+	 * to cover all WAL replayed so far, since walreceiver is meant to call
+	 * SetXLogReplayTimestampAtLsn before handing the corresponding WAL data
+	 * to the recovery process.
+	 *
+	 * NOTE(review): when the buffer is full, SetXLogReplayTimestampAtLsn
+	 * rewrites the newest slot in place under info_lck while this loop reads
+	 * slots without the lock -- confirm that a mixed lsn/timestamp pair
+	 * cannot be observed here.
+	 */
+	for (pos = XLogCtl->timestamps.read_head;
+		 pos != XLogCtl->timestamps.write_head;
+		 pos = (pos + 1) % XLOG_TIMESTAMP_BUFFER_SIZE)
+	{
+		/* Stop at the first sample that replay has not reached yet. */
+		if (XLogCtl->timestamps.buffer[pos].lsn > lsn)
+			break;
+
+		*timestamp = XLogCtl->timestamps.buffer[pos].timestamp;
+	}
+
+	return pos;
+}
+
+/*
* This must be called ONCE during postmaster or standalone-backend startup
*/
void
@@ -6794,6 +6860,8 @@ StartupXLOG(void)
do
{
bool switchedTLI = false;
+ TimestampTz replayed_timestamp = 0;
+ uint32 timestamp_read_head;
#ifdef WAL_DEBUG
if (XLOG_DEBUG ||
@@ -6947,24 +7015,34 @@ StartupXLOG(void)
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+ /* Check if we have replayed a timestamped WAL position */
+ timestamp_read_head =
+ CheckForReplayedTimestamps(EndRecPtr, &replayed_timestamp);
+
/*
- * Update lastReplayedEndRecPtr after this record has been
- * successfully replayed.
+ * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+ * after this record has been successfully replayed.
*/
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
XLogCtl->lastReplayedTLI = ThisTimeLineID;
+ XLogCtl->timestamps.read_head = timestamp_read_head;
+ if (replayed_timestamp != 0)
+ XLogCtl->lastReplayedTimestamp = replayed_timestamp;
SpinLockRelease(&XLogCtl->info_lck);
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake
* up the receiver so that it notices the updated
- * lastReplayedEndRecPtr and sends a reply to the master.
+ * lastReplayedEndRecPtr and sends a reply to the master. We
+ * also wake it if we have replayed a WAL position that has
+ * an associated timestamp so that the upstream server can
+ * measure our replay lag.
*/
- if (doRequestWalReceiverReply)
+ if (doRequestWalReceiverReply || replayed_timestamp != 0)
{
doRequestWalReceiverReply = false;
- WalRcvForceReply();
+ WalRcvForceReply(replayed_timestamp != 0);
}
/* Remember this record as the last-applied one */
@@ -11745,3 +11823,81 @@ XLogRequestWalReceiverReply(void)
{
doRequestWalReceiverReply = true;
}
+
+/*
+ * Record the timestamp that is associated with a WAL position.
+ *
+ * This is called by walreceiver on standby servers when new messages arrive,
+ * using a timestamp and the latest known WAL position from the upstream
+ * server.  The timestamp will be sent back to the upstream server via
+ * walreceiver when the recovery process has applied the WAL position.  The
+ * upstream server can then compute the elapsed time to estimate the replay
+ * lag.
+ *
+ * If 'lsn' equals the last replayed position, lastReplayedTimestamp is
+ * updated directly; otherwise the (lsn, timestamp) pair is queued in the
+ * circular buffer.  Everything happens while holding info_lck.
+ */
+void
+SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+	Assert(AmWalReceiverProcess());
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn == XLogCtl->lastReplayedEndRecPtr)
+	{
+		/*
+		 * That is the last replayed LSN: we are fully replayed, so we can
+		 * update the replay timestamp immediately.
+		 */
+		XLogCtl->lastReplayedTimestamp = timestamp;
+	}
+	else
+	{
+		/*
+		 * There is WAL still to be applied.  We will associate the timestamp
+		 * with this WAL position and wait for it to be replayed.  We add it
+		 * at the 'write' end of the circular buffer of LSN/timestamp
+		 * mappings, which the replay loop will eventually read.
+		 */
+		uint32		write_head = XLogCtl->timestamps.write_head;
+		uint32		new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+
+		if (new_write_head == XLogCtl->timestamps.read_head)
+		{
+			/*
+			 * The buffer is full, so we'll rewind and overwrite the most
+			 * recent sample.  Overwriting the most recent sample means that
+			 * if we're not replaying fast enough and the buffer fills up,
+			 * we'll effectively lower the sampling rate.
+			 *
+			 * Step back by adding SIZE - 1 rather than subtracting 1:
+			 * write_head is unsigned, so "write_head - 1" wraps to
+			 * UINT32_MAX when write_head is 0, and the modulo of that is
+			 * only the intended slot if the buffer size divides 2^32.
+			 */
+			new_write_head = write_head;
+			write_head = (write_head + XLOG_TIMESTAMP_BUFFER_SIZE - 1) %
+				XLOG_TIMESTAMP_BUFFER_SIZE;
+		}
+
+		XLogCtl->timestamps.buffer[write_head].lsn = lsn;
+		XLogCtl->timestamps.buffer[write_head].timestamp = timestamp;
+		XLogCtl->timestamps.write_head = new_write_head;
+	}
+	SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Report replay progress as seen by the startup process.
+ *
+ * Returns the timestamp most recently stored in lastReplayedTimestamp, i.e.
+ * the upstream timestamp attached to the latest applied WAL position that
+ * carried one.  If 'lsn' is not NULL, '*lsn' receives the most recently
+ * applied LSN.  Both values are fetched under info_lck, but they do not
+ * necessarily describe the same record.
+ *
+ * Compare GetLatestXTime: that one follows commit records, whereas this one
+ * advances when positions recorded with SetXLogReplayTimestampAtLsn have
+ * been applied.
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+	TimestampTz replayed_timestamp;
+	XLogRecPtr	replayed_lsn;
+
+	/* Take a consistent snapshot of both fields, then release the lock. */
+	SpinLockAcquire(&XLogCtl->info_lck);
+	replayed_lsn = XLogCtl->lastReplayedEndRecPtr;
+	replayed_timestamp = XLogCtl->lastReplayedTimestamp;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	if (lsn != NULL)
+		*lsn = replayed_lsn;
+
+	return replayed_timestamp;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 48e7c4b..e0e45fa 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -685,6 +685,7 @@ CREATE VIEW pg_stat_replication AS
W.write_location,
W.flush_location,
W.replay_location,
+ W.replay_lag,
W.sync_priority,
W.sync_state
FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index cc3cf7d..9cf9f4c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
int wal_receiver_status_interval;
int wal_receiver_timeout;
bool hot_standby_feedback;
+int replay_lag_sample_interval;
/* libpqwalreceiver connection */
static WalReceiverConn *wrconn = NULL;
@@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -456,7 +457,7 @@ WalReceiverMain(void)
}
/* Let the master know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, false);
/*
* If we've written some records, flush them to disk and
@@ -493,6 +494,8 @@ WalReceiverMain(void)
ResetLatch(walrcv->latch);
if (walrcv->force_reply)
{
+ bool timestamp = walrcv->force_reply_apply_timestamp;
+
/*
* The recovery process has asked us to send apply
* feedback now. Make sure the flag is really set to
@@ -500,8 +503,9 @@ WalReceiverMain(void)
* we don't miss a new request for a reply.
*/
walrcv->force_reply = false;
+ walrcv->force_reply_apply_timestamp = false;
pg_memory_barrier();
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, timestamp);
}
}
if (rc & WL_POSTMASTER_DEATH)
@@ -559,7 +563,7 @@ WalReceiverMain(void)
}
}
- XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendReply(requestReply, requestReply, false);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -911,7 +915,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
/* If the primary requested a reply, send one immediately */
if (replyRequested)
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, false);
break;
}
default:
@@ -1074,7 +1078,7 @@ XLogWalRcvFlush(bool dying)
/* Also let the master know that we made some progress */
if (!dying)
{
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, false);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -1092,15 +1096,18 @@ XLogWalRcvFlush(bool dying)
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbearts, when approaching
* wal_receiver_timeout.
+ *
+ * If 'reportApplyTimestamp' is true, the latest apply timestamp is included.
*/
static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp)
{
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now;
+ TimestampTz applyTimestamp = 0;
/*
* If the user doesn't want status to be reported to the master, be sure
@@ -1132,7 +1139,35 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
+
+ /* Decide whether to send an apply timestamp for replay lag estimation. */
+ if (replay_lag_sample_interval != -1)
+ {
+ static TimestampTz lastTimestampSendTime = 0;
+
+ /*
+ * Only send an apply timestamp if we were explicitly asked to by the
+ * recovery process or if replay lag sampling is active but the
+ * recovery process seems to be stuck.
+ *
+ * If we haven't heard from the recovery process in a time exceeding
+ * wal_receiver_status_interval and yet it has not applied the highest
+ * LSN we've heard about, then we want to resend the last replayed
+ * timestamp we have; otherwise we zero it out and wait for the
+ * recovery process to wake us when it has set a new accurate replay
+ * timestamp. Note that we can read latestWalEnd without acquiring the
+ * mutex that protects it because it is only written to by this
+ * process (walreceiver).
+ */
+ if (reportApplyTimestamp ||
+ (WalRcv->latestWalEnd > applyPtr &&
+ TimestampDifferenceExceeds(lastTimestampSendTime, now,
+ wal_receiver_status_interval * 1000)))
+ lastTimestampSendTime = now;
+ else
+ applyTimestamp = 0;
+ }
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
@@ -1140,6 +1175,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
@@ -1244,18 +1280,40 @@ static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
WalRcvData *walrcv = WalRcv;
-
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+ bool newHighWalEnd = false;
+
+ static TimestampTz lastRecordedTimestamp = 0;
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
if (walrcv->latestWalEnd < walEnd)
+ {
walrcv->latestWalEndTime = sendTime;
+ newHighWalEnd = true;
+ }
walrcv->latestWalEnd = walEnd;
walrcv->lastMsgSendTime = sendTime;
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
+ /*
+ * If replay lag sampling is active, remember the upstream server's
+ * timestamp at the latest WAL end that it has, unless we've already
+ * done that too recently or the LSN hasn't advanced. This timestamp
+ * will be fed back to us by the startup process when it eventually
+ * replays this LSN, so that we can feed it back to the upstream server
+ * for replay lag tracking purposes.
+ */
+ if (replay_lag_sample_interval != -1 &&
+ newHighWalEnd &&
+ sendTime > TimestampTzPlusMilliseconds(lastRecordedTimestamp,
+ replay_lag_sample_interval))
+ {
+ SetXLogReplayTimestampAtLsn(sendTime, walEnd);
+ lastRecordedTimestamp = sendTime;
+ }
+
if (log_min_messages <= DEBUG2)
{
char *sendtime;
@@ -1291,12 +1349,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
* This is called by the startup process whenever interesting xlog records
* are applied, so that walreceiver can check if it needs to send an apply
* notification back to the master which may be waiting in a COMMIT with
- * synchronous_commit = remote_apply.
+ * synchronous_commit = remote_apply. Also used to send periodic messages
+ * which are used to compute pg_stat_replication.replay_lag.
*/
void
-WalRcvForceReply(void)
+WalRcvForceReply(bool apply_timestamp)
{
+	/*
+	 * Only ever raise the timestamp request here; walreceiver clears it when
+	 * it services the reply.  Assigning the argument unconditionally would
+	 * let a later call with 'false' cancel a timestamp report that is still
+	 * pending.  The flag is stored, followed by a barrier, before
+	 * force_reply is set, so the request is published ahead of the flag that
+	 * makes walreceiver look at it.
+	 */
+	if (apply_timestamp)
+		WalRcv->force_reply_apply_timestamp = true;
+	pg_memory_barrier();
WalRcv->force_reply = true;
if (WalRcv->latch)
SetLatch(WalRcv->latch);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d80bcc0..0782d78 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1553,15 +1553,29 @@ ProcessStandbyReplyMessage(void)
XLogRecPtr writePtr,
flushPtr,
applyPtr;
+ int64 applyLagUs;
bool replyRequested;
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz applyTimestamp;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
replyRequested = pq_getmsgbyte(&reply_message);
+ /* Compute the apply lag in microseconds. */
+ if (applyTimestamp == 0)
+ applyLagUs = -1;
+ else
+#ifdef HAVE_INT64_TIMESTAMP
+ applyLagUs = now - applyTimestamp;
+#else
+ applyLagUs = (now - applyTimestamp) * 1000000;
+#endif
+
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1583,6 +1597,8 @@ ProcessStandbyReplyMessage(void)
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
+ if (applyLagUs >= 0)
+ walsnd->applyLagUs = applyLagUs;
SpinLockRelease(&walsnd->mutex);
}
@@ -1979,6 +1995,7 @@ InitWalSenderSlot(void)
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
+ walsnd->applyLagUs = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex);
@@ -2761,7 +2778,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 8
+#define PG_STAT_GET_WAL_SENDERS_COLS 9
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2809,6 +2826,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 applyLagUs;
int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2823,6 +2841,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ applyLagUs = walsnd->applyLagUs;
priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
@@ -2857,6 +2876,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true;
values[5] = LSNGetDatum(apply);
+ if (applyLagUs < 0)
+ nulls[6] = true;
+ else
+ {
+ Interval *applyLagInterval = palloc(sizeof(Interval));
+
+ applyLagInterval->month = 0;
+ applyLagInterval->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+ applyLagInterval->time = applyLagUs;
+#else
+ applyLagInterval->time = applyLagUs / 1000000.0;
+#endif
+ nulls[6] = false;
+ values[6] = IntervalPGetDatum(applyLagInterval);
+ }
+
/*
* Treat a standby such as a pg_basebackup background process
* which always returns an invalid flush location, as an
@@ -2864,18 +2900,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
- values[6] = Int32GetDatum(priority);
+ values[7] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
* informational, not different from priority.
*/
if (priority == 0)
- values[7] = CStringGetTextDatum("async");
+ values[8] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
- values[7] = CStringGetTextDatum("sync");
+ values[8] = CStringGetTextDatum("sync");
else
- values[7] = CStringGetTextDatum("potential");
+ values[8] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c1d6f05..323d640 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1724,6 +1724,20 @@ GetSQLLocalTimestamp(int32 typmod)
}
/*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	/* Scale by one million; the conversion to int64 drops any fraction. */
+	return (int64) (timestamp * 1000000.0);
+}
+#endif
+
+/*
* TimestampDifference -- convert the difference between two timestamps
* into integer seconds and microseconds
*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a025117..b1af028 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1810,6 +1810,17 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"replay_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+ gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replay lag."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &replay_lag_sample_interval,
+ 1 * 1000, -1, INT_MAX / 1000,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
gettext_noop("Sets the maximum wait time to receive data from the primary."),
NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7f9acfd..bf298f2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -270,6 +270,8 @@
# in milliseconds; 0 disables
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
+#replay_lag_sample_interval = 1s # min time between timestamps recorded
+ # to estimate replay lag; -1 disables replay lag sampling
#------------------------------------------------------------------------------
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..9753648 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
- char replybuf[1 + 8 + 8 + 8 + 8 + 1];
+ char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1];
int len = 0;
/*
@@ -142,6 +142,8 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
len += 8;
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
+ fe_sendint64(0, &replybuf[len]); /* applyTimestamp */
+ len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 4382e5d..8e89627 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
static bool
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
{
- char replybuf[1 + 8 + 8 + 8 + 8 + 1];
+ char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1];
int len = 0;
replybuf[len] = 'r';
@@ -337,6 +337,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
len += 8;
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
+ fe_sendint64(0, &replybuf[len]); /* applyTimestamp */
+ len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c9f332c..1be2f34 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -237,6 +237,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index cd7b909..b565dd8 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 28dc1fc..be25758 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -23,6 +23,7 @@
extern int wal_receiver_status_interval;
extern int wal_receiver_timeout;
extern bool hot_standby_feedback;
+extern int replay_lag_sample_interval;
/*
* MAXCONNINFO: maximum size of a connection string.
@@ -119,6 +120,9 @@ typedef struct
*/
bool force_reply;
+ /* include the latest replayed timestamp when replying? */
+ bool force_reply_apply_timestamp;
+
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
@@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
-extern void WalRcvForceReply(void);
+extern void WalRcvForceReply(bool sendApplyTimestamp);
#endif /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..4de43e8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -46,6 +46,7 @@ typedef struct WalSnd
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 applyLagUs;
/* Protects shared variables shown above. */
slock_t mutex;
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 93b90fe..20517c9 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
#ifndef HAVE_INT64_TIMESTAMP
extern int64 GetCurrentIntegerTimestamp(void);
extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
#else
#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
#endif
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5314b9c..d59956f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1809,10 +1809,11 @@ pg_stat_replication| SELECT s.pid,
w.write_location,
w.flush_location,
w.replay_location,
+ w.replay_lag,
w.sync_priority,
w.sync_state
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
s.ssl,