diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3b614b6..36e00e0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3282,6 +3282,26 @@ include_dir 'conf.d' + + replay_lag_sample_interval (integer) + + replay_lag_sample_interval configuration parameter + + + + + Controls how often a standby should sample replay lag information to + send back to the primary or upstream standby while replaying WAL. The + default is 1 second. Units are milliseconds if not specified. A + value of -1 disables the reporting of replay lag. Estimated replay lag + can be seen in the + pg_stat_replication view of the upstream server. + This parameter can only be set + in the postgresql.conf file or on the server command line. + + + + hot_standby_feedback (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5b58d2e..79d2d95 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1401,6 +1401,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i standby server + replay_lag + interval + Estimated time taken for recent WAL records to be replayed on this + standby server + + sync_priority integer Priority of this standby server for being chosen as the diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index aa9ee5a..739e45e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version; #define PROMOTE_SIGNAL_FILE "promote" #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote" +/* Size of the circular buffer of timestamped LSNs. */ +#define XLOG_TIMESTAMP_BUFFER_SIZE 8192 /* User-settable parameters */ int max_wal_size = 64; /* 1 GB */ @@ -520,6 +522,26 @@ typedef struct XLogCtlInsert } XLogCtlInsert; /* + * A sample associating a timestamp with a given xlog position. + */ +typedef struct XLogTimestamp +{ + TimestampTz timestamp; + XLogRecPtr lsn; +} XLogTimestamp; + +/* + * A circular buffer of LSNs and associated timestamps. The buffer is empty + * when read_head == write_head. + */ +typedef struct XLogTimestampBuffer +{ + uint32 read_head; + uint32 write_head; + XLogTimestamp buffer[XLOG_TIMESTAMP_BUFFER_SIZE]; +} XLogTimestampBuffer; + +/* * Total shared-memory state for XLOG. */ typedef struct XLogCtlData @@ -637,6 +659,12 @@ typedef struct XLogCtlData /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* timestamp from the most recently applied record associated with a timestamp. */ + TimestampTz lastReplayedTimestamp; + + /* a buffer of upstream timestamps for WAL that is not yet applied. */ + XLogTimestampBuffer timestamps; + /* * timestamp of when we started replaying the current chunk of WAL data, * only relevant for replication or archive recovery @@ -5976,6 +6004,44 @@ CheckRequiredParameterValues(void) } /* + * Called by the startup process after it has replayed up to 'lsn'. Checks + * for timestamps associated with WAL positions that have now been replayed. + * If any are found, the latest such timestamp found is written to + * '*timestamp'. Returns the new buffer read head position, which the caller + * should write into XLogCtl->timestamps.read_head while holding info_lck. + */ +static uint32 +CheckForReplayedTimestamps(XLogRecPtr lsn, TimestampTz *timestamp) +{ + uint32 read_head; + + /* + * It's OK to access timestamps.read_head without any kind synchronization + * because this process is the only one to write to it. + */ + Assert(AmStartupProcess()); + read_head = XLogCtl->timestamps.read_head; + + /* + * It's OK to access write_head without interlocking because it's an + * aligned 32 bit value which we can read atomically on all supported + * platforms to get some recent value, not a torn/garbage value. + * Furthermore we must see a value that is at least as recent as any WAL + * that we have replayed, because walreceiver calls + * SetXLogReplayTimestampAtLsn before passing the corresponding WAL data + * to the recovery process. + */ + while (read_head != XLogCtl->timestamps.write_head && + XLogCtl->timestamps.buffer[read_head].lsn <= lsn) + { + *timestamp = XLogCtl->timestamps.buffer[read_head].timestamp; + read_head = (read_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + } + + return read_head; +} + +/* * This must be called ONCE during postmaster or standalone-backend startup */ void @@ -6794,6 +6860,8 @@ StartupXLOG(void) do { bool switchedTLI = false; + TimestampTz replayed_timestamp = 0; + uint32 timestamp_read_head; #ifdef WAL_DEBUG if (XLOG_DEBUG || @@ -6947,24 +7015,34 @@ StartupXLOG(void) /* Pop the error context stack */ error_context_stack = errcallback.previous; + /* Check if we have replayed a timestamped WAL position */ + timestamp_read_head = + CheckForReplayedTimestamps(EndRecPtr, &replayed_timestamp); + /* - * Update lastReplayedEndRecPtr after this record has been - * successfully replayed. + * Update lastReplayedEndRecPtr and lastReplayedTimestamp + * after this record has been successfully replayed. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->lastReplayedEndRecPtr = EndRecPtr; XLogCtl->lastReplayedTLI = ThisTimeLineID; + XLogCtl->timestamps.read_head = timestamp_read_head; + if (replayed_timestamp != 0) + XLogCtl->lastReplayedTimestamp = replayed_timestamp; SpinLockRelease(&XLogCtl->info_lck); /* * If rm_redo called XLogRequestWalReceiverReply, then we wake * up the receiver so that it notices the updated - * lastReplayedEndRecPtr and sends a reply to the master. + * lastReplayedEndRecPtr and sends a reply to the master. We + * also wake it if we have replayed a WAL position that has + * an associated timestamp so that the upstream server can + * measure our replay lag. */ - if (doRequestWalReceiverReply) + if (doRequestWalReceiverReply || replayed_timestamp != 0) { doRequestWalReceiverReply = false; - WalRcvForceReply(); + WalRcvForceReply(replayed_timestamp != 0); } /* Remember this record as the last-applied one */ @@ -11745,3 +11823,81 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Record the timestamp that is associated with a WAL position. + * + * This is called by walreceiver on standby servers when new messages arrive, + * using a timestamp and the latest known WAL position from the upstream + * server. The timestamp will be sent back to the upstream server via + * walreceiver when the recovery process has applied the WAL position. The + * upstream server can then compute the elapsed time to estimate the replay + * lag. + */ +void +SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn) +{ + Assert(AmWalReceiverProcess()); + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn == XLogCtl->lastReplayedEndRecPtr) + { + /* + * That is the last replayed LSN: we are fully replayed, so we can + * update the replay timestamp immediately. + */ + XLogCtl->lastReplayedTimestamp = timestamp; + } + else + { + /* + * There is WAL still to be applied. We will associate the timestamp + * with this WAL position and wait for it to be replayed. We add it + * at the 'write' end of the circular buffer of LSN/timestamp + * mappings, which the replay loop will eventually read. + */ + uint32 write_head = XLogCtl->timestamps.write_head; + uint32 new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + + if (new_write_head == XLogCtl->timestamps.read_head) + { + /* + * The buffer is full, so we'll rewind and overwrite the most + * recent sample. Overwriting the most recent sample means that + * if we're not replaying fast enough and the buffer fills up, + * we'll effectively lower the sampling rate. + */ + new_write_head = write_head; + write_head = (write_head - 1) % XLOG_TIMESTAMP_BUFFER_SIZE; + } + + XLogCtl->timestamps.buffer[write_head].lsn = lsn; + XLogCtl->timestamps.buffer[write_head].timestamp = timestamp; + XLogCtl->timestamps.write_head = new_write_head; + } + SpinLockRelease(&XLogCtl->info_lck); +} + +/* + * Get the timestamp for the most recently applied WAL record that carried a + * timestamp from the upstream server, and also the most recently applied LSN. + * (Note that the timestamp and the LSN don't necessarily relate to the same + * record.) + * + * This is similar to GetLatestXTime, except that it is advanced when WAL + * positions recorded with SetXLogReplayTimestampAtLsn have been applied, + * rather than commit records. + */ +TimestampTz +GetXLogReplayTimestamp(XLogRecPtr *lsn) +{ + TimestampTz result; + + SpinLockAcquire(&XLogCtl->info_lck); + if (lsn) + *lsn = XLogCtl->lastReplayedEndRecPtr; + result = XLogCtl->lastReplayedTimestamp; + SpinLockRelease(&XLogCtl->info_lck); + + return result; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 48e7c4b..e0e45fa 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -685,6 +685,7 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.replay_lag, W.sync_priority, W.sync_state FROM pg_stat_get_activity(NULL) AS S diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index cc3cf7d..9cf9f4c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -73,6 +73,7 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; +int replay_lag_sample_interval; /* libpqwalreceiver connection */ static WalReceiverConn *wrconn = NULL; @@ -138,7 +139,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendReply(bool force, bool requestReply, bool includeApplyTimestamp); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -456,7 +457,7 @@ WalReceiverMain(void) } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, false); /* * If we've written some records, flush them to disk and @@ -493,6 +494,8 @@ WalReceiverMain(void) ResetLatch(walrcv->latch); if (walrcv->force_reply) { + bool timestamp = walrcv->force_reply_apply_timestamp; + /* * The recovery process has asked us to send apply * feedback now. Make sure the flag is really set to @@ -500,8 +503,9 @@ WalReceiverMain(void) * we don't miss a new request for a reply. */ walrcv->force_reply = false; + walrcv->force_reply_apply_timestamp = false; pg_memory_barrier(); - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, timestamp); } } if (rc & WL_POSTMASTER_DEATH) @@ -559,7 +563,7 @@ WalReceiverMain(void) } } - XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendReply(requestReply, requestReply, false); XLogWalRcvSendHSFeedback(false); } } @@ -911,7 +915,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) /* If the primary requested a reply, send one immediately */ if (replyRequested) - XLogWalRcvSendReply(true, false); + XLogWalRcvSendReply(true, false, false); break; } default: @@ -1074,7 +1078,7 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(false, false, false); XLogWalRcvSendHSFeedback(false); } } @@ -1092,15 +1096,18 @@ XLogWalRcvFlush(bool dying) * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbearts, when approaching * wal_receiver_timeout. + * + * If 'reportApplyTimestamp' is true, the latest apply timestamp is included. */ static void -XLogWalRcvSendReply(bool force, bool requestReply) +XLogWalRcvSendReply(bool force, bool requestReply, bool reportApplyTimestamp) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; static TimestampTz sendTime = 0; TimestampTz now; + TimestampTz applyTimestamp = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -1132,7 +1139,35 @@ XLogWalRcvSendReply(bool force, bool requestReply) /* Construct a new message */ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + applyTimestamp = GetXLogReplayTimestamp(&applyPtr); + + /* Decide whether to send an apply timestamp for replay lag estimation. */ + if (replay_lag_sample_interval != -1) + { + static TimestampTz lastTimestampSendTime = 0; + + /* + * Only send an apply timestamp if we were explicitly asked to by the + * recovery process or if replay lag sampling is active but the + * recovery process seems to be stuck. + * + * If we haven't heard from the recovery process in a time exceeding + * wal_receiver_status_interval and yet it has not applied the highest + * LSN we've heard about, then we want to resend the last replayed + * timestamp we have; otherwise we zero it out and wait for the + * recovery process to wake us when it has set a new accurate replay + * timestamp. Note that we can read latestWalEnd without acquiring the + * mutex that protects it because it is only written to by this + * process (walreceiver). + */ + if (reportApplyTimestamp || + (WalRcv->latestWalEnd > applyPtr && + TimestampDifferenceExceeds(lastTimestampSendTime, now, + wal_receiver_status_interval * 1000))) + lastTimestampSendTime = now; + else + applyTimestamp = 0; + } resetStringInfo(&reply_message); pq_sendbyte(&reply_message, 'r'); @@ -1140,6 +1175,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp)); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ @@ -1244,18 +1280,40 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; - TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + bool newHighWalEnd = false; + + static TimestampTz lastRecordedTimestamp = 0; /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->latestWalEnd < walEnd) + { walrcv->latestWalEndTime = sendTime; + newHighWalEnd = true; + } walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; SpinLockRelease(&walrcv->mutex); + /* + * If replay lag sampling is active, remember the upstream server's + * timestamp at the latest WAL end that it has, unless we've already + * done that too recently or the LSN hasn't advanced. This timestamp + * will be fed back to us by the startup process when it eventually + * replays this LSN, so that we can feed it back to the upstream server + * for replay lag tracking purposes. + */ + if (replay_lag_sample_interval != -1 && + newHighWalEnd && + sendTime > TimestampTzPlusMilliseconds(lastRecordedTimestamp, + replay_lag_sample_interval)) + { + SetXLogReplayTimestampAtLsn(sendTime, walEnd); + lastRecordedTimestamp = sendTime; + } + if (log_min_messages <= DEBUG2) { char *sendtime; @@ -1291,12 +1349,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) * This is called by the startup process whenever interesting xlog records * are applied, so that walreceiver can check if it needs to send an apply * notification back to the master which may be waiting in a COMMIT with - * synchronous_commit = remote_apply. + * synchronous_commit = remote_apply. Also used to send periodic messages + * which are used to compute pg_stat_replication.replay_lag. */ void -WalRcvForceReply(void) +WalRcvForceReply(bool apply_timestamp) { WalRcv->force_reply = true; + WalRcv->force_reply_apply_timestamp = apply_timestamp; if (WalRcv->latch) SetLatch(WalRcv->latch); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d80bcc0..0782d78 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1553,15 +1553,29 @@ ProcessStandbyReplyMessage(void) XLogRecPtr writePtr, flushPtr, applyPtr; + int64 applyLagUs; bool replyRequested; + TimestampTz now = GetCurrentTimestamp(); + TimestampTz applyTimestamp; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message)); replyRequested = pq_getmsgbyte(&reply_message); + /* Compute the apply lag in milliseconds. */ + if (applyTimestamp == 0) + applyLagUs = -1; + else +#ifdef HAVE_INT64_TIMESTAMP + applyLagUs = now - applyTimestamp; +#else + applyLagUs = (now - applyTimestamp) * 1000000; +#endif + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, @@ -1583,6 +1597,8 @@ ProcessStandbyReplyMessage(void) walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + if (applyLagUs >= 0) + walsnd->applyLagUs = applyLagUs; SpinLockRelease(&walsnd->mutex); } @@ -1979,6 +1995,7 @@ InitWalSenderSlot(void) walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; + walsnd->applyLagUs = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); @@ -2761,7 +2778,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 9 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2809,6 +2826,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 applyLagUs; int priority; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2823,6 +2841,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + applyLagUs = walsnd->applyLagUs; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2857,6 +2876,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[5] = true; values[5] = LSNGetDatum(apply); + if (applyLagUs < 0) + nulls[6] = true; + else + { + Interval *applyLagInterval = palloc(sizeof(Interval)); + + applyLagInterval->month = 0; + applyLagInterval->day = 0; +#ifdef HAVE_INT64_TIMESTAMP + applyLagInterval->time = applyLagUs; +#else + applyLagInterval->time = applyLagUs / 1000000.0; +#endif + nulls[6] = false; + values[6] = IntervalPGetDatum(applyLagInterval); + } + /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an @@ -2864,18 +2900,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + values[7] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely * informational, not different from priority. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[8] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) - values[7] = CStringGetTextDatum("sync"); + values[8] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + values[8] = CStringGetTextDatum("potential"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index c1d6f05..323d640 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1724,6 +1724,20 @@ GetSQLLocalTimestamp(int32 typmod) } /* + * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format + * + * When compiled with --enable-integer-datetimes, this is implemented as a + * no-op macro. + */ +#ifndef HAVE_INT64_TIMESTAMP +int64 +TimestampTzToIntegerTimestamp(TimestampTz timestamp) +{ + return timestamp * 1000000; +} +#endif + +/* * TimestampDifference -- convert the difference between two timestamps * into integer seconds and microseconds * diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a025117..b1af028 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1810,6 +1810,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"replay_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replay lag."), + NULL, + GUC_UNIT_MS + }, + &replay_lag_sample_interval, + 1 * 1000, -1, INT_MAX / 1000, + NULL, NULL, NULL + }, + + { {"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY, gettext_noop("Sets the maximum wait time to receive data from the primary."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 7f9acfd..bf298f2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -270,6 +270,8 @@ # in milliseconds; 0 disables #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt +#replay_lag_sample_interval = 1s # min time between timestamps recorded + # to estimate replay lag; -1 disables replay lag sampling #------------------------------------------------------------------------------ diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index cb5f989..9753648 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1]; int len = 0; /* @@ -142,6 +142,8 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) len += 8; fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; + fe_sendint64(0, &replybuf[len]); /* applyTimestamp */ + len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 4382e5d..8e89627 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) { - char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + char replybuf[1 + 8 + 8 + 8 + 8 + 8 + 1]; int len = 0; replybuf[len] = 'r'; @@ -337,6 +337,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) len += 8; fe_sendint64(now, &replybuf[len]); /* sendTime */ len += 8; + fe_sendint64(0, &replybuf[len]); /* applyTimestamp */ + len += 8; replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c9f332c..1be2f34 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -237,6 +237,9 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI); extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void); +extern void SetXLogReplayTimestamp(TimestampTz timestamp); +extern void SetXLogReplayTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn); +extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn); extern bool RecoveryIsPaused(void); extern void SetRecoveryPause(bool recoveryPause); extern TimestampTz GetLatestXTime(void); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index cd7b909..b565dd8 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ )); DESCR("statistics: information about progress of backends running maintenance command"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,23,25}" "{o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 28dc1fc..be25758 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -23,6 +23,7 @@ extern int wal_receiver_status_interval; extern int wal_receiver_timeout; extern bool hot_standby_feedback; +extern int replay_lag_sample_interval; /* * MAXCONNINFO: maximum size of a connection string. @@ -119,6 +120,9 @@ typedef struct */ bool force_reply; + /* include the latest replayed timestamp when replying? */ + bool force_reply_apply_timestamp; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ bool ready_to_display; @@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); -extern void WalRcvForceReply(void); +extern void WalRcvForceReply(bool sendApplyTimestamp); #endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7794aa5..4de43e8 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -46,6 +46,7 @@ typedef struct WalSnd XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + int64 applyLagUs; /* Protects shared variables shown above. */ slock_t mutex; diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 93b90fe..20517c9 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time, #ifndef HAVE_INT64_TIMESTAMP extern int64 GetCurrentIntegerTimestamp(void); extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp); +extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp); #else #define GetCurrentIntegerTimestamp() GetCurrentTimestamp() #define IntegerTimestampToTimestampTz(timestamp) (timestamp) +#define TimestampTzToIntegerTimestamp(timestamp) (timestamp) #endif extern TimestampTz time_t_to_timestamptz(pg_time_t tm); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5314b9c..d59956f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1809,10 +1809,11 @@ pg_stat_replication| SELECT s.pid, w.write_location, w.flush_location, w.replay_location, + w.replay_lag, w.sync_priority, w.sync_state FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,