From 8853500e9b238b0d07871940d4e7db9e43802b8b Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Mon, 21 Nov 2022 16:01:01 -0800 Subject: [PATCH v13 1/2] wake up logical workers as needed instead of relying on periodic wakeups --- src/backend/access/transam/xact.c | 3 ++ src/backend/commands/alter.c | 7 +++ src/backend/commands/subscriptioncmds.c | 7 +++ src/backend/replication/logical/tablesync.c | 49 ++++++++++++--------- src/backend/replication/logical/worker.c | 46 +++++++++++++++++++ src/include/replication/logicalworker.h | 3 ++ 6 files changed, 93 insertions(+), 22 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 24221542e7..54145bf805 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -47,6 +47,7 @@ #include "pgstat.h" #include "replication/logical.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -2360,6 +2361,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_LogicalRepWorkers(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2860,6 +2862,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_LogicalRepWorkers(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 70d359eb6a..4e8102b59f 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -59,6 +59,7 @@ #include "commands/user.h" #include "miscadmin.h" #include "parser/parse_func.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name) if (strncmp(new_name, "regress_", 8) != 0) elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + + /* + * Wake up the logical replication workers to handle this change + * quickly. + */ + LogicalRepWorkersWakeupAtCommit(objectId); } else if (nameCacheId >= 0) { diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b9c5df796f..c0fd82cdf2 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "pgstat.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); + /* Wake up the logical replication workers to handle this change quickly. */ + LogicalRepWorkersWakeupAtCommit(subid); + return myself; } @@ -1733,6 +1737,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->oid, 0); ApplyLauncherWakeupAtCommit(); + + /* Wake up the logical replication workers to handle this change quickly. */ + LogicalRepWorkersWakeupAtCommit(form->oid); } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2fdfeb5b4c..8c8f27ebcf 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + bool should_exit = false; Assert(!IsTransactionState()); @@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = NULL; } - /* - * Even when the two_phase mode is requested by the user, it remains as - * 'pending' until all tablesyncs have reached READY state. - * - * When this happens, we restart the apply worker and (if the conditions - * are still ok) then the two_phase tri-state will become 'enabled' at - * that time. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); - - proc_exit(0); - } - /* * Process all tables that are being synchronized. */ @@ -619,9 +598,35 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (started_tx) { + /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + CommandCounterIncrement(); /* make updates visible */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + + should_exit = true; + } + CommitTransactionCommand(); pgstat_report_stat(true); } + + if (should_exit) + proc_exit(0); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f8e8cf71eb..cf38ed821e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +static List *on_commit_wakeup_workers_subids = NIL; + bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Wakeup the stored subscriptions' workers on commit if requested. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && on_commit_wakeup_workers_subids != NIL) + { + ListCell *subid; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + foreach(subid, on_commit_wakeup_workers_subids) + { + List *workers; + ListCell *worker; + + workers = logicalrep_workers_find(lfirst_oid(subid), true); + foreach(worker, workers) + logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker)); + } + LWLockRelease(LogicalRepWorkerLock); + } + + on_commit_wakeup_workers_subids = NIL; +} + +/* + * Request wakeup of the workers for the given subscription ID on commit of the + * transaction. + * + * This is used to ensure that the workers process assorted changes as soon as + * possible. + */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids, + subid); + MemoryContextSwitchTo(oldcxt); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index f1e7e8a348..3b2084f81f 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void LogicalRepWorkersWakeupAtCommit(Oid subid); +extern void AtEOXact_LogicalRepWorkers(bool isCommit); + #endif /* LOGICALWORKER_H */ -- 2.25.1