From dcfc3094dbb40550a5055496d156a87b8e3e2065 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Mon, 5 Jun 2023 15:45:29 +0300 Subject: [PATCH 2/4] Reuse Tablesync Workers This commit allows reusing tablesync workers for syncing more than one table sequentially during their lifetime, instead of exiting after only syncing one table. Before this commit, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table. Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription. If there is a table that needs to be synced, an available tablesync worker picks up that table and syncs it. Each tablesync worker continues to pick new tables to sync until there are no tables left requiring synchronization. If there was no available worker to process the table, then a new tablesync worker will be launched, provided the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 46 ++++++-- src/backend/replication/logical/worker.c | 110 +++++++++++++++++++- src/include/replication/worker_internal.h | 7 ++ 4 files changed, 149 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index c2bba3ba69..0c6ce69c58 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -440,6 +440,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? 
MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->is_sync_completed = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8125bbd170..37f073b968 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -129,11 +129,10 @@ static bool FetchTableStates(bool *started_tx); static StringInfo copybuf = NULL; /* - * Exit routine for synchronization worker. + * Prepares the synchronization worker for reuse or exit. */ void -pg_attribute_noreturn() -finish_sync_worker(void) +clean_sync_worker(void) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -145,19 +144,38 @@ finish_sync_worker(void) pgstat_report_stat(true); } + /* + * Disconnect from publisher. Otherwise reused sync workers cause + * max_wal_senders to be exceeded. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } + + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); +} + +/* + * Exit routine for synchronization worker. + */ +void +pg_attribute_noreturn() +finish_sync_worker(void) +{ + clean_sync_worker(); + /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + MySubscription->name))); CommitTransactionCommand(); - /* Find the leader apply worker and signal it. 
*/ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* Stop gracefully */ proc_exit(0); } @@ -379,7 +397,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + /* Sync worker has completed synchronization of the current table. */ + MyLogicalRepWorker->is_sync_completed = true; + + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b979a755ae..4a0f402ad4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3624,6 +3624,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can call process_syncing_tables_for_sync. + * + * process_syncing_tables_for_sync decides whether the sync of the + * current table is completed. If it is completed, streaming must + * be already ended. So, we can break the loop. + */ + if (MyLogicalRepWorker->is_sync_completed) + { + endofstream = true; + break; + } + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -3643,6 +3657,15 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + + /* + * If is_sync_completed is true, this means that the tablesync worker + * is done with synchronization. Streaming has already been ended by + * process_syncing_tables_for_sync. We should move to the next table + * if needed, or exit. 
+ */ + if (MyLogicalRepWorker->is_sync_completed) + endofstream = true; } /* Cleanup the memory. */ @@ -3745,8 +3768,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = errcallback.previous; apply_error_context_stack = error_context_stack; - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* Tablesync workers should end streaming before exiting the main loop + * to drop replication slot. Only end streaming here for apply workers. + */ + if (!am_tablesync_worker()) + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } /* @@ -4517,6 +4543,8 @@ run_tablesync_worker(WalRcvStreamOptions *options, int originname_size, XLogRecPtr *origin_startpos) { + MyLogicalRepWorker->is_sync_completed = false; + /* Start table synchronization. */ start_table_sync(origin_startpos, &slotname); @@ -4697,10 +4725,11 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("%s for subscription \"%s\", table \"%s\" has started", + (errmsg("%s for subscription \"%s\", table \"%s\" with relid %u has started", get_worker_name(), MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); else ereport(LOG, /* translator: first %s is the name of logical replication worker */ @@ -4810,7 +4839,78 @@ TablesyncWorkerMain(Datum main_arg) invalidate_syncing_table_states, (Datum) 0); - run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); + /* + * The loop where worker does its job. It loops until there is no relation + * left to sync. 
+ */ + for (;;) + { + List *rstates; + SubscriptionRelState *rstate; + ListCell *lc; + bool is_table_found = false; + + run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); + + if (IsTransactionState()) + CommitTransactionCommand(); + + if (MyLogicalRepWorker->is_sync_completed) + { + /* This transaction will be committed by clean_sync_worker. */ + StartTransactionCommand(); + + /* + * Check if there is any table whose relation state is still INIT. + * If a table in INIT state is found, the worker will not be finished, + * it will be reused instead. + */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + + foreach(lc, rstates) + { + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + + /* + * Pick the table for the next run if it is not already picked up + * by another worker. + * + * Take exclusive lock to prevent any other sync worker from picking + * the same table. 
*/ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + if (rstate->state != SUBREL_STATE_SYNCDONE && + !logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + /* Update worker state for the next table */ + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + LWLockRelease(LogicalRepWorkerLock); + + /* found a table for next iteration */ + is_table_found = true; + clean_sync_worker(); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.", + get_worker_name(), + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); + + break; + } + LWLockRelease(LogicalRepWorkerLock); + } + + if (!is_table_found) + break; + } + } finish_sync_worker(); } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7aba034774..1e9f8e6e72 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -56,6 +56,12 @@ typedef struct LogicalRepWorker XLogRecPtr relstate_lsn; slock_t relmutex; + /* + * Indicates whether the tablesync worker has completed syncing its assigned + * table. If true, no need to continue with that table. + */ + bool is_sync_completed; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction or @@ -308,6 +314,7 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid) extern void finish_sync_worker(void); +extern void clean_sync_worker(void); static inline bool am_tablesync_worker(void) -- 2.25.1