From 0c5bb1dacf97219970ea6c7802da5141393bcfa8 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Tue, 11 Apr 2023 14:24:22 +0300 Subject: [PATCH 1/3] Reuse Tablesync Workers This commit allows reusing tablesync workers for syncing more than one relation sequentially during their lifetime, instead of exiting after only syncing one relation. Before this commit, tablesync workers were capable of syncing only one relation. For each table, a new sync worker was launched and then killed once it was done with that table. Now, tablesync workers are no longer limited to one relation and can move to another relation in the same subscription. This reduces the overhead of launching/killing a new background worker for each relation. A new tablesync worker gets launched only if the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. If there is a table that needs to be synced, a tablesync worker picks it up and syncs it. The worker continues picking new tables to sync until there is no table left for synchronization in the subscription. 
Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com --- src/backend/postmaster/bgworker.c | 3 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/launcher.c | 5 +- src/backend/replication/logical/tablesync.c | 48 +- src/backend/replication/logical/worker.c | 525 +++++++++++++----- src/include/replication/logicalworker.h | 1 + src/include/replication/worker_internal.h | 10 +- 7 files changed, 411 insertions(+), 183 deletions(-) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 0dd22b2351..5609919edf 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,6 +131,9 @@ static const struct }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain + }, + { + "TablesyncWorkerMain", TablesyncWorkerMain } }; diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ee7a18137f..50aa1386d5 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -940,7 +940,7 @@ ParallelApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = 0; - InitializeApplyWorker(); + InitializeLogRepWorker(); InitializingApplyWorker = false; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index ceea126231..3e89366e7c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -440,6 +440,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? 
MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->ready_to_reuse = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -460,8 +461,10 @@ retry: if (is_parallel_apply_worker) snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); - else + else if (!OidIsValid(relid)) snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + else + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0c71ae9ba7..0a813bc371 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -127,40 +127,6 @@ static bool FetchTableStates(bool *started_tx); static StringInfo copybuf = NULL; -/* - * Exit routine for synchronization worker. - */ -static void -pg_attribute_noreturn() -finish_sync_worker(void) -{ - /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. - */ - if (IsTransactionState()) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } - - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - - /* Stop gracefully */ - proc_exit(0); -} - /* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. 
@@ -378,10 +344,16 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); - } - else + /* + * Sync worker is cleaned at this point. It's ready to sync next table, + * if needed. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->ready_to_reuse = true; SpinLockRelease(&MyLogicalRepWorker->relmutex); + } + + SpinLockRelease(&MyLogicalRepWorker->relmutex); } /* @@ -1275,7 +1247,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + sync_worker_exit(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 879309b316..d5f87b84cb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -392,6 +392,7 @@ static void stream_open_file(Oid subid, TransactionId xid, static void stream_write_change(char action, StringInfo s); static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); +static void stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -3617,6 +3618,27 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can go into process_syncing_tables_for_sync early. + * Before we were able to reuse tablesync workers, that + * process_syncing_tables_for_sync call would exit the worker + * instead of preparing for reuse. Now that tablesync workers + * can be reused and process_syncing_tables_for_sync is not + * responsible for exiting. 
We need to take care of memory + * contexts here before moving to sync the nex table or exit. + */ + if (MyLogicalRepWorker->ready_to_reuse) + { + MemoryContextResetAndDeleteChildren(ApplyMessageContext); + MemoryContextSwitchTo(TopMemoryContext); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + apply_error_context_stack = error_context_stack; + return; + } + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -3636,6 +3658,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + if (MyLogicalRepWorker->ready_to_reuse) + { + endofstream = true; + } } /* Cleanup the memory. */ @@ -3734,12 +3760,15 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } } - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - apply_error_context_stack = error_context_stack; - - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * If it's still not ready to reuse, this is probably an apply worker. + * End streaming before exiting. + */ + if (!MyLogicalRepWorker->ready_to_reuse) + { + /* All done */ + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + } } /* @@ -3865,6 +3894,50 @@ apply_worker_exit(void) proc_exit(0); } +/* + * Prepares the synchronization worker for reuse or exit. + */ +static void +clean_sync_worker(void) +{ + /* + * Commit any outstanding transaction. This is the usual case, unless + * there was nothing to do for the table. + */ + if (IsTransactionState()) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* + * Disconnect from publisher. Otherwise reused sync workers causes + * exceeding max_wal_senders + */ + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); +} + +/* + * Exit routine for synchronization worker. 
+ */ +void +pg_attribute_noreturn() +sync_worker_exit(void) +{ + clean_sync_worker(); + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + /* Stop gracefully */ + proc_exit(0); +} + + /* * Reread subscription info if needed. Most changes will be exit. */ @@ -4324,6 +4397,56 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s) stream_stop_internal(xid); } + /* stream_build_options + * Build logical replication streaming options. + * + * This function sets streaming options including replication slot name + * and origin start position. Workers need these options for logical replication. + */ +static void +stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos) +{ + int server_version; + + options->logical = true; + options->startpoint = *origin_startpos; + options->slotname = slotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options->proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + + options->proto.logical.publication_names = MySubscription->publications; + options->proto.logical.binary = MySubscription->binary; + options->proto.logical.twophase = false; + options->proto.logical.origin = pstrdup(MySubscription->origin); + + /* + * Assign the appropriate option value for streaming option according to + * the 'streaming' mode and the publisher's ability to support that mode. 
+ */ + if (server_version >= 160000 && + MySubscription->stream == LOGICALREP_STREAM_PARALLEL) + { + options->proto.logical.streaming_str = "parallel"; + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != LOGICALREP_STREAM_OFF) + { + options->proto.logical.streaming_str = "on"; + MyLogicalRepWorker->parallel_apply = false; + } + else + { + options->proto.logical.streaming_str = NULL; + MyLogicalRepWorker->parallel_apply = false; + } +} + /* * Cleanup the memory for subxacts and reset the related variables. */ @@ -4436,13 +4559,154 @@ start_apply(XLogRecPtr origin_startpos) } /* - * Common initialization for leader apply worker and parallel apply worker. + * Runs the tablesync worker. + * It starts table sync. After successful sync, + * builds streaming options and starts streaming. + */ +static void +run_tablesync_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + /* Set this to false for safety, in case we're already reusing the worker */ + MyLogicalRepWorker->ready_to_reuse = false; + + start_table_sync(origin_startpos, &slotname); + + /* + * Allocate the origin name in long-lived context for error context + * message. + */ + StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + originname_size); + CommitTransactionCommand(); + + set_apply_error_context_origin(originname); + + stream_build_options(options, slotname, origin_startpos); + + /* Start normal logical streaming replication. */ + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + /* Start applying changes to catcup. */ + start_apply(*origin_startpos); +} + +/* + * Runs the apply worker. + * It sets up replication origin, the streaming options + * and then starts streaming. 
+ */ +static void +run_apply_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + /* This is the leader apply worker */ + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + bool must_use_password; + + slotname = MySubscription->slotname; + + /* + * This shouldn't happen if the subscription is enabled, but guard + * against DDL bugs or manual catalog changes. (libpqwalreceiver will + * crash if slot is NULL.) + */ + if (!slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, originname_size); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + CommitTransactionCommand(); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !superuser_arg(MySubscription->owner); + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + must_use_password, + MySubscription->name, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. 
+ */ + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + set_apply_error_context_origin(originname); + + stream_build_options(options, slotname, origin_startpos); + + /* + * Even when the two_phase mode is requested by the user, it remains as + * the tri-state PENDING until all tablesyncs have reached READY state. + * Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + /* Start streaming with two_phase enabled */ + options->proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); + } + else + { + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + } + + ereport(DEBUG1, + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + + /* Run the main loop. */ + start_apply(*origin_startpos); +} + +/* + * Common initialization for logical replication workers; leader apply worker, + * parallel apply worker and tablesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. 
*/ void -InitializeApplyWorker(void) +InitializeLogRepWorker(void) { MemoryContext oldctx; @@ -4506,9 +4770,10 @@ InitializeApplyWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has started", MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); else ereport(LOG, /* translator: first %s is the name of logical replication worker */ @@ -4527,7 +4792,6 @@ ApplyWorkerMain(Datum main_arg) XLogRecPtr origin_startpos = InvalidXLogRecPtr; char *myslotname = NULL; WalRcvStreamOptions options; - int server_version; InitializingApplyWorker = true; @@ -4551,7 +4815,7 @@ ApplyWorkerMain(Datum main_arg) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - InitializeApplyWorker(); + InitializeLogRepWorker(); InitializingApplyWorker = false; @@ -4559,165 +4823,142 @@ ApplyWorkerMain(Datum main_arg) elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) - { - start_table_sync(&origin_startpos, &myslotname); + /* + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. 
Do this outside the loop to avoid + * exceeding MAX_SYSCACHE_CALLBACKS + */ + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + invalidate_syncing_table_states, + (Datum) 0); - ReplicationOriginNameForLogicalRep(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - set_apply_error_context_origin(originname); - } - else - { - /* This is the leader apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - bool must_use_password; + /* This is leader apply worker */ + run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); - myslotname = MySubscription->slotname; + proc_exit(0); +} - /* - * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver will - * crash if slot is NULL.) - */ - if (!myslotname) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription has no replication slot set"))); +/* Logical Replication Tablesync worker entry point */ +void +TablesyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *myslotname = NULL; + WalRcvStreamOptions options; + List *rstates; + SubscriptionRelState *rstate; + ListCell *lc; - /* Setup replication origin tracking. */ - StartTransactionCommand(); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, - originname, sizeof(originname)); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - - /* Is the use of a password mandatory? 
*/ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - - /* Note that the superuser_arg call can access the DB */ - CommitTransactionCommand(); + elog(LOG, "logical replication table synchronization worker has started"); - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, - MySubscription->name, &err); - if (LogRepWorkerWalRcvConn == NULL) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); + /* Attach to slot */ + logicalrep_worker_attach(worker_slot); - /* - * We don't really use the output identify_system for anything but it - * does some initializations on the upstream so let's still call it. - */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); - set_apply_error_context_origin(originname); - } + /* + * We don't currently need any ResourceOwner in a walreceiver process, but + * if we did, we could call CreateAuxProcessResourceOwner here. + */ + + /* Initialise stats to a sanish value */ + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = + MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + InitializeLogRepWorker(); + + /* Connect to the origin and start the replication. */ + elog(DEBUG1, "connecting to publisher using connection string \"%s\"", + MySubscription->conninfo); /* * Setup callback for syscache so that we know when something changes in - * the subscription relation state. + * the subscription relation state. 
Do this outside the loop to avoid + * exceeding MAX_SYSCACHE_CALLBACKS */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); - /* Build logical replication streaming options. */ - options.logical = true; - options.startpoint = origin_startpos; - options.slotname = myslotname; - - server_version = walrcv_server_version(LogRepWorkerWalRcvConn); - options.proto.logical.proto_version = - server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : - server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : - server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : - LOGICALREP_PROTO_VERSION_NUM; - - options.proto.logical.publication_names = MySubscription->publications; - options.proto.logical.binary = MySubscription->binary; - /* - * Assign the appropriate option value for streaming option according to - * the 'streaming' mode and the publisher's ability to support that mode. + * The loop where worker does its job. It loops until there is no relation + * left to sync. */ - if (server_version >= 160000 && - MySubscription->stream == LOGICALREP_STREAM_PARALLEL) - { - options.proto.logical.streaming_str = "parallel"; - MyLogicalRepWorker->parallel_apply = true; - } - else if (server_version >= 140000 && - MySubscription->stream != LOGICALREP_STREAM_OFF) - { - options.proto.logical.streaming_str = "on"; - MyLogicalRepWorker->parallel_apply = false; - } - else + for (;;) { - options.proto.logical.streaming_str = NULL; - MyLogicalRepWorker->parallel_apply = false; - } + run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); - options.proto.logical.twophase = false; - options.proto.logical.origin = pstrdup(MySubscription->origin); + if (IsTransactionState()) + CommitTransactionCommand(); + + if (MyLogicalRepWorker->ready_to_reuse) + { + /* This transaction will be committed by clean_sync_worker. 
*/ + StartTransactionCommand(); - if (!am_tablesync_worker()) - { /* - * Even when the two_phase mode is requested by the user, it remains - * as the tri-state PENDING until all tablesyncs have reached READY - * state. Only then, can it become ENABLED. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. + * Check if any table whose relation state is still INIT. If a table + * in INIT state is found, the worker will not be finished, it will be + * reused instead. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + rstates = GetSubscriptionRelations(MySubscription->oid, true); + rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + + foreach(lc, rstates) { - /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; - CommitTransactionCommand(); + /* + * Pick the table for the next run if it is not already picked up + * by another worker. + * + * Take exclusive lock to prevent any other sync worker from picking + * the same table. 
+ */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + if (rstate->state != SUBREL_STATE_SYNCDONE && + !logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + /* Update worker state for the next table */ + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + LWLockRelease(LogicalRepWorkerLock); + break; + } + LWLockRelease(LogicalRepWorkerLock); } + + /* + * If a relation with INIT state is assigned, clean up the worker for + * the next iteration. + * + * If there is no more work left for this worker, break the loop to + * exit. + */ + if ( MyLogicalRepWorker->relstate == SUBREL_STATE_INIT) + clean_sync_worker(); else - { - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + break; - ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", - MySubscription->name, - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : - "?"))); - } - else - { - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + /* If not exited yet, then the worker will sync another table. */ + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\" with relid %u.", + MySubscription->name, get_rel_name(MyLogicalRepWorker->relid), MyLogicalRepWorker->relid))); + CommitTransactionCommand(); + } } - /* Run the main loop. 
*/ - start_apply(origin_startpos); - - proc_exit(0); + sync_worker_exit(); } /* diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 39588da79f..bbd71d0b42 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); +extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b57eed052f..412195f143 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -76,6 +76,12 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * Used to indicate whether sync worker is ready for being reused + * to sync another relation. + */ + bool ready_to_reuse; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -265,7 +271,7 @@ extern void maybe_reread_subscription(void); extern void stream_cleanup_files(Oid subid, TransactionId xid); -extern void InitializeApplyWorker(void); +extern void InitializeLogRepWorker(void); extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); @@ -273,6 +279,8 @@ extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); extern void apply_error_callback(void *arg); extern void set_apply_error_context_origin(char *originname); +extern void sync_worker_exit(void); + /* Parallel apply worker setup and interactions */ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); -- 2.25.1