diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8d7b3bf..b894e31 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3310,6 +3310,26 @@ ANY num_sync (
+ replication_lag_sample_interval (integer)
+
+ replication_lag_sample_interval> configuration parameter
+
+
+
+
+ Controls how often a standby should sample timestamps from upstream to
+ send back to the primary or upstream standby after writing, flushing
+ and replaying WAL. The default is 1 second. Units are milliseconds if
+ not specified. A value of -1 disables the reporting of replication
+ lag. Estimated lag can be seen in the
+ pg_stat_replication> view of the upstream server.
+ This parameter can only be set
+ in the postgresql.conf> file or on the server command line.
+
+
+
+
hot_standby_feedback (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1545f03..a422ac0 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1405,6 +1405,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
standby server
+ write_lag>
+ interval>
+ Estimated time taken for recent WAL records to be written on this
+ standby server
+
+
+ flush_lag>
+ interval>
+ Estimated time taken for recent WAL records to be flushed on this
+ standby server
+
+
+ replay_lag>
+ interval>
+ Estimated time taken for recent WAL records to be replayed on this
+ standby server
+
+ sync_priority>integer>Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f8ffa5c..7e7312f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version;
#define PROMOTE_SIGNAL_FILE "promote"
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
+/* Size of the circular buffer of timestamped LSNs. */
+#define XLOG_TIMESTAMP_BUFFER_SIZE 8192
/* User-settable parameters */
int max_wal_size = 64; /* 1 GB */
@@ -530,6 +532,26 @@ typedef struct XLogCtlInsert
} XLogCtlInsert;
/*
+ * A sample associating a timestamp with a given xlog position.
+ */
+typedef struct XLogTimestamp
+{
+ TimestampTz timestamp; /* timestamp received from the upstream server */
+ XLogRecPtr lsn; /* WAL position the timestamp was recorded for */
+} XLogTimestamp;
+
+/*
+ * A circular buffer of LSNs and associated timestamps. The buffer is empty
+ * when read_head == write_head.
+ */
+typedef struct XLogTimestampBuffer
+{
+ uint32 read_head; /* index of the oldest sample not yet consumed */
+ uint32 write_head; /* index where the next sample is normally stored */
+ XLogTimestamp buffer[XLOG_TIMESTAMP_BUFFER_SIZE]; /* the sample slots */
+} XLogTimestampBuffer;
+
+/*
* Total shared-memory state for XLOG.
*/
typedef struct XLogCtlData
@@ -648,6 +670,14 @@ typedef struct XLogCtlData
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* timestamp from the most recently applied record associated with a timestamp. */
+ TimestampTz lastReplayedTimestamp;
+
+ /* buffers of timestamps for WAL that is not yet written/flushed/applied. */
+ XLogTimestampBuffer writeTimestamps;
+ XLogTimestampBuffer flushTimestamps;
+ XLogTimestampBuffer applyTimestamps;
+
/*
* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery
@@ -6006,6 +6036,96 @@ CheckRequiredParameterValues(void)
}
/*
+ * Read and consume all records from 'buffer' whose position is <= 'lsn'.
+ * Return true if any such records are found, and write the latest timestamp
+ * found into *timestamp. Write the new read head position into *read_head,
+ * so that the caller can store it with appropriate locking.
+ */
+static bool
+ReadXLogTimestampForLsn(XLogTimestampBuffer *buffer,
+ XLogRecPtr lsn,
+ uint32 *read_head,
+ TimestampTz *timestamp)
+{
+ bool found = false;
+
+ /*
+ * It's OK to access buffer->read_head without any kind of synchronization
+ * because in all cases the caller is the only process reading from the
+ * buffer (i.e. the only one writing to buffer->read_head).
+ */
+ *read_head = buffer->read_head;
+
+ /*
+ * It's OK to access write_head without interlocking because it's an
+ * aligned 32 bit value which we can read atomically on all supported
+ * platforms to get some recent value, not a torn/garbage value.
+ * Furthermore we must see a value that is at least as recent as any WAL
+ * that we have written/flushed/replayed, because walreceiver calls
+ * SetXLogTimestampAtLsn before writing.
+ *
+ * NOTE(review): no barrier is visible between loading write_head and
+ * loading the slot contents; confirm this is safe on weakly ordered
+ * platforms.
+ */
+ while (*read_head != buffer->write_head &&
+ buffer->buffer[*read_head].lsn <= lsn)
+ {
+ found = true;
+ *timestamp = buffer->buffer[*read_head].timestamp; /* later matches overwrite earlier ones */
+ *read_head = (*read_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE; /* advance with wraparound */
+ }
+
+ return found; /* *timestamp is left untouched when nothing matched */
+}
+
+/*
+ * Called by the WAL receiver process after it has written up to 'lsn'.
+ * Return true if it has written any LSN location that had an associated
+ * timestamp, and write the timestamp to '*timestamp'.
+ */
+bool
+CheckForWrittenTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	XLogTimestampBuffer *samples = &XLogCtl->writeTimestamps;
+
+	Assert(AmWalReceiverProcess());
+
+	/* The reader advances the buffer's own read head in place. */
+	return ReadXLogTimestampForLsn(samples, lsn, &samples->read_head,
+								   timestamp);
+}
+
+/*
+ * Called by the WAL receiver process after it has flushed up to 'lsn'.
+ * Return true if it has flushed any LSN location that had an associated
+ * timestamp, and write the timestamp to '*timestamp'.
+ */
+bool
+CheckForFlushedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	XLogTimestampBuffer *samples = &XLogCtl->flushTimestamps;
+
+	Assert(AmWalReceiverProcess());
+
+	/* The reader advances the buffer's own read head in place. */
+	return ReadXLogTimestampForLsn(samples, lsn, &samples->read_head,
+								   timestamp);
+}
+
+/*
+ * Called by the startup process after it has replayed up to 'lsn'. Checks
+ * for timestamps associated with WAL positions that have now been replayed.
+ * If any are found, the latest such timestamp found is written to
+ * '*timestamp'; otherwise '*timestamp' is left unchanged. Returns the new
+ * buffer read head position, which the caller should write into
+ * XLogCtl->applyTimestamps.read_head while holding info_lck.
+ */
+static uint32
+CheckForAppliedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+ uint32 read_head;
+
+ Assert(AmStartupProcess());
+
+ ReadXLogTimestampForLsn(&XLogCtl->applyTimestamps, lsn, &read_head,
+ timestamp);
+
+ return read_head;
+}
+
+/*
* This must be called ONCE during postmaster or standalone-backend startup
*/
void
@@ -6824,6 +6944,8 @@ StartupXLOG(void)
do
{
bool switchedTLI = false;
+ TimestampTz replayed_timestamp = 0;
+ uint32 timestamp_read_head;
#ifdef WAL_DEBUG
if (XLOG_DEBUG ||
@@ -6977,24 +7099,35 @@ StartupXLOG(void)
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+ /* Check if we have replayed a timestamped WAL position */
+ timestamp_read_head =
+ CheckForAppliedTimestampedLsn(EndRecPtr,
+ &replayed_timestamp);
+
/*
- * Update lastReplayedEndRecPtr after this record has been
- * successfully replayed.
+ * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+ * after this record has been successfully replayed.
*/
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
XLogCtl->lastReplayedTLI = ThisTimeLineID;
+ XLogCtl->applyTimestamps.read_head = timestamp_read_head;
+ if (replayed_timestamp != 0)
+ XLogCtl->lastReplayedTimestamp = replayed_timestamp;
SpinLockRelease(&XLogCtl->info_lck);
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake
* up the receiver so that it notices the updated
- * lastReplayedEndRecPtr and sends a reply to the master.
+ * lastReplayedEndRecPtr and sends a reply to the master. We
+ * also wake it if we have replayed a WAL position that has
+ * an associated timestamp so that the upstream server can
+ * measure our replay lag.
*/
- if (doRequestWalReceiverReply)
+ if (doRequestWalReceiverReply || replayed_timestamp != 0)
{
doRequestWalReceiverReply = false;
- WalRcvForceReply();
+ WalRcvForceReply(replayed_timestamp != 0);
}
/* Remember this record as the last-applied one */
@@ -11809,3 +11942,106 @@ XLogRequestWalReceiverReply(void)
{
doRequestWalReceiverReply = true;
}
+
+/*
+ * Store an (lsn, timestamp) sample in a timestamp buffer.
+ *
+ * When the buffer is full the write head is not advanced; the most recently
+ * stored sample is overwritten instead, so the read head is never overtaken.
+ */
+static void
+StoreXLogTimestampAtLsn(XLogTimestampBuffer *buffer,
+						TimestampTz timestamp, XLogRecPtr lsn)
+{
+	uint32		write_head = buffer->write_head;
+	uint32		new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+
+	Assert(AmWalReceiverProcess());
+
+	if (new_write_head == buffer->read_head)
+	{
+		/*
+		 * The buffer is full, so we'll rewind and overwrite the most
+		 * recent sample.  Overwriting the most recent sample means that
+		 * if we're not writing/flushing/replaying fast enough and the buffer
+		 * fills up, we'll effectively lower the sampling rate.
+		 *
+		 * Step back by adding (size - 1) instead of subtracting 1, so the
+		 * result is correct for write_head == 0 with any buffer size rather
+		 * than depending on unsigned wraparound.
+		 */
+		new_write_head = write_head;
+		write_head = (write_head + XLOG_TIMESTAMP_BUFFER_SIZE - 1) %
+			XLOG_TIMESTAMP_BUFFER_SIZE;
+	}
+
+	buffer->buffer[write_head].lsn = lsn;
+	buffer->buffer[write_head].timestamp = timestamp;
+	buffer->write_head = new_write_head;
+}
+
+/*
+ * Remember the upstream timestamp that belongs to a WAL position.
+ *
+ * walreceiver calls this on a standby when a new message arrives, passing the
+ * upstream server's timestamp together with its latest known WAL position.
+ * Once that position has been written, flushed and applied, walreceiver
+ * reports the timestamp back to the upstream server.
+ */
+void
+SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+	static TimestampTz prev_sample_time;
+	static XLogRecPtr prev_sample_lsn;
+	bool		take_sample;
+
+	Assert(AmWalReceiverProcess());
+	Assert(replication_lag_sample_interval >= 0);
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+
+	/*
+	 * Take a sample only when all of these hold:
+	 *
+	 * - replay has not already reached 'lsn' (then there is effectively no
+	 * replay lag, and we don't want to report bogus lag after idleness);
+	 *
+	 * - the LSN has moved since the previous sample;
+	 *
+	 * - more than replication_lag_sample_interval has passed since the
+	 * previous sample.
+	 */
+	take_sample = (lsn != XLogCtl->lastReplayedEndRecPtr) &&
+		(lsn > prev_sample_lsn) &&
+		(timestamp > TimestampTzPlusMilliseconds(prev_sample_time,
+												 replication_lag_sample_interval));
+
+	if (take_sample)
+	{
+		StoreXLogTimestampAtLsn(&XLogCtl->applyTimestamps, timestamp, lsn);
+		StoreXLogTimestampAtLsn(&XLogCtl->writeTimestamps, timestamp, lsn);
+		StoreXLogTimestampAtLsn(&XLogCtl->flushTimestamps, timestamp, lsn);
+
+		prev_sample_time = timestamp;
+		prev_sample_lsn = lsn;
+	}
+
+	SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Get the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the upstream server, and also the most recently applied LSN.
+ * (Note that the timestamp and the LSN don't necessarily relate to the same
+ * record.)  'lsn' may be NULL if the caller does not want the LSN.
+ *
+ * This is similar to GetLatestXTime, except that it is advanced when WAL
+ * positions recorded with SetXLogTimestampAtLsn have been applied,
+ * rather than commit records.
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+ TimestampTz result;
+
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (lsn)
+ *lsn = XLogCtl->lastReplayedEndRecPtr;
+ result = XLogCtl->lastReplayedTimestamp;
+ SpinLockRelease(&XLogCtl->info_lck);
+
+ return result;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 649cef8..2fd63e3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -685,6 +685,9 @@ CREATE VIEW pg_stat_replication AS
W.write_location,
W.flush_location,
W.replay_location,
+ W.write_lag,
+ W.flush_lag,
+ W.replay_lag,
W.sync_priority,
W.sync_state
FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index cc3cf7d..621aa24 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
int wal_receiver_status_interval;
int wal_receiver_timeout;
bool hot_standby_feedback;
+int replication_lag_sample_interval;
/* libpqwalreceiver connection */
static WalReceiverConn *wrconn = NULL;
@@ -107,6 +108,10 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+/* Latest timestamps for replication lag tracking. */
+static TimestampTz last_write_timestamp;
+static TimestampTz last_flush_timestamp;
+
static StringInfoData reply_message;
static StringInfoData incoming_message;
@@ -138,7 +143,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, int timestamps);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -148,6 +153,16 @@ static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+/*
+ * Which timestamps to include in a reply message.  These are bit flags,
+ * tested against the 'timestamps' bitmap passed to XLogWalRcvSendReply.
+ */
+typedef enum XLogReplyTimestamp
+{
+ REPLY_WRITE_TIMESTAMP = 1, /* include the latest written timestamp */
+ REPLY_FLUSH_TIMESTAMP = 2, /* include the latest flushed timestamp */
+ REPLY_APPLY_TIMESTAMP = 4 /* include the latest replayed timestamp */
+} XLogReplyTimestamp;
+
static void
ProcessWalRcvInterrupts(void)
@@ -424,6 +439,8 @@ WalReceiverMain(void)
len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
+ int timestamp = 0;
+
/*
* Process the received data, and any subsequent data we
* can read without blocking.
@@ -455,8 +472,17 @@ WalReceiverMain(void)
len = walrcv_receive(wrconn, &buf, &wait_fd);
}
+ /*
+ * Check if we have written an LSN location for which we
+ * have a timestamp from the upstream server, for
+ * replication lag tracking.
+ */
+ if (CheckForWrittenTimestampedLsn(LogstreamResult.Write,
+ &last_write_timestamp))
+ timestamp = REPLY_WRITE_TIMESTAMP;
+
/* Let the master know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ XLogWalRcvSendReply(false, false, timestamp);
/*
* If we've written some records, flush them to disk and
@@ -493,15 +519,20 @@ WalReceiverMain(void)
ResetLatch(walrcv->latch);
if (walrcv->force_reply)
{
+ int timestamps = 0;
+
/*
* The recovery process has asked us to send apply
* feedback now. Make sure the flag is really set to
* false in shared memory before sending the reply, so
* we don't miss a new request for a reply.
*/
+ if (walrcv->force_reply_apply_timestamp)
+ timestamps = REPLY_APPLY_TIMESTAMP;
walrcv->force_reply = false;
+ walrcv->force_reply_apply_timestamp = false;
pg_memory_barrier();
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, timestamps);
}
}
if (rc & WL_POSTMASTER_DEATH)
@@ -559,7 +590,7 @@ WalReceiverMain(void)
}
}
- XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendReply(requestReply, requestReply, 0);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -911,7 +942,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
/* If the primary requested a reply, send one immediately */
if (replyRequested)
- XLogWalRcvSendReply(true, false);
+ XLogWalRcvSendReply(true, false, 0);
break;
}
default:
@@ -1074,7 +1105,18 @@ XLogWalRcvFlush(bool dying)
/* Also let the master know that we made some progress */
if (!dying)
{
- XLogWalRcvSendReply(false, false);
+ /*
+ * Check if we have just flushed a position for which we have a
+ * timestamp from the upstream server, for replication lag
+ * tracking.
+ */
+ int timestamp = 0;
+
+ if (CheckForFlushedTimestampedLsn(LogstreamResult.Flush,
+ &last_flush_timestamp))
+ timestamp = REPLY_FLUSH_TIMESTAMP;
+
+ XLogWalRcvSendReply(false, false, timestamp);
XLogWalRcvSendHSFeedback(false);
}
}
@@ -1092,21 +1134,27 @@ XLogWalRcvFlush(bool dying)
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbearts, when approaching
* wal_receiver_timeout.
+ *
+ * The bitmap 'timestamps' specifies which timestamps should be included, for
+ * replication lag tracking purposes.
*/
static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, int timestamps)
{
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now;
+ TimestampTz writeTimestamp = 0;
+ TimestampTz flushTimestamp = 0;
+ TimestampTz applyTimestamp = 0;
/*
* If the user doesn't want status to be reported to the master, be sure
* to exit before doing anything at all.
*/
- if (!force && wal_receiver_status_interval <= 0)
+ if (!force && timestamps == 0 && wal_receiver_status_interval <= 0)
return;
/* Get current timestamp. */
@@ -1132,7 +1180,41 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
+ flushTimestamp = last_flush_timestamp;
+ writeTimestamp = last_write_timestamp;
+
+	/* Decide whether to send timestamps for replay lag estimation. */
+	if (replication_lag_sample_interval == -1)
+	{
+		/*
+		 * Lag sampling is disabled.  The timestamps fetched above may be
+		 * left over from before it was turned off (the parameter can change
+		 * at reload), so never send them: resending a stale timestamp with
+		 * every reply would make the upstream server report ever-growing
+		 * lag.  Zero means "no timestamp".
+		 */
+		writeTimestamp = 0;
+		flushTimestamp = 0;
+		applyTimestamp = 0;
+	}
+	else
+	{
+		static TimestampTz lastApplyTimestampSendTime = 0;
+
+		/*
+		 * Only send an apply timestamp if we were explicitly asked to by the
+		 * recovery process or if replay lag sampling is active but the
+		 * recovery process seems to be stuck.
+		 *
+		 * If we haven't heard from the recovery process in a time exceeding
+		 * wal_receiver_status_interval and yet it has not applied the highest
+		 * LSN we've heard about, then we want to resend the last replayed
+		 * timestamp we have; otherwise we zero it out and wait for the
+		 * recovery process to wake us when it has set a new accurate replay
+		 * timestamp.  Note that we can read latestWalEnd without acquiring the
+		 * mutex that protects it because it is only written to by this
+		 * process (walreceiver).
+		 */
+		if (((timestamps & REPLY_APPLY_TIMESTAMP) != 0) ||
+			(WalRcv->latestWalEnd > applyPtr &&
+			 TimestampDifferenceExceeds(lastApplyTimestampSendTime, now,
+										wal_receiver_status_interval * 1000)))
+			lastApplyTimestampSendTime = now;
+		else
+			applyTimestamp = 0;
+
+		/* Write and flush timestamps are sent only when explicitly requested. */
+		if ((timestamps & REPLY_FLUSH_TIMESTAMP) == 0)
+			flushTimestamp = 0;
+		if ((timestamps & REPLY_WRITE_TIMESTAMP) == 0)
+			writeTimestamp = 0;
+	}
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
@@ -1140,6 +1222,9 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(writeTimestamp));
+ pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(flushTimestamp));
+ pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
@@ -1244,7 +1329,6 @@ static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
WalRcvData *walrcv = WalRcv;
-
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
/* Update shared-memory status */
@@ -1256,6 +1340,16 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
+ /*
+ * If replication lag sampling is active, remember the upstream server's
+ * timestamp at the latest WAL end that it has. We'll be able to retrieve
+ * this timestamp once we have written, flushed and finally applied this
+ * LSN, so that we can report it to the upstream server for lag tracking
+ * purposes.
+ */
+ if (replication_lag_sample_interval != -1)
+ SetXLogTimestampAtLsn(sendTime, walEnd);
+
if (log_min_messages <= DEBUG2)
{
char *sendtime;
@@ -1291,12 +1385,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
* This is called by the startup process whenever interesting xlog records
* are applied, so that walreceiver can check if it needs to send an apply
* notification back to the master which may be waiting in a COMMIT with
- * synchronous_commit = remote_apply.
+ * synchronous_commit = remote_apply. Also used to send periodic messages
+ * which are used to compute pg_stat_replication.replay_lag.
*/
void
-WalRcvForceReply(void)
+WalRcvForceReply(bool apply_timestamp)
{
+	/*
+	 * Only ever raise the apply-timestamp request here, and do so before
+	 * raising force_reply.  A plain assignment would let a later call with
+	 * apply_timestamp == false cancel a request that walreceiver has not
+	 * consumed yet, dropping that lag sample.  walreceiver clears the flag
+	 * itself when it acts on force_reply.
+	 */
+	if (apply_timestamp)
+		WalRcv->force_reply_apply_timestamp = true;
WalRcv->force_reply = true;
if (WalRcv->latch)
SetLatch(WalRcv->latch);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5cdb8a0..3fbca0c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1545,6 +1545,25 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
}
/*
+ * Compute the difference between 'timestamp' and 'now' in microseconds.
+ * Return -1 if timestamp is zero.
+ */
+static int64
+compute_lag(TimestampTz now, TimestampTz timestamp)
+{
+	/*
+	 * The result is signed because -1 is the "no sample" marker and callers
+	 * store it in int64 and test it with ">= 0".
+	 */
+	if (timestamp == 0)
+		return -1;
+
+#ifdef HAVE_INT64_TIMESTAMP
+	return now - timestamp;
+#else
+	/* Scale to microseconds and convert to the signed result explicitly. */
+	return (int64) ((now - timestamp) * 1000000);
+#endif
+}
+
+/*
* Regular reply from standby advising of WAL positions on standby server.
*/
static void
@@ -1553,15 +1572,30 @@ ProcessStandbyReplyMessage(void)
XLogRecPtr writePtr,
flushPtr,
applyPtr;
+ int64 writeLagUs,
+ flushLagUs,
+ applyLagUs;
+ TimestampTz writeTimestamp,
+ flushTimestamp,
+ applyTimestamp;
bool replyRequested;
+ TimestampTz now = GetCurrentTimestamp();
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ writeTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
+ flushTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
+ applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
replyRequested = pq_getmsgbyte(&reply_message);
+ /* Compute the replication lag. */
+ writeLagUs = compute_lag(now, writeTimestamp);
+ flushLagUs = compute_lag(now, flushTimestamp);
+ applyLagUs = compute_lag(now, applyTimestamp);
+
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1583,6 +1617,12 @@ ProcessStandbyReplyMessage(void)
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
+ if (writeLagUs >= 0)
+ walsnd->writeLagUs = writeLagUs;
+ if (flushLagUs >= 0)
+ walsnd->flushLagUs = flushLagUs;
+ if (applyLagUs >= 0)
+ walsnd->applyLagUs = applyLagUs;
SpinLockRelease(&walsnd->mutex);
}
@@ -1979,6 +2019,9 @@ InitWalSenderSlot(void)
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
+ walsnd->writeLagUs = -1;
+ walsnd->flushLagUs = -1;
+ walsnd->applyLagUs = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex);
@@ -2753,6 +2796,21 @@ WalSndGetStateString(WalSndState state)
return "UNKNOWN";
}
+/*
+ * Build a palloc'd Interval holding 'lag_us' microseconds, with the month
+ * and day fields set to zero.
+ *
+ * The argument is signed to match the int64 lag values the callers hold;
+ * they only pass non-negative values.
+ */
+static Interval *
+lag_as_interval(int64 lag_us)
+{
+	Interval   *result = palloc(sizeof(Interval));
+
+	result->month = 0;
+	result->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+	result->time = lag_us;
+#else
+	result->time = lag_us / 1000000.0;
+#endif
+
+	return result;
+}
/*
* Returns activity of walsenders, including pids and xlog locations sent to
@@ -2761,7 +2819,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 8
+#define PG_STAT_GET_WAL_SENDERS_COLS 11
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2809,6 +2867,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 writeLagUs;
+ int64 flushLagUs;
+ int64 applyLagUs;
int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2823,6 +2884,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ writeLagUs = walsnd->writeLagUs;
+ flushLagUs = walsnd->flushLagUs;
+ applyLagUs = walsnd->applyLagUs;
priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
@@ -2857,6 +2921,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true;
values[5] = LSNGetDatum(apply);
+ if (writeLagUs < 0)
+ nulls[6] = true;
+ else
+ values[6] = IntervalPGetDatum(lag_as_interval(writeLagUs));
+
+ if (flushLagUs < 0)
+ nulls[7] = true;
+ else
+ values[7] = IntervalPGetDatum(lag_as_interval(flushLagUs));
+
+ if (applyLagUs < 0)
+ nulls[8] = true;
+ else
+ values[8] = IntervalPGetDatum(lag_as_interval(applyLagUs));
+
/*
* Treat a standby such as a pg_basebackup background process
* which always returns an invalid flush location, as an
@@ -2864,7 +2943,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
- values[6] = Int32GetDatum(priority);
+ values[9] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
@@ -2878,12 +2957,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* states. We report just "quorum" for them.
*/
if (priority == 0)
- values[7] = CStringGetTextDatum("async");
+ values[10] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
- values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+ values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
- values[7] = CStringGetTextDatum("potential");
+ values[10] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 545e9e0..90c608d 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1777,6 +1777,20 @@ GetSQLLocalTimestamp(int32 typmod)
}
/*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	/* Scale by one million, then convert the product to int64. */
+	int64		result = (int64) (timestamp * 1000000);
+
+	return result;
+}
+#endif
+
+/*
* TimestampDifference -- convert the difference between two timestamps
* into integer seconds and microseconds
*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 946ba9e..1adb598 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1800,6 +1800,17 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"replication_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+ gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replication lag."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &replication_lag_sample_interval,
+ 1 * 1000, -1, INT_MAX / 1000,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
gettext_noop("Sets the maximum wait time to receive data from the primary."),
NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ee8232f..f703e25 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -271,6 +271,8 @@
# in milliseconds; 0 disables
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
+#replication_lag_sample_interval = 1s # min time between timestamps recorded
+ # to estimate lag; -1 disables lag sampling
#------------------------------------------------------------------------------
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..6feb95d 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
- char replybuf[1 + 8 + 8 + 8 + 8 + 1];
+ char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1];
int len = 0;
/*
@@ -142,6 +142,12 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
len += 8;
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
+ fe_sendint64(0, &replybuf[len]); /* writeTimestamp */
+ len += 8;
+ fe_sendint64(0, &replybuf[len]); /* flushTimestamp */
+ len += 8;
+ fe_sendint64(0, &replybuf[len]); /* applyTimestamp */
+ len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 568ff17..960e02f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
static bool
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
{
- char replybuf[1 + 8 + 8 + 8 + 8 + 1];
+ char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1];
int len = 0;
replybuf[len] = 'r';
@@ -337,6 +337,12 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
len += 8;
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
+ fe_sendint64(0, &replybuf[len]); /* writeTimestamp */
+ len += 8;
+ fe_sendint64(0, &replybuf[len]); /* flushTimestamp */
+ len += 8;
+ fe_sendint64(0, &replybuf[len]); /* applyTimestamp */
+ len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7d21408..ee11cf5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -246,6 +246,12 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern bool CheckForWrittenTimestampedLsn(XLogRecPtr lsn,
+ TimestampTz *timestamp);
+extern bool CheckForFlushedTimestampedLsn(XLogRecPtr lsn,
+ TimestampTz *timestamp);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a6cc2eb..80267b4 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 28dc1fc..41b248f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -23,6 +23,7 @@
extern int wal_receiver_status_interval;
extern int wal_receiver_timeout;
extern bool hot_standby_feedback;
+extern int replication_lag_sample_interval;
/*
* MAXCONNINFO: maximum size of a connection string.
@@ -119,6 +120,9 @@ typedef struct
*/
bool force_reply;
+ /* include the latest replayed timestamp when replying? */
+ bool force_reply_apply_timestamp;
+
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
@@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
-extern void WalRcvForceReply(void);
+extern void WalRcvForceReply(bool sendApplyTimestamp);
#endif /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..fb3a03f 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -46,6 +46,9 @@ typedef struct WalSnd
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int64 writeLagUs;
+ int64 flushLagUs;
+ int64 applyLagUs;
/* Protects shared variables shown above. */
slock_t mutex;
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 93b90fe..20517c9 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
#ifndef HAVE_INT64_TIMESTAMP
extern int64 GetCurrentIntegerTimestamp(void);
extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
#else
#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
#endif
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e9cfadb..14147c5 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1809,10 +1809,13 @@ pg_stat_replication| SELECT s.pid,
w.write_location,
w.flush_location,
w.replay_location,
+ w.write_lag,
+ w.flush_lag,
+ w.replay_lag,
w.sync_priority,
w.sync_state
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
- JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid)))
+ JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
s.ssl,