From 126e41be1c0af2260fa84f6a3bf6b738442df58f Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Wed, 4 Jan 2023 11:38:20 -0800 Subject: [PATCH v11 2/2] bypass wal_retrieve_retry_interval for logical workers as appropriate --- src/backend/commands/subscriptioncmds.c | 9 +- src/backend/replication/logical/launcher.c | 155 +++++++++++++++----- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 2 +- src/include/replication/worker_internal.h | 3 + 5 files changed, 128 insertions(+), 43 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b9bbb2cf4e..3293ec2043 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1137,8 +1137,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; - if (opts.enabled) - ApplyLauncherWakeupAtCommit(); + /* + * Even if the subscription is disabled, we wake up the + * launcher so that it clears its last start time. This + * ensures the workers will be able to start up right away when + * the subscription is enabled again. + */ + ApplyLauncherWakeupAtCommit(); update_tuple = true; break; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index a69e371c05..544e73dd62 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -297,7 +297,7 @@ retry: { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (!w->in_use) + if (!w->in_use && !w->restart_immediately) { worker = w; slot = i; @@ -620,8 +620,14 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) worker->proc = NULL; worker->dbid = InvalidOid; worker->userid = InvalidOid; - worker->subid = InvalidOid; worker->relid = InvalidOid; + + /* + * If restart_immediately was set, retain the subscription ID so that the + * launcher knows which worker it should restart right away. + */ + if (!worker->restart_immediately) + worker->subid = InvalidOid; } /* @@ -801,7 +807,13 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { - TimestampTz last_start_time = 0; + struct launcher_start_time_mapping + { + Oid subid; + TimestampTz last_start_time; + }; + HTAB *last_start_times = NULL; + HASHCTL ctl; ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -822,6 +834,18 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + /* + * Prepare a hash table for tracking last start times of workers, to avoid + * immediate restarts. Ideally, this hash table would be created in shared + * memory so that other backends could adjust it directly to avoid race + * conditions, but it would need to be a fixed size, and the number of + * subscriptions is virtually unbounded. + */ + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(struct launcher_start_time_mapping); + last_start_times = hash_create("Logical replication apply worker start times", + 256, &ctl, HASH_ELEM | HASH_BLOBS); + /* Enter main loop */ for (;;) { @@ -832,63 +856,116 @@ ApplyLauncherMain(Datum main_arg) MemoryContext oldctx; TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + HASH_SEQ_STATUS status; + struct launcher_start_time_mapping *hentry; CHECK_FOR_INTERRUPTS(); now = GetCurrentTimestamp(); - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (TimestampDifferenceExceeds(last_start_time, now, - wal_retrieve_retry_interval)) + /* Use temporary context for the database list and worker info. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + sublist = get_subscription_list(); + foreach(lc, sublist) { - /* Use temporary context for the database list and worker info. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); + Subscription *sub = (Subscription *) lfirst(lc); + bool bypass_retry_interval = false; + LogicalRepWorker *w; - /* search for subscriptions to start or stop. */ - sublist = get_subscription_list(); + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - /* Start the missing workers for enabled subscriptions. */ - foreach(lc, sublist) + /* + * Bypass wal_retrieve_retry_interval if the worker set + * restart_immediately. We do this before checking if the + * subscription is enabled so that the slot can be freed + * regardless. + */ + for (int i = 0; i < max_logical_replication_workers; i++) { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; + w = &LogicalRepCtx->workers[i]; - if (!sub->enabled) - continue; + if (!w->in_use && !w->proc && w->subid == sub->oid && + w->relid == InvalidOid && w->restart_immediately) + { + w->restart_immediately = false; + logicalrep_worker_cleanup(w); + bypass_retry_interval = true; + break; + } + } - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); - if (w == NULL) - { - last_start_time = now; - wait_time = wal_retrieve_retry_interval; + /* + * If the subscription isn't enabled, clear its entry in + * last_start_times so that its apply worker is restarted right + * away when it is enabled again. There is a chance that the + * subscription could be enabled again before we've had a chance to + * clear its entry, in which case we'll wait a little bit before + * starting the worker. + */ + if (!sub->enabled) + { + hash_search(last_start_times, &sub->oid, HASH_REMOVE, NULL); + continue; + } + + /* + * If its okay to start the worker now, do so. Otherwise, adjust + * wait_time so that we wake up when we can start it. + */ + if (w == NULL) + { + bool found; + hentry = hash_search(last_start_times, &sub->oid, + HASH_ENTER, &found); + + if (bypass_retry_interval || !found || + TimestampDifferenceExceeds(hentry->last_start_time, now, + wal_retrieve_retry_interval)) + { + hentry->last_start_time = now; logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid); } - } + else + { + long elapsed; - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now); + wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed); + } + } } - else + + /* + * Do garbage collection on the last_start_times hash table. In + * theory, a subscription OID could be reused before its entry is + * removed, but the risk of that seems low, and at worst the launcher + * will wait a bit longer before starting the new subscription's apply + * worker. This risk could be reduced by removing entries for + * subscriptions that aren't in sublist, but it doesn't seem worth the + * trouble. + */ + hash_seq_init(&status, last_start_times); + while ((hentry = (struct launcher_start_time_mapping *) hash_seq_search(&status)) != NULL) { - /* - * The wait in previous cycle was interrupted in less than - * wal_retrieve_retry_interval since last worker was started, this - * usually means crash of the worker, so we should retry in - * wal_retrieve_retry_interval again. - */ - wait_time = wal_retrieve_retry_interval; + if (TimestampDifferenceExceeds(hentry->last_start_time, now, + wal_retrieve_retry_interval)) + hash_search(last_start_times, &hentry->subid, HASH_REMOVE, NULL); } + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + /* Wait for more work. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index fcadc1e98e..3fa6472c88 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -465,7 +465,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", MySubscription->name))); - + MyLogicalRepWorker->restart_immediately = true; proc_exit(0); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3e2ea32e1e..0521459898 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3130,7 +3130,7 @@ maybe_reread_subscription(void) ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", MySubscription->name))); - + MyLogicalRepWorker->restart_immediately = true; proc_exit(0); } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 2a3ec5c2d8..68a56c175b 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -66,6 +66,9 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + /* Should the launcher restart the worker immediately? */ + bool restart_immediately; } LogicalRepWorker; /* Main memory context for apply worker. Permanent during worker lifetime. */ -- 2.25.1