From c592a0175cf37c0033fc4cb1fc33d7d312cb83d6 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:13:52 +0300 Subject: [PATCH v20 3/5] Reuse connection when tablesync workers change the target Previously, tablesync workers established new connections whenever they changed the table being synced, but this might have caused additional overhead. This patch allows connections to be reused instead. As for the publisher node, this patch allows logical walsender processes to be reused after the streaming is done once. --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 56 ++++++++++++++------- src/backend/replication/logical/worker.c | 18 ++++--- src/backend/replication/walsender.c | 7 +++ src/include/replication/worker_internal.h | 3 ++ 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 72e5ef8a78..945619b603 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -441,6 +441,7 @@ retry: worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->is_sync_completed = false; + worker->worker_slot = slot; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3bb482994..73b6fd77e2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -150,16 +150,6 @@ finish_sync_worker(bool reuse_worker) pgstat_report_stat(true); } - /* - * Disconnect from the publisher otherwise reusing the sync worker can - * error due to exceeding max_wal_senders. 
- */ - if (LogRepWorkerWalRcvConn != NULL) - { - walrcv_disconnect(LogRepWorkerWalRcvConn); - LogRepWorkerWalRcvConn = NULL; - } - /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); @@ -1258,6 +1248,24 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } +/* + * Determine the application_name for tablesync workers. + * + * FIXME: set appropriate application_name. Previously, the slot name was used + * because the lifetime of the tablesync worker was the same as that, but now the + * tablesync worker handles many slots during the synchronization so that it is + * not suitable. So what should it be? Note that if the tablesync worker starts to + * reuse the replication slot during synchronization, we should use the slot + * name as application_name again. + */ +static void +ApplicationNameForTablesync(Oid suboid, int worker_slot, + char *application_name, Size szapp) +{ + snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid, + worker_slot, GetSystemIdentifier()); +} + /* * Start syncing the table in the sync worker. * @@ -1319,15 +1327,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) slotname, NAMEDATALEN); - /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. - */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, - must_use_password, - slotname, &err); + /* Connect to the publisher if we haven't done so already. */ + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * The application_name must also be different from the leader apply + * worker because synchronous replication must distinguish them. 
+ */ + ApplicationNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->worker_slot, + application_name, + NAMEDATALEN); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, + must_use_password, + application_name, &err); + } + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ed8b20eb34..7ba58d0e67 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3480,20 +3480,22 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ErrorContextCallback errcallback; /* - * Init the ApplyMessageContext which we clean up after each replication - * protocol message. + * Init the ApplyMessageContext if needed. This context is cleaned up + * after each replication protocol message. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* * This memory context is used for per-stream data when the streaming mode * is enabled. This context is reset on each stream stop. 
*/ - LogicalStreamingContext = AllocSetContextCreate(ApplyContext, - "LogicalStreamingContext", - ALLOCSET_DEFAULT_SIZES); + if (!LogicalStreamingContext) + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..2f3e93cc40 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1830,7 +1830,14 @@ exec_replication_command(const char *cmd_string) if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else + { + /* + * Reset flags because reusing tablesync workers can mean + * this is the second time here. + */ + streamingDoneSending = streamingDoneReceiving = false; StartLogicalReplication(cmd); + } /* dupe, but necessary per libpqrcv_endstreaming */ EndReplicationCommand(cmdtag); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 891b020aff..804c475746 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -63,6 +63,9 @@ typedef struct LogicalRepWorker */ bool is_sync_completed; + /* Indicates the slot number which corresponds to this LogicalRepWorker. */ + int worker_slot; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction or -- 2.25.1