From 0a4d3a6c62bacd2b5592043ca4ba2408b127f1f5 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Tue, 24 Jan 2023 21:12:28 -0800 Subject: [PATCH v3 2/2] suppress useless wakeups in logical/worker.c --- src/backend/replication/logical/tablesync.c | 28 +++ src/backend/replication/logical/worker.c | 192 ++++++++++++++++---- src/include/replication/worker_internal.h | 4 + src/tools/pgindent/typedefs.list | 1 + 4 files changed, 189 insertions(+), 36 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 07eea504ba..573b46b5a2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); + /* + * If we've made it past our previously-stored special wakeup time, reset + * it so that it can be recalculated as needed. + */ + if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp()) + LogRepWorkerClearSyncStartWakeup(); + /* We need up-to-date sync state info for subscription tables here. */ FetchTableStates(&started_tx); @@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) DSM_HANDLE_INVALID); hentry->last_start_time = now; } + else + { + TimestampTz retry_time; + + /* + * Store when we can start the sync worker so that we + * know how long to sleep. + */ + retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time, + wal_retrieve_retry_interval); + LogRepWorkerUpdateSyncStartWakeup(retry_time); + } + } + else + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz retry_time; + + /* Maybe there will be a free slot in a second... */ + retry_time = TimestampTzPlusSeconds(now, 1); + LogRepWorkerUpdateSyncStartWakeup(retry_time); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cfb2ab6248..83fb8c3110 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -208,8 +208,6 @@ #include "utils/syscache.h" #include "utils/timeout.h" -#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ - typedef struct FlushPosition { dlist_node node; @@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * Reasons to wake up and perform periodic tasks. + */ +typedef enum LogRepWorkerWakeupReason +{ + LRW_WAKEUP_TERMINATE, + LRW_WAKEUP_PING, + LRW_WAKEUP_STATUS, + LRW_WAKEUP_SYNC_START +#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1) +} LogRepWorkerWakeupReason; + +/* + * Wake up times for periodic tasks. + */ +static TimestampTz wakeup[NUM_LRW_WAKEUPS]; + +static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, + TimestampTz now); + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -3449,10 +3467,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - TimestampTz last_recv_timestamp = GetCurrentTimestamp(); - bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + TimestampTz now; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3482,6 +3499,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = &errcallback; apply_error_context_stack = error_context_stack; + /* Initialize nap wakeup times. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + /* This outer loop iterates once per wait. */ for (;;) { @@ -3498,6 +3520,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + now = GetCurrentTimestamp(); if (len != 0) { /* Loop to process all available data (without blocking). */ @@ -3521,9 +3544,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) int c; StringInfoData s; - /* Reset timeout. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + /* Adjust the ping and terminate wakeup times. */ + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now); + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now); /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3577,6 +3600,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + now = GetCurrentTimestamp(); } } @@ -3615,7 +3639,33 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (!dlist_is_empty(&lsn_mapping)) wait_time = WalWriterDelay; else - wait_time = NAPTIME_PER_CYCLE; + { + TimestampTz nextWakeup = DT_NOEND; + + /* + * Since process_syncing_tables() is called conditionally, the + * tablesync worker start wakeup time might be in the past, and we + * can't know for sure when it will be updated again. Rather than + * spinning in a tight loop in this case, bump this wakeup time by + * a second. + */ + now = GetCurrentTimestamp(); + if (wakeup[LRW_WAKEUP_SYNC_START] < now) + wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1); + + /* Find soonest wakeup time, to limit our nap. */ + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + nextWakeup = Min(wakeup[i], nextWakeup); + + /* + * Calculate the nap time. WaitLatchOrSocket() doesn't accept + * timeouts longer than INT_MAX milliseconds, so we limit the + * result accordingly. Also, we round up to the next millisecond + * to avoid waking up too early and spinning until one of the + * wakeup times. + */ + wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000)); + } rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | @@ -3623,6 +3673,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) fd, wait_time, WAIT_EVENT_LOGICAL_APPLY_MAIN); + now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -3633,6 +3684,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + + /* + * LogRepWorkerComputeNextWakeup() will have cleared the tablesync + * worker start wakeup time, so we might not wake up to start a new + * worker at the appropriate time. To deal with this, we set the + * wakeup time to right now so that + * process_syncing_tables_for_apply() recalculates it as soon as + * possible. + */ + if (!am_tablesync_worker()) + LogRepWorkerUpdateSyncStartWakeup(now); } if (rc & WL_TIMEOUT) @@ -3651,31 +3716,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Check if time since last receive from primary has reached the * configured limit. */ - if (wal_receiver_timeout > 0) - { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; - - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); + if (now >= wakeup[LRW_WAKEUP_TERMINATE]) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating logical replication worker due to timeout"))); - if (now >= timeout) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication worker due to timeout"))); - - /* Check to see if it's time for a ping. */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } + /* Check to see if it's time for a ping. */ + if (now >= wakeup[LRW_WAKEUP_PING]) + { + requestReply = true; + wakeup[LRW_WAKEUP_PING] = DT_NOEND; } send_feedback(last_received, requestReply, requestReply); @@ -3711,7 +3761,6 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { static StringInfo reply_message = NULL; - static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; @@ -3754,10 +3803,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!force && writepos == last_writepos && flushpos == last_flushpos && - !TimestampDifferenceExceeds(send_time, now, - wal_receiver_status_interval * 1000)) + now < wakeup[LRW_WAKEUP_STATUS]) return; - send_time = now; + + /* Make sure we wake up when it's time to send another status update. */ + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now); if (!reply_message) { @@ -5056,3 +5106,73 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_APPLY; } } + +/* + * Compute the next wakeup time for a given wakeup reason. Can be called to + * initialize a wakeup time, to adjust it for the next wakeup, or to + * reinitialize it when GUCs have changed. We ask the caller to pass in the + * value of "now" because this frequently avoids multiple calls of + * GetCurrentTimestamp(). It had better be a reasonably up-to-date value + * though. + */ +static void +LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now) +{ + switch (reason) + { + case LRW_WAKEUP_TERMINATE: + if (wal_receiver_timeout <= 0) + wakeup[reason] = DT_NOEND; + else + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout); + break; + case LRW_WAKEUP_PING: + if (wal_receiver_timeout <= 0) + wakeup[reason] = DT_NOEND; + else + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2); + break; + case LRW_WAKEUP_STATUS: + if (wal_receiver_status_interval <= 0) + wakeup[reason] = DT_NOEND; + else + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); + break; + case LRW_WAKEUP_SYNC_START: + /* + * This wakeup time is manually set as needed. This function can + * only be used to initialize its value. + */ + wakeup[reason] = DT_NOEND; + break; + } +} + +/* + * Retrieve the current wakeup time for starting tablesync workers. + */ +TimestampTz +LogRepWorkerGetSyncStartWakeup(void) +{ + return wakeup[LRW_WAKEUP_SYNC_START]; +} + +/* + * Update the current wakeup time for starting tablesync workers. If the + * current wakeup time is <= next_sync_start, no action is taken. + */ +void +LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start) +{ + if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START]) + wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start; +} + +/* + * Clear the current wakeup time for starting tablesync workers. + */ +void +LogRepWorkerClearSyncStartWakeup(void) +{ + wakeup[LRW_WAKEUP_SYNC_START] = DT_NOEND; +} diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index dc87a4edd1..ae44717588 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern TimestampTz LogRepWorkerGetSyncStartWakeup(void); +extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start); +extern void LogRepWorkerClearSyncStartWakeup(void); + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 51484ca7e2..0c8b6ebc4b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1435,6 +1435,7 @@ LockViewRecurse_context LockWaitPolicy LockingClause LogOpts +LogRepWorkerWakeupReason LogStmtLevel LogicalDecodeBeginCB LogicalDecodeBeginPrepareCB -- 2.25.1