From 5b2604f4303c72582bb2179c4d33370df5ffb0d6 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:04:46 +0300 Subject: [PATCH v18 2/3] Reuse Tablesync Workers Before this patch, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table. Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription. If there is a table that needs to be synced, an available tablesync worker picks up that table and syncs it. Each tablesync worker continues to pick new tables to sync until there are no tables left requiring synchronization. If there was no available worker to process the table, then a new tablesync worker will be launched, provided the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 139 ++++++++++++++++---- src/backend/replication/logical/worker.c | 50 ++++++- src/include/replication/worker_internal.h | 7 + 4 files changed, 171 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f95..72e5ef8a78 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -440,6 +440,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? 
MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->is_sync_completed = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a77d3e3032..46e6f7ea10 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -137,11 +137,11 @@ static StringInfo copybuf = NULL; */ static void pg_attribute_noreturn() -finish_sync_worker(void) +finish_sync_worker(bool reuse_worker) { /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. + * Commit any outstanding transaction. This is the usual case, unless there + * was nothing to do for the table. */ if (IsTransactionState()) { @@ -149,21 +149,33 @@ finish_sync_worker(void) pgstat_report_stat(true); } - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + /* + * Disconnect from the publisher otherwise reusing the sync worker can + * error due to exceeding max_wal_senders. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* Stop gracefully */ - proc_exit(0); + if (!reuse_worker) + { + /* And flush all writes. 
*/ + XLogFlush(GetXLogWriteRecPtr()); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", + MySubscription->name))); + CommitTransactionCommand(); + + /* Stop gracefully */ + proc_exit(0); + } } /* @@ -383,7 +395,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + /* Sync worker has completed synchronization of the current table. */ + MyLogicalRepWorker->is_sync_completed = true; + + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -1288,7 +1308,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + finish_sync_worker(true); /* returns, since reuse_worker is true */ } /* Calculate the name of the tablesync slot. */ @@ -1645,6 +1665,8 @@ run_tablesync_worker(WalRcvStreamOptions *options, int originname_size, XLogRecPtr *origin_startpos) { + MyLogicalRepWorker->is_sync_completed = false; + /* Start table synchronization. */ start_table_sync(origin_startpos, &slotname); @@ -1707,13 +1729,86 @@ TablesyncWorkerMain(Datum main_arg) invalidate_syncing_table_states, (Datum) 0); - run_tablesync_worker(&options, - myslotname, - originname, - sizeof(originname), - &origin_startpos); + /* + * The loop where worker does its job. It loops until there is no relation + * left to sync. 
+ */ + for (;;) + { + List *rstates; + ListCell *lc; + bool is_table_found = false; + + run_tablesync_worker(&options, + myslotname, + originname, + sizeof(originname), + &origin_startpos); + + if (IsTransactionState()) + CommitTransactionCommand(); + + if (MyLogicalRepWorker->is_sync_completed) + { + /* This transaction will be committed by finish_sync_worker. */ + StartTransactionCommand(); + + /* + * Check if there is any table whose relation state is not SYNCDONE + * and that no other worker has picked up. If such a table is found, + * the worker will not be finished; it will be reused instead. + */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + + if (rstate->state == SUBREL_STATE_SYNCDONE) + continue; + + /* + * Take exclusive lock to prevent any other sync worker from + * picking the same table. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + /* + * Pick the table for the next run if it is not already picked + * up by another worker. 
+ */ + if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + /* Update worker state for the next table */ + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + LWLockRelease(LogicalRepWorkerLock); + + /* Found a table for next iteration */ + is_table_found = true; + finish_sync_worker(true); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("%s for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", + get_worker_name(), + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); + + break; + } + LWLockRelease(LogicalRepWorkerLock); + } + + if (!is_table_found) + break; + } + } - finish_sync_worker(); + finish_sync_worker(false); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a9956834d0..5714028e84 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -433,6 +433,20 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +/* + * Return the name of the logical replication worker. + */ +const char * +get_worker_name(void) +{ + if (am_tablesync_worker()) + return _("logical replication table synchronization worker"); + else if (am_parallel_apply_worker()) + return _("logical replication parallel apply worker"); + else + return _("logical replication apply worker"); +} + /* * Form the origin name for the subscription. * @@ -3607,6 +3621,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can call process_syncing_tables_for_sync. 
+ * + * process_syncing_tables_for_sync decides whether the sync of + * the current table is completed. If it is completed, + * streaming must be already ended. So, we can break the loop. + */ + if (MyLogicalRepWorker->is_sync_completed) + { + endofstream = true; + break; + } + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -3626,6 +3654,15 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + + /* + * If is_sync_completed is true, this means that the tablesync + * worker is done with synchronization. Streaming has already been + * ended by process_syncing_tables_for_sync. We should move to the + * next table if needed, or exit. + */ + if (MyLogicalRepWorker->is_sync_completed) + endofstream = true; } /* Cleanup the memory. */ @@ -3728,8 +3765,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = errcallback.previous; apply_error_context_stack = error_context_stack; - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * End streaming here for only apply workers. Ending streaming for + * tablesync workers is deferred until the worker exits its main loop. 
+ */ + if (!am_tablesync_worker()) + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } /* @@ -4603,10 +4644,11 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("%s for subscription \"%s\", table \"%s\" has started", + (errmsg("%s for subscription \"%s\", table \"%s\" with relid %u has started", get_worker_name(), MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e7d03cb559..9c0237fe0b 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -57,6 +57,12 @@ typedef struct LogicalRepWorker XLogRecPtr relstate_lsn; slock_t relmutex; + /* + * Indicates whether tablesync worker has completed syncing its assigned + * table. If true, no need to continue with that table. + */ + bool is_sync_completed; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction or @@ -333,5 +339,6 @@ extern void set_stream_options(WalRcvStreamOptions *options, XLogRecPtr *origin_startpos); extern void start_apply(XLogRecPtr origin_startpos); extern void DisableSubscriptionAndExit(void); +extern const char * get_worker_name(void); #endif /* WORKER_INTERNAL_H */ -- 2.25.1