From f621571b4406b71e8522cc7846030cb3d11fccef Mon Sep 17 00:00:00 2001
From: Hayato Kuroda
Date: Tue, 27 Jun 2023 07:10:45 +0000
Subject: [PATCH 3/6] reuse connection when tablesync workers change the
 target

---
 src/backend/replication/logical/tablesync.c | 31 ++++++++++++++++-----
 src/backend/replication/logical/worker.c    | 20 +++++++------
 src/backend/replication/walsender.c         |  6 ++++
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 37f073b968..0fc81355bf 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1321,14 +1321,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									NAMEDATALEN);
 
 	/*
-	 * Here we use the slot name instead of the subscription name as the
-	 * application_name, so that it is different from the leader apply worker,
-	 * so that synchronous replication can distinguish them.
+	 * Connect to publisher if not yet connected. The application_name must
+	 * also be different from the leader apply worker's so that synchronous
+	 * replication can distinguish them.
 	 */
-	LogRepWorkerWalRcvConn =
-		walrcv_connect(MySubscription->conninfo, true,
-					   must_use_password,
-					   slotname, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+	{
+		char		application_name[NAMEDATALEN];
+
+		/*
+		 * FIXME: set an appropriate application_name. Previously, the slot
+		 * name was used because the lifetime of the tablesync worker was the
+		 * same as that of the slot, but now the tablesync worker handles many
+		 * slots during synchronization, so the slot name is not suitable.
+		 * So what should it be? Note that if the tablesync worker starts to
+		 * reuse the replication slot during synchronization, we should use
+		 * the slot name as the application_name again.
+		 */
+		snprintf(application_name, NAMEDATALEN, "tablesync for %s",
+				 MySubscription->name);
+		LogRepWorkerWalRcvConn =
+			walrcv_connect(MySubscription->conninfo, true,
+						   must_use_password,
+						   application_name, &err);
+	}
+
 	if (LogRepWorkerWalRcvConn == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4a0f402ad4..8df960e343 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3498,19 +3498,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
-	 * protocol message.
+	 * protocol message, if needed.
 	 */
-	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
-												"ApplyMessageContext",
-												ALLOCSET_DEFAULT_SIZES);
+	if (!ApplyMessageContext)
+		ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+													"ApplyMessageContext",
+													ALLOCSET_DEFAULT_SIZES);
 
 	/*
 	 * This memory context is used for per-stream data when the streaming mode
 	 * is enabled. This context is reset on each stream stop.
 	 */
-	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
-													"LogicalStreamingContext",
-													ALLOCSET_DEFAULT_SIZES);
+	if (!LogicalStreamingContext)
+		LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+														"LogicalStreamingContext",
+														ALLOCSET_DEFAULT_SIZES);
 
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
@@ -4891,7 +4893,9 @@ TablesyncWorkerMain(Datum main_arg)
 
 				/* found a table for next iteration */
 				is_table_found = true;
-				clean_sync_worker();
+
+				CommitTransactionCommand();
+				pgstat_report_stat(true);
 
 				StartTransactionCommand();
 				ereport(LOG,
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d3a136b6f5..429d00f2f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1828,6 +1828,12 @@ exec_replication_command(const char *cmd_string)
 				set_ps_display(cmdtag);
 				PreventInTransactionBlock(true, cmdtag);
 
+				/*
+				 * Initialize the flags again because this may not be the
+				 * first streaming command on this connection.
+				 */
+				streamingDoneSending = streamingDoneReceiving = false;
+
 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 					StartReplication(cmd);
 				else
-- 
2.27.0