From b0f3066e5d592c603aee8f89ff4171f2b2a8e7f6 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:13:52 +0300 Subject: [PATCH v16 3/5] reuse connection when tablesync workers change the target --- src/backend/replication/logical/tablesync.c | 53 ++++++++++++++------- src/backend/replication/logical/worker.c | 30 +++++++----- src/backend/replication/walsender.c | 6 +++ src/include/replication/worker_internal.h | 3 +- 4 files changed, 61 insertions(+), 31 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 605c5bd4ec..f042d9ae00 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -144,16 +144,6 @@ clean_sync_worker(void) pgstat_report_stat(true); } - /* - * Disconnect from publisher. Otherwise reused sync workers causes - * exceeding max_wal_senders - */ - if (LogRepWorkerWalRcvConn != NULL) - { - walrcv_disconnect(LogRepWorkerWalRcvConn); - LogRepWorkerWalRcvConn = NULL; - } - /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); } @@ -167,6 +157,16 @@ finish_sync_worker(void) { clean_sync_worker(); + /* + * Disconnect from the publisher. Otherwise, reused sync workers would + * cause max_wal_senders to be exceeded. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } + /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); @@ -1268,7 +1268,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, * The returned slot name is palloc'ed in current memory context. 
*/ char * -LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot) { char *slotname; char *err; @@ -1321,14 +1321,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) NAMEDATALEN); /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. + * Connect to the publisher if not already connected. The application_name + * must also differ from that of the leader apply worker, because synchronous + * replication must be able to distinguish them. */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, - must_use_password, - slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * FIXME: set an appropriate application_name. Previously, the slot name + * was used because the lifetime of the tablesync worker was the same as + * that of the slot, but now the tablesync worker handles many slots during + * synchronization, so the slot name is no longer suitable. What should it be? + * Note that if the tablesync worker starts to reuse the replication + * slot during synchronization, we should use the slot name as + * application_name again. + */ + snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i", + MySubscription->oid, worker_slot); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, + must_use_password, + application_name, &err); + } + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eae561db05..ca663445c1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3500,19 +3500,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Init the ApplyMessageContext which we clean up after each replication - * protocol message. 
+ * protocol message, if needed. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* * This memory context is used for per-stream data when the streaming mode * is enabled. This context is reset on each stream stop. */ - LogicalStreamingContext = AllocSetContextCreate(ApplyContext, - "LogicalStreamingContext", - ALLOCSET_DEFAULT_SIZES); + if (!LogicalStreamingContext) + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -4468,7 +4470,9 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) * are not repeatable. */ static void -start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +start_table_sync(XLogRecPtr *origin_startpos, + char **myslotname, + int worker_slot) { char *syncslotname = NULL; @@ -4477,7 +4481,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) PG_TRY(); { /* Call initial sync. */ - syncslotname = LogicalRepSyncTableStart(origin_startpos); + syncslotname = LogicalRepSyncTableStart(origin_startpos, worker_slot); } PG_CATCH(); { @@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions *options, char *slotname, char *originname, int originname_size, - XLogRecPtr *origin_startpos) + XLogRecPtr *origin_startpos, + int worker_slot) { MyLogicalRepWorker->is_sync_completed = false; /* Start table synchronization. 
*/ - start_table_sync(origin_startpos, &slotname); + start_table_sync(origin_startpos, &slotname, worker_slot); ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, @@ -4857,7 +4862,8 @@ TablesyncWorkerMain(Datum main_arg) myslotname, originname, sizeof(originname), - &origin_startpos); + &origin_startpos, + worker_slot); if (IsTransactionState()) CommitTransactionCommand(); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cedadb0036..06adcf6f01 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1827,6 +1827,12 @@ exec_replication_command(const char *cmd_string) set_ps_display(cmdtag); PreventInTransactionBlock(true, cmdtag); + /* + * Reset the flags, because this may be the second time + * streaming is started. + */ + streamingDoneSending = streamingDoneReceiving = false; + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1e9f8e6e72..af6fd339f7 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -249,7 +249,8 @@ extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); -extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, + int worker_slot); extern bool AllTablesyncsReady(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); -- 2.27.0