From 3ff63e4db6966194b03c86e77c1b312aab5e26cf Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 27 Jun 2023 07:10:45 +0000 Subject: [PATCH 3/5] reuse connection when tablesync workers change the target --- src/backend/replication/logical/tablesync.c | 21 ++++++++++++++------- src/backend/replication/logical/worker.c | 20 ++++++++++++-------- src/backend/replication/walsender.c | 6 ++++++ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 37f073b968..30fe9b78ac 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1321,14 +1321,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) NAMEDATALEN); /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. + * Connect to publisher if not yet. The application_name must be also + * different from the leader apply worker. */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, - must_use_password, - slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + snprintf(application_name, NAMEDATALEN, "tablesync for %s", + MySubscription->name); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, + must_use_password, + application_name, &err); + } + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4a0f402ad4..8df960e343 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3498,19 +3498,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Init the ApplyMessageContext which we clean up after each replication - * protocol message. + * protocol message, if needed. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* * This memory context is used for per-stream data when the streaming mode * is enabled. This context is reset on each stream stop. */ - LogicalStreamingContext = AllocSetContextCreate(ApplyContext, - "LogicalStreamingContext", - ALLOCSET_DEFAULT_SIZES); + if (!LogicalStreamingContext) + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -4891,7 +4893,9 @@ TablesyncWorkerMain(Datum main_arg) /* found a table for next iteration */ is_table_found = true; - clean_sync_worker(); + + CommitTransactionCommand(); + pgstat_report_stat(true); StartTransactionCommand(); ereport(LOG, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d3a136b6f5..429d00f2f0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1828,6 +1828,12 @@ exec_replication_command(const char *cmd_string) set_ps_display(cmdtag); PreventInTransactionBlock(true, cmdtag); + /* + * Initialize the flag again because this streaming may be + * second time. + */ + streamingDoneSending = streamingDoneReceiving = false; + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else -- 2.27.0