From 55eb5d8980c59cd5743f941517bc3af2744d761e Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Mon, 26 Sep 2022 12:59:43 +1000 Subject: [PATCH v4 2/2] Add common function ReplicationOriginNameForLogicalRep. Make a common replication origin name formatting function to replace multiple snprintf() expressions. This also includes logic previously done by ReplicationOriginNameForTablesync(). Peter Smith, reviewed by Aleksander Alekseev Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com --- src/backend/commands/subscriptioncmds.c | 15 +++++---- src/backend/replication/logical/tablesync.c | 36 +++++++-------------- src/backend/replication/logical/worker.c | 35 +++++++++++++++++--- src/include/replication/worker_internal.h | 4 +-- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f3bfcca434..97594cd9b1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* @@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * origin and by this time the origin might be already * removed. For these reasons, passing missing_ok = true. */ - ReplicationOriginNameForTablesync(sub->oid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, char originname[NAMEDATALEN]; XLogRecPtr remote_lsn; - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, false); remote_lsn = replorigin_get_progress(originid, false); @@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(subid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) RemoveSubscriptionRel(subid, InvalidOid); /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 9e52fc401c..b6e0af0a76 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ StartTransactionCommand(); - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); /* * Resetting the origin session removes the ownership of the slot. @@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* @@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } -/* - * Form the origin name for tablesync. - * - * Return the name in the supplied buffer. - */ -void -ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname) -{ - snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); -} - /* * Start syncing the table in the sync worker. * @@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); /* Assign the origin tracking record name. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 188c51660e..bfa32391bf 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -364,6 +364,30 @@ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +/* + * Form the origin name for the subscription. + * + * This is a common function for tablesync and other workers. Tablesync workers + * must pass a valid relid. Other callers must pass relid = InvalidOid. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname) +{ + if (OidIsValid(relid)) + { + /* Replication origin name for tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); + } + else + { + /* Replication origin name for non-tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u", suboid); + } +} + /* * Should this worker apply changes for given relation. * @@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg) * Allocate the origin name in long-lived context for error context * message. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, originname); } @@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg) /* Setup replication origin tracking. */ StartTransactionCommand(); - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f82bc518c3..2b7114ff6d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname); +extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern bool AllTablesyncsReady(void); -- 2.37.2