From 7b068ce75a2adf26aabf0fd52e7d09e0b9345ae1 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 4 Jul 2023 22:04:46 +0300 Subject: [PATCH v20 2/5] Reuse Tablesync Workers Before this patch, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table. Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription. If there is a table that needs to be synced, an available tablesync worker picks up that table and syncs it. Each tablesync worker continues to pick new tables to sync until there are no tables left requiring synchronization. If there was no available worker to process the table, then a new tablesync worker will be launched, provided the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 133 +++++++++++++++++--- src/backend/replication/logical/worker.c | 36 +++++- src/include/replication/worker_internal.h | 6 + 4 files changed, 153 insertions(+), 23 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f95..72e5ef8a78 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -440,6 +440,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? 
MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->is_sync_completed = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a77d3e3032..d3bb482994 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -134,10 +134,11 @@ static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. + * + * If reuse_worker is false, the worker will not be reused and exit. */ static void -pg_attribute_noreturn() -finish_sync_worker(void) +finish_sync_worker(bool reuse_worker) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -149,21 +150,33 @@ finish_sync_worker(void) pgstat_report_stat(true); } + /* + * Disconnect from the publisher otherwise reusing the sync worker can + * error due to exceeding max_wal_senders. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } + /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - /* Find the leader apply worker and signal it. 
*/ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* Stop gracefully */ - proc_exit(0); + if (!reuse_worker) + { + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", + MySubscription->name))); + CommitTransactionCommand(); + + /* Stop gracefully */ + proc_exit(0); + } } /* @@ -383,7 +396,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + /* Sync worker has completed synchronization of the current table. */ + MyLogicalRepWorker->is_sync_completed = true; + + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -1288,7 +1309,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + finish_sync_worker(false); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1645,6 +1666,8 @@ run_tablesync_worker(WalRcvStreamOptions *options, int originname_size, XLogRecPtr *origin_startpos) { + MyLogicalRepWorker->is_sync_completed = false; + /* Start table synchronization. 
*/ start_table_sync(origin_startpos, &slotname); @@ -1672,6 +1695,7 @@ TablesyncWorkerMain(Datum main_arg) XLogRecPtr origin_startpos = InvalidXLogRecPtr; char *myslotname = NULL; WalRcvStreamOptions options; + bool done = false; /* Attach to slot */ logicalrep_worker_attach(worker_slot); @@ -1707,13 +1731,84 @@ TablesyncWorkerMain(Datum main_arg) invalidate_syncing_table_states, (Datum) 0); - run_tablesync_worker(&options, - myslotname, - originname, - sizeof(originname), - &origin_startpos); + /* + * The loop where worker does its job. It loops until there is no relation + * left to sync. + */ + for (;!done;) + { + List *rstates; + ListCell *lc; + + run_tablesync_worker(&options, + myslotname, + originname, + sizeof(originname), + &origin_startpos); + + if (IsTransactionState()) + CommitTransactionCommand(); + + if (MyLogicalRepWorker->is_sync_completed) + { + /* tablesync is done unless a table that needs syncing is found */ + done = true; + + /* This transaction will be committed by finish_sync_worker. */ + StartTransactionCommand(); + + /* + * Check if there is any table whose relation state is still INIT. + * If a table in INIT state is found, the worker will not be + * finished, it will be reused instead. + */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + + if (rstate->state == SUBREL_STATE_SYNCDONE) + continue; + + /* + * Take exclusive lock to prevent any other sync worker from + * picking the same table. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + /* + * Pick the table for the next run if it is not already picked + * up by another worker.
+ */ + if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + /* Update worker state for the next table */ + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + LWLockRelease(LogicalRepWorkerLock); + + /* Found a table for next iteration */ + finish_sync_worker(true); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); + + done = false; + break; + } + LWLockRelease(LogicalRepWorkerLock); + } + } + } - finish_sync_worker(); + finish_sync_worker(false); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5f42a5ef40..ed8b20eb34 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3607,6 +3607,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can call process_syncing_tables_for_sync. + * + * process_syncing_tables_for_sync decides whether the sync of + * the current table is completed. If it is completed, + * streaming must be already ended. So, we can break the loop. + */ + if (MyLogicalRepWorker->is_sync_completed) + { + endofstream = true; + break; + } + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -3626,6 +3640,15 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + + /* + * If is_sync_completed is true, this means that the tablesync + * worker is done with synchronization. Streaming has already been + * ended by process_syncing_tables_for_sync. 
We should move to the + * next table if needed, or exit. + */ + if (MyLogicalRepWorker->is_sync_completed) + endofstream = true; } /* Cleanup the memory. */ @@ -3728,8 +3751,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = errcallback.previous; apply_error_context_stack = error_context_stack; - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * End streaming here for only apply workers. Ending streaming for + * tablesync workers is deferred until the worker exits its main loop. + */ + if (!am_tablesync_worker()) + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } /* @@ -4603,9 +4630,10 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication worker for subscription \"%s\", table \"%s\" with relid %u has started", MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e7d03cb559..891b020aff 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -57,6 +57,12 @@ typedef struct LogicalRepWorker XLogRecPtr relstate_lsn; slock_t relmutex; + /* + * Indicates whether tablesync worker has completed syncing its assigned + * table. + */ + bool is_sync_completed; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction or -- 2.25.1