From 09a9b666ee3df60b2662769e200643527dfc9435 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Thu, 2 Jun 2022 17:39:37 +0300 Subject: [PATCH] Reuse Logical Replication Background worker --- src/backend/catalog/pg_subscription.c | 59 ++++ src/backend/commands/subscriptioncmds.c | 164 ++++++---- src/backend/replication/logical/launcher.c | 3 + src/backend/replication/logical/tablesync.c | 118 +++++-- src/backend/replication/logical/worker.c | 338 ++++++++++++-------- src/include/catalog/pg_subscription_rel.h | 1 + src/include/replication/slot.h | 3 +- src/include/replication/worker_internal.h | 14 + 8 files changed, 488 insertions(+), 212 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8856ce3b50..81f8ab6cbf 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -635,3 +635,62 @@ GetSubscriptionNotReadyRelations(Oid subid) return res; } + +/* + * Get all relations of the subscription whose state is SUBREL_STATE_INIT. + * + * Returns a List of palloc'ed SubscriptionRelState (current memory context). 
+ */ +List * +GetSubscriptionInitStateRelations(Oid subid) +{ + List *res = NIL; + Relation rel; + HeapTuple tup; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, nkeys, skey); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + SubscriptionRelState *relstate; + Datum d; + bool isnull; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + relstate->relid = subrel->srrelid; + relstate->state = subrel->srsubstate; + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srsublsn, &isnull); + if (isnull) + relstate->lsn = InvalidXLogRecPtr; + else + relstate->lsn = DatumGetLSN(d); + + res = lappend(res, relstate); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return res; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e2852286a7..34f4c0cb06 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -765,6 +765,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } SubRemoveRels; SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; + List *sub_remove_slots = NIL; + LogicalRepWorker *worker; /* Load the library providing us libpq calls. 
*/ load_file("libpqwalreceiver", false); @@ -887,7 +889,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + /* + * Find the logical replication sync worker if exists + * Store the slot number for dropping associated replication slot later. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(sub->oid, relid, false); + if (worker) + { + logicalrep_worker_stop(sub->oid, relid); + sub_remove_slots = lappend(sub_remove_slots, &worker->slot); + } + LWLockRelease(LogicalRepWorkerLock); /* * For READY state, we would have already dropped the @@ -921,31 +934,27 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Drop the tablesync slots associated with removed tables. This has - * to be at the end because otherwise if there is an error while doing + * Drop the replication slots associated with tablesync workers for removed tables. + * This has to be at the end because otherwise if there is an error while doing * the database operations we won't be able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach(lc, sub_remove_slots) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) - { - char syncslotname[NAMEDATALEN] = {0}; + char syncslotname[NAMEDATALEN] = {0}; - /* - * For READY/SYNCDONE states we know the tablesync slot has - * already been dropped by the tablesync worker. - * - * For other states, there is no certainty, maybe the slot - * does not exist yet. Also, if we fail after removing some of - * the slots, next time, it will again try to drop already - * dropped slots and fail. For these reasons, we allow - * missing_ok = true for the drop. 
- */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, - syncslotname, sizeof(syncslotname)); - ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); - } + int *slot_to_drop = (int *) palloc(sizeof(int)); + memcpy(slot_to_drop, lfirst(lc), sizeof(int)); + + /* + * There is no certainty, maybe the slot + * does not exist yet. Also, if we fail after removing some of + * the slots, next time, it will again try to drop already + * dropped slots and fail. For these reasons, we allow + * missing_ok = true for the drop. + */ + ReplicationSlotNameForTablesync(sub->oid, *slot_to_drop, + syncslotname, sizeof(syncslotname)); + ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } PG_FINALLY(); @@ -1530,39 +1539,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { - foreach(lc, rstates) - { - SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - Oid relid = rstate->relid; - - /* Only cleanup resources of tablesync workers */ - if (!OidIsValid(relid)) - continue; - - /* - * Drop the tablesync slots associated with removed tables. - * - * For SYNCDONE/READY states, the tablesync slot is known to have - * already been dropped by the tablesync worker. - * - * For other states, there is no certainty, maybe the slot does - * not exist yet. Also, if we fail after removing some of the - * slots, next time, it will again try to drop already dropped - * slots and fail. For these reasons, we allow missing_ok = true - * for the drop. 
-			 */
-			if (rstate->state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+		List *slots = NULL;
 
-				ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-												sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+
+		slots = GetReplicationSlotNamesBySubId(wrconn, subid, true);
+		foreach(lc, slots)
+		{
+			char *syncslotname = (char *) lfirst(lc);
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 
-		list_free(rstates);
-
 		/*
 		 * If there is a slot associated with the subscription, then drop the
 		 * replication slot at the publisher.
@@ -1591,6 +1577,69 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+/*
+ * GetReplicationSlotNamesBySubId
+ *
+ * Query pg_replication_slots on the publisher through wrconn and return, as
+ * a List of palloc'd C strings, the names of the slots matching the
+ * tablesync naming pattern "pg_<subid>_sync_...".  missing_ok is unused.
+ */
+List *
+GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok)
+{
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[1] = {NAMEOID};
+	List	   *tablelist = NIL;
+
+	Assert(wrconn);
+
+	load_file("libpqwalreceiver", false);
+
+	/* NOTE(review): '_' is a LIKE wildcard, so this pattern can over-match. */
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT slot_name"
+					 " FROM pg_replication_slots"
+					 " WHERE slot_name LIKE 'pg_%u_sync_%%';",
+					 subid);
+	PG_TRY();
+	{
+		WalRcvExecResult *res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not receive list of tablesync replication slots from the publisher: %s",
+							res->err)));
+
+		/* Copy each returned slot name into the result list. */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			char	   *slotattr;
+			bool		isnull;
+
+			slotattr = NameStr(*DatumGetName(slot_getattr(slot, 1, &isnull)));
+			Assert(!isnull);
+
+			/* Take a private copy of the name before the tuple slot is cleared. */
+			tablelist = lappend(tablelist, pstrdup(slotattr));
+
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+	PG_FINALLY();
+	{
+		pfree(cmd.data);
+	}
+	PG_END_TRY();
+
+	return tablelist;
+}
+
 /*
  * Drop the replication slot at the publisher node using the replication
  * connection.
@@ -1832,6 +1881,7 @@ static void
 ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 {
 	ListCell   *lc;
+	LogicalRepWorker *worker;
 
 	foreach(lc, rstates)
 	{
@@ -1842,15 +1892,21 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 		if (!OidIsValid(relid))
 			continue;
 
+		/* Check if there is a sync worker for the relation */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
 		/*
 		 * Caller needs to ensure that relstate doesn't change underneath us.
 		 * See DropSubscription where we get the relstates.
*/ - if (rstate->state != SUBREL_STATE_SYNCDONE) + if (worker && + rstate->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0}; - ReplicationSlotNameForTablesync(subid, relid, syncslotname, + ReplicationSlotNameForTablesync(subid, worker->slot, syncslotname, sizeof(syncslotname)); elog(WARNING, "could not drop tablesync replication slot \"%s\"", syncslotname); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 2bdab53e19..918d8137c0 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -370,7 +370,9 @@ retry: /* Prepare the worker slot. */ worker->launch_time = now; worker->in_use = true; + worker->is_first_run = true; worker->generation++; + worker->slot = slot; worker->proc = NULL; worker->dbid = dbid; worker->userid = userid; @@ -378,6 +380,7 @@ retry: worker->relid = relid; worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; + worker->move_to_next_rel = false; worker->stream_fileset = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 670c6fcada..c82a203fc5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -126,12 +126,8 @@ static bool FetchTableStates(bool *started_tx); static StringInfo copybuf = NULL; -/* - * Exit routine for synchronization worker. - */ static void -pg_attribute_noreturn() -finish_sync_worker(void) +clean_sync_worker(void) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -143,18 +139,27 @@ finish_sync_worker(void) pgstat_report_stat(true); } - /* And flush all writes. 
*/ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + /* Disconnect from publisher. + * Otherwise reused sync workers causes exceeding max_wal_senders + */ + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; /* Find the main apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); +} + +/* + * Exit routine for synchronization worker. + */ +static void +pg_attribute_noreturn() +finish_sync_worker(void) +{ + clean_sync_worker(); + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); /* Stop gracefully */ proc_exit(0); @@ -180,7 +185,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) LogicalRepWorker *worker; XLogRecPtr statelsn; - CHECK_FOR_INTERRUPTS(); + CHECK_FOR_INTERRUPTS(); InvalidateCatalogSnapshot(); state = GetSubscriptionRelState(MyLogicalRepWorker->subid, @@ -284,6 +289,10 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { + List *rstates; + SubscriptionRelState *rstate; + ListCell *lc; + SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && @@ -323,18 +332,64 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * able to rollback dropped slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->slot, syncslotname, sizeof(syncslotname)); /* - * It is important to give an error if we are unable to drop the slot, - * otherwise, it won't be dropped till the corresponding subscription - * is dropped. So passing missing_ok = false. + * Check if any table whose relation state is still INIT. 
+ * If a table in INIT state is found, the worker will not be finished, + * it will be reused instead. */ - ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + rstates = GetSubscriptionInitStateRelations(MySubscription->oid); + + foreach (lc, rstates) + { + rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - finish_sync_worker(); + /* + * Pick the table for the next run + * if there is not another worker already picked that table. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + + /* Update worker state for the next table */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->is_first_run = false; + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + MyLogicalRepWorker->move_to_next_rel = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + LWLockRelease(LogicalRepWorkerLock); + break; + } + LWLockRelease(LogicalRepWorkerLock); + } + + /* Cleanup before next run or ending the worker. */ + if(!MyLogicalRepWorker->move_to_next_rel) + { + /* + * It is important to give an error if we are unable to drop the slot, + * otherwise, it won't be dropped till the corresponding subscription + * is dropped. So passing missing_ok = false. + */ + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + + finish_sync_worker(); + } + else + { + clean_sync_worker(); + } } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -1152,11 +1207,11 @@ copy_table(Relation rel) * had changed. 
*/ void -ReplicationSlotNameForTablesync(Oid suboid, Oid relid, +ReplicationSlotNameForTablesync(Oid suboid, int slot, char *syncslotname, int szslot) { - snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, - relid, GetSystemIdentifier()); + snprintf(syncslotname, szslot, "pg_%u_sync_%i_" UINT64_FORMAT, suboid, + slot, GetSystemIdentifier()); } /* @@ -1219,7 +1274,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Calculate the name of the tablesync slot. */ slotname = (char *) palloc(NAMEDATALEN); ReplicationSlotNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->slot, slotname, NAMEDATALEN); @@ -1356,11 +1411,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * drop subscription happens which would complete without removing this * slot leading to a dangling slot on the server. */ - HOLD_INTERRUPTS(); - walrcv_create_slot(LogRepWorkerWalRcvConn, - slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); - RESUME_INTERRUPTS(); + if (MyLogicalRepWorker->is_first_run) + { + HOLD_INTERRUPTS(); + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , + CRS_USE_SNAPSHOT, origin_startpos); + RESUME_INTERRUPTS(); + } /* * Setup replication origin tracking. 
The purpose of doing this before the diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 38e3b1c1b3..31ed8ed3d0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -315,6 +315,7 @@ static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first); static void stream_write_change(char action, StringInfo s); static void stream_close_file(void); +static void stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -2814,6 +2815,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + if (MyLogicalRepWorker->move_to_next_rel) + { + endofstream = true; + } } /* Cleanup the memory. */ @@ -2915,8 +2920,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Pop the error context stack */ error_context_stack = errcallback.previous; - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * If it's moving to next relation, this is a sync worker. + * Sync workers end the streaming during process_syncing_tables_for_sync. + * Calling endstreaming twice causes "no COPY in progress" errors. + */ + if (!MyLogicalRepWorker->move_to_next_rel) + { + /* All done */ + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + } } /* @@ -3457,6 +3470,34 @@ stream_write_change(char action, StringInfo s) BufFileWrite(stream_fd, &s->data[s->cursor], len); } +/* + * stream_build_options + * Fill *options with the logical replication streaming options. + * + * Takes the slot name and start position from the arguments, derives the + * protocol version from the publisher server version, and leaves twophase off. 
+ */ +static void +stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos) +{ + int server_version; + + options->logical = true; + options->startpoint = *origin_startpos; + options->slotname = slotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options->proto.logical.proto_version = + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + + options->proto.logical.publication_names = MySubscription->publications; + options->proto.logical.binary = MySubscription->binary; + options->proto.logical.streaming = MySubscription->stream; + options->proto.logical.twophase = false; +} + /* * Cleanup the memory for subxacts and reset the related variables. */ @@ -3568,6 +3609,136 @@ start_apply(XLogRecPtr origin_startpos) PG_END_TRY(); } +/* + * Runs one sync pass of the tablesync worker for MyLogicalRepWorker->relid. + * It clears move_to_next_rel, starts table sync and, after successful sync, + * records the origin name, builds streaming options and starts streaming. + */ +static void +run_tablesync_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originame_size, + XLogRecPtr *origin_startpos) +{ + /* Set this to false for safety, in case we're already reusing the worker */ + MyLogicalRepWorker->move_to_next_rel = false; + + start_table_sync(origin_startpos, &slotname); + + /* + * Allocate the origin name in long-lived context for error context + * message. + */ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + originame_size); + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); + + stream_build_options(options, slotname, origin_startpos); + + /* Start normal logical streaming replication. */ + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); +} + +/* + * Runs the apply worker. + * It sets up the replication origin, connects to the publisher, builds + * the streaming options and then starts streaming. 
+ */ +static void +run_apply_worker(WalRcvStreamOptions *options, + char *slotname, + char *originname, + int originname_size, + XLogRecPtr *origin_startpos) +{ + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + + slotname = MySubscription->slotname; + + /* + * This shouldn't happen if the subscription is enabled, but guard + * against DDL bugs or manual catalog changes. (libpqwalreceiver will + * crash if slot is NULL.) + */ + if (!slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + snprintf(originname, originname_size, "pg_%u", MySubscription->oid); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + CommitTransactionCommand(); + + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + MySubscription->name, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. + */ + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + /* + * Allocate the origin name in long-lived context for error context + * message. + */ + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); + + stream_build_options(options, slotname, origin_startpos); + + /* + * Even when the two_phase mode is requested by the user, it remains + * as the tri-state PENDING until all tablesyncs have reached READY + * state. Only then, can it become ENABLED. 
+ * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + /* Start streaming with two_phase enabled */ + options->proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); + } + else + { + walrcv_startstreaming(LogRepWorkerWalRcvConn, options); + } + + ereport(DEBUG1, + (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3578,7 +3749,6 @@ ApplyWorkerMain(Datum main_arg) XLogRecPtr origin_startpos = InvalidXLogRecPtr; char *myslotname = NULL; WalRcvStreamOptions options; - int server_version; /* Attach to slot */ logicalrep_worker_attach(worker_slot); @@ -3669,141 +3839,55 @@ ApplyWorkerMain(Datum main_arg) elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) - { - start_table_sync(&origin_startpos, &myslotname); - - /* - * Allocate the origin name in long-lived context for error context - * message. 
- */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, - originname); - } - else - { - /* This is main apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - - myslotname = MySubscription->slotname; - - /* - * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver will - * crash if slot is NULL.) - */ - if (!myslotname) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription has no replication slot set"))); - - /* Setup replication origin tracking. */ - StartTransactionCommand(); - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - CommitTransactionCommand(); - - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - MySubscription->name, &err); - if (LogRepWorkerWalRcvConn == NULL) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); - - /* - * We don't really use the output identify_system for anything but it - * does some initializations on the upstream so let's still call it. - */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); - - /* - * Allocate the origin name in long-lived context for error context - * message. - */ - apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, - originname); - } - /* - * Setup callback for syscache so that we know when something changes in - * the subscription relation state. 
- */ + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. + * Do this outside the loop to avoid exceeding MAX_SYSCACHE_CALLBACKS + */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, - invalidate_syncing_table_states, - (Datum) 0); - - /* Build logical replication streaming options. */ - options.logical = true; - options.startpoint = origin_startpos; - options.slotname = myslotname; + invalidate_syncing_table_states, + (Datum) 0); - server_version = walrcv_server_version(LogRepWorkerWalRcvConn); - options.proto.logical.proto_version = - server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : - server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : - LOGICALREP_PROTO_VERSION_NUM; + /* + * The loop where worker does its job. + * It loops until the worker is not reused. + */ + while (MyLogicalRepWorker->is_first_run || + MyLogicalRepWorker->move_to_next_rel) + { + if (am_tablesync_worker()) + { + /* + * This is a tablesync worker. + * Start syncing tables before starting the apply loop. + */ + run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); + } + else + { + /* This is main apply worker */ + run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos); + } - options.proto.logical.publication_names = MySubscription->publications; - options.proto.logical.binary = MySubscription->binary; - options.proto.logical.streaming = MySubscription->stream; - options.proto.logical.twophase = false; + /* Run the main loop. */ + start_apply(origin_startpos); - if (!am_tablesync_worker()) - { - /* - * Even when the two_phase mode is requested by the user, it remains - * as the tri-state PENDING until all tablesyncs have reached READY - * state. Only then, can it become ENABLED. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. 
- */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + if (MyLogicalRepWorker->move_to_next_rel) { - /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + /* Reset the current replication origin session. + * Since we'll use the same process for another relation, it needs to be reset + * and will be created again later while syncing the new relation. + */ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_reset(); StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\".", + MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); } - else - { - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } - - ereport(DEBUG1, - (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s", - MySubscription->name, - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : - "?"))); } - else - { - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } - - /* Run the main loop. 
*/ - start_apply(origin_startpos); - proc_exit(0); } diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9df99c3418..21a773ad56 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -90,5 +90,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid); extern List *GetSubscriptionNotReadyRelations(Oid subid); +extern List *GetSubscriptionInitStateRelations(Oid subid); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8c9f3321d5..57c4215cfe 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -217,8 +217,9 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); -extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); +extern void ReplicationSlotNameForTablesync(Oid suboid, int slot, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); +extern List* GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 901845abc2..db4e96be80 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,9 +30,18 @@ typedef struct LogicalRepWorker /* Indicates if this slot is used or free. 
*/ bool in_use; + /* + * Indicates if worker is running for the first time + * or in reuse + */ + bool is_first_run; + /* Increased every time the slot is taken by new worker. */ uint16 generation; + /* The slot that this worker is using */ + int slot; + /* Pointer to proc array. NULL if not running. */ PGPROC *proc; @@ -51,6 +60,11 @@ typedef struct LogicalRepWorker XLogRecPtr relstate_lsn; slock_t relmutex; + /* + * Used to indicate whether sync worker will be reused for another relation + */ + bool move_to_next_rel; + /* * Used to create the changes and subxact files for the streaming * transactions. Upon the arrival of the first streaming transaction, the -- 2.25.1