From ad2c50f99fe504a57e9480a7c52a307ecca78f4c Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 23 Jul 2025 13:38:12 +0800 Subject: [PATCH v63] Introduce a 'max_conflict_retention_duration' option to subscriptions. This commit introduces a subscription option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when subscription with retain_dead_tuples enabled is present and the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection.xmin will be set to InvalidTransactionId if all apply workers associated with the subscription, where retain_dead_tuples is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. Additionally, retention status is recorded in the pg_subscription catalog (subretentionactive) to prevent unnecessary retention initiation upon server restarts. In this patch, a replication slot will not be automatically re-initialized. Users can disable retain_dead_tuples and re-enable it manually to resume the retention. An upcoming patch will include support for automatic slot re-initialization once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor worker's conflict retention status, this patch also introduces a new column 'dead_tuple_retention_active' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_dead_tuples is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration. --- doc/src/sgml/catalogs.sgml | 25 +++ doc/src/sgml/monitoring.sgml | 12 ++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 42 +++- src/backend/catalog/pg_subscription.c | 41 ++++ src/backend/catalog/system_views.sql | 7 +- src/backend/commands/subscriptioncmds.c | 88 +++++++- src/backend/replication/logical/launcher.c | 53 +++-- src/backend/replication/logical/tablesync.c | 22 +- src/backend/replication/logical/worker.c | 225 ++++++++++++++++++-- src/bin/pg_dump/pg_dump.c | 18 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 12 +- src/bin/psql/tab-complete.in.c | 6 +- src/include/catalog/pg_proc.dat | 6 +- src/include/catalog/pg_subscription.h | 19 ++ src/include/catalog/pg_subscription_rel.h | 2 + src/include/replication/worker_internal.h | 10 +- src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 186 +++++++++------- src/test/regress/sql/subscription.sql | 16 ++ src/test/subscription/t/035_conflicts.pl | 10 +- 22 files changed, 672 insertions(+), 139 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index da8a7882580..98bfa96b17b 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8094,6 +8094,31 @@ SCRAM-SHA-256$<iteration count>:&l + + + submaxconflretention int4 + + + The maximum duration (in milliseconds) for which information (e.g., dead + tuples, commit timestamps, and origins) useful for conflict detection can + be retained. + + + + + + subretentionactive bool + + + The retention status of information (e.g., dead tuples, commit + timestamps, and origins) useful for conflict detection. True if + retain_dead_tuples + is enabled, and the retention duration has not exceeded + max_conflict_retention_duration, + when defined. + + + subconninfo text diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3f4a27a736e..5f93b918274 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2114,6 +2114,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage sender; NULL for parallel apply workers + + + + dead_tuple_retention_active boolean + + + True if retain_dead_tuples + is enabled and the retention duration for information used in conflict detection is + within max_conflict_retention_duration; NULL for + parallel apply workers and table synchronization workers. + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index d48cdc76bd3..f2c2e147472 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -236,8 +236,9 @@ ALTER SUBSCRIPTION name RENAME TO < run_as_owner, origin, failover, - two_phase, and - retain_dead_tuples. + two_phase, + retain_dead_tuples, and + max_conflict_retention_duration. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 247c5bd2604..52822b2fbb5 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -448,7 +448,7 @@ CREATE SUBSCRIPTION subscription_nametrue, the detection of is enabled, and a physical replication slot named pg_conflict_detection - created on the subscriber to prevent the information for detecting + is created on the subscriber to prevent the information for detecting conflicts from being removed. @@ -521,6 +521,46 @@ CREATE SUBSCRIPTION subscription_name + + + max_conflict_retention_duration (integer) + + + Maximum duration for which this subscription's apply worker is allowed + to retain the information useful for conflict detection when + retain_dead_tuples is enabled for the associated + subscriptions. The default value is 0, indicating + that the information is retained until it is no longer needed for + detection purposes. This value is taken as milliseconds. + + + The information useful for conflict detection is no longer retained if + all apply workers associated with the subscriptions, where + retain_dead_tuples is enabled, confirm that the + retention duration exceeded the + max_conflict_retention_duration set within the + corresponding subscription. To re-enable retention manually, you can + disable retain_dead_tuples and re-enable it. + + + Note that overall retention will not stop if other subscriptions + specify a greater value and have not exceeded it, or if they set this + option to 0. + + + This option is effective only when + retain_conflict_info is enabled and the apply + worker associated with the subscription is active. + + + + Note that setting a non-zero value for this option could lead to + information for conflict detection being removed prematurely, + potentially missing some conflict detections. + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 244acf52f36..16eb5c16a0b 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,8 @@ GetSubscription(Oid subid, bool missing_ok) sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; sub->retaindeadtuples = subform->subretaindeadtuples; + sub->maxconflretention = subform->submaxconflretention; + sub->retentionactive = subform->subretentionactive; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, @@ -598,3 +600,42 @@ GetSubscriptionRelations(Oid subid, bool not_ready) return res; } + +/* + * Update the dead tuple retention status for the given subscription. + */ +void +UpdateDeadTupleRetentionStatus(Oid subid, bool active) +{ + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", subid); + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set the subscription to disabled. */ + values[Anum_pg_subscription_subretentionactive - 1] = active; + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + /* Update the catalog */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1b3c5a55882..ccc78d81839 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -996,7 +996,8 @@ CREATE VIEW pg_stat_subscription AS st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); @@ -1389,8 +1390,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subretaindeadtuples, subslotname, subsynccommit, - subpublications, suborigin) + subretaindeadtuples, submaxconflretention, subretentionactive, + subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index faa3650d287..5b97f1d4ba2 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -73,8 +73,9 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 -#define SUBOPT_LSN 0x00008000 -#define SUBOPT_ORIGIN 0x00010000 +#define SUBOPT_MAX_CONFLICT_RETENTION_DURATION 0x00008000 +#define SUBOPT_LSN 0x00010000 +#define SUBOPT_ORIGIN 0x00020000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -101,6 +102,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + int32 maxconflretention; char *origin; XLogRecPtr lsn; } SubOpts; @@ -112,6 +114,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, Oid *subrel_local_oids, int subrel_count, char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); +static void notify_ineffective_max_conflict_retention(bool update_maxconflretention); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -169,6 +172,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + opts->maxconflretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -323,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; opts->retaindeadtuples = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION) && + strcmp(defel->defname, "max_conflict_retention_duration") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MAX_CONFLICT_RETENTION_DURATION; + opts->maxconflretention = defGetInt32(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -580,7 +594,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -651,6 +666,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.retaindeadtuples) CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + /* Notify that max_conflict_retention_duration is ineffective */ + else if (opts.maxconflretention) + notify_ineffective_max_conflict_retention(true); + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) opts.slot_name = stmt->subname; @@ -693,6 +712,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); + values[Anum_pg_subscription_submaxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); + values[Anum_pg_subscription_subretentionactive - 1] = + Int32GetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1175,6 +1198,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_failover = false; bool update_two_phase = false; bool check_pub_rdt = false; + bool ineffective_maxconflretention = false; + bool update_maxconflretention = false; bool retain_dead_tuples; char *origin; Subscription *sub; @@ -1235,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1401,6 +1428,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, BoolGetDatum(opts.retaindeadtuples); replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + /* + * Update the retention status only when there is a change + * in the retain_dead_tuples option value. + * + * It might not be ideal to blindly mark retention as active + * upon enabling the retain_dead_tuples, when retention was + * previously ceased and the user toggles retain_dead_tuples + * without adjusting the publisher workload. However, since + * retention will be stopped gain soon in such cases, and + * this approach offers a convenient way for the user to + * manually refresh the retention status, it is suitable for + * now. + */ + if (opts.retaindeadtuples != sub->retaindeadtuples) + { + values[Anum_pg_subscription_subretentionactive - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + } + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); /* @@ -1434,6 +1481,20 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, check_pub_rdt = opts.retaindeadtuples; retain_dead_tuples = opts.retaindeadtuples; + + ineffective_maxconflretention = (!opts.retaindeadtuples && + sub->maxconflretention); + } + + if (IsSet(opts.specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + { + values[Anum_pg_subscription_submaxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); + replaces[Anum_pg_subscription_submaxconflretention - 1] = true; + + update_maxconflretention = true; + ineffective_maxconflretention = (!retain_dead_tuples && + opts.maxconflretention); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) @@ -1453,6 +1514,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (ineffective_maxconflretention) + notify_ineffective_max_conflict_retention(update_maxconflretention); + update_tuple = true; break; } @@ -2500,6 +2564,22 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, "retain_dead_tuples") : 0); } +/* + * Report a NOTICE to inform users that max_conflict_retention_duration is + * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR + * is not issued because setting max_conflict_retention_duration causes no harm, + * even when it is ineffective. + */ +static void +notify_ineffective_max_conflict_retention(bool update_maxconflretention) +{ + ereport(NOTICE, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + update_maxconflretention + ? errmsg("max_conflict_retention_duration has no effect when retain_dead_tuples is disabled") + : errmsg("disabling retain_dead_tuples will render max_conflict_retention_duration ineffective")); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..20e910e5156 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -43,6 +44,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +104,7 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); /* @@ -152,6 +154,7 @@ get_subscription_list(void) sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); sub->retaindeadtuples = subform->subretaindeadtuples; + sub->retentionactive = subform->subretentionactive; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -998,7 +1001,7 @@ ApplyLauncherShmemInit(void) LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; memset(worker, 0, sizeof(LogicalRepWorker)); - SpinLockInit(&worker->relmutex); + SpinLockInit(&worker->mutex); } } } @@ -1183,6 +1186,7 @@ ApplyLauncherMain(Datum main_arg) long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; bool retain_dead_tuples = false; + bool retention_inactive = false; TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1225,6 +1229,13 @@ ApplyLauncherMain(Datum main_arg) */ can_advance_xmin &= sub->enabled; + /* + * Consider overall retention inactive only when all + * subscriptions with retain_dead_tuples enabled have marked + * it as inactive. + */ + retention_inactive &= !sub->retentionactive; + /* * Create a replication slot to retain information necessary * for conflict detection such as dead tuples, commit @@ -1256,7 +1267,8 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection among all running apply * workers that enables retain_dead_tuples. */ - if (sub->retaindeadtuples && can_advance_xmin) + if (sub->retaindeadtuples && sub->retentionactive && + can_advance_xmin) compute_min_nonremovable_xid(w, &xmin); /* worker is running already */ @@ -1320,13 +1332,19 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * However, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to cease retention, + * marking it as inactive, the new xmin will be set to + * InvalidTransactionId. We then update slot.xmin accordingly to + * permit the removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); - else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + else if (can_advance_xmin || retention_inactive) + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1374,9 +1392,9 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) */ Assert(MyReplicationSlot); - SpinLockAcquire(&worker->relmutex); + SpinLockAcquire(&worker->mutex); nonremovable_xid = worker->oldest_nonremovable_xid; - SpinLockRelease(&worker->relmutex); + SpinLockRelease(&worker->mutex); Assert(TransactionIdIsValid(nonremovable_xid)); @@ -1402,17 +1420,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1518,7 +1536,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_COLS 11 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1595,6 +1613,15 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) elog(ERROR, "unknown worker type"); } + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker)) + values[10] = TransactionIdIsValid(worker.oldest_nonremovable_xid); + else + nulls[10] = true; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..1ab5496f63f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -293,7 +293,7 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) @@ -305,7 +305,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * UpdateSubscriptionRelState must be called within a transaction. @@ -390,7 +390,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) finish_sync_worker(); } else - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); } /* @@ -534,7 +534,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (syncworker) { /* Found one, update our copy of its state */ - SpinLockAcquire(&syncworker->relmutex); + SpinLockAcquire(&syncworker->mutex); rstate->state = syncworker->relstate; rstate->lsn = syncworker->relstate_lsn; if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -547,7 +547,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn); } - SpinLockRelease(&syncworker->relmutex); + SpinLockRelease(&syncworker->mutex); /* If we told worker to catch up, wait for it. */ if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -1342,10 +1342,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = relstate; MyLogicalRepWorker->relstate_lsn = relstate_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * If synchronization is already done or no longer necessary, exit now @@ -1428,10 +1428,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) goto copy_table_done; } - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* Update the state and make it visible to others. */ StartTransactionCommand(); @@ -1586,10 +1586,10 @@ copy_table_done: /* * We are done with the initial data synchronization, update the state. */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; MyLogicalRepWorker->relstate_lsn = *origin_startpos; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * Finally, wait until the leader apply worker tells us to catch up and diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8e343873454..1b2d6a3e001 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,10 +173,15 @@ * Advance the non-removable transaction ID if the current flush location has * reached or surpassed the last received WAL position. * + * - RDT_STOP_CONFLICT_INFO_RETENTION: + * Update pg_subscription.subretentionactive to false within a new + * transaction, and set oldest_nonremovable_xid to InvalidTransactionId. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> - * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * WAIT_FOR_LOCAL_FLUSH -> (RDT_STOP_CONFLICT_INFO_RETENTION if the wait time + * exceeds max_conflict_retention_duration) -> loop back to GET_CANDIDATE_XID. * * Retaining the dead tuples for this period is sufficient for ensuring * eventual consistency using last-update-wins strategy, as dead tuples are @@ -373,7 +378,8 @@ typedef enum RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, - RDT_WAIT_FOR_LOCAL_FLUSH + RDT_WAIT_FOR_LOCAL_FLUSH, + RDT_STOP_CONFLICT_INFO_RETENTION, } RetainDeadTuplesPhase; /* @@ -415,6 +421,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +564,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3220,6 +3232,7 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, { TransactionId oldestxmin; ReplicationSlot *slot; + bool stop_retention; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3228,6 +3241,42 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, if (!MySubscription->retaindeadtuples || !track_commit_timestamp) return false; + /* + * Check whether the leader apply worker has stopped retaining information + * for detecting conflicts. + */ + if (am_leader_apply_worker()) + { + stop_retention = + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid); + } + else + { + LogicalRepWorker *leader; + + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + + SpinLockAcquire(&leader->mutex); + stop_retention = !TransactionIdIsValid(leader->oldest_nonremovable_xid); + SpinLockRelease(&leader->mutex); + LWLockRelease(LogicalRepWorkerLock); + } + + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (stop_retention) + return false; + /* * For conflict detection, we use the conflict slot's xmin value instead * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as @@ -3254,7 +3303,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, oldestxmin = slot->data.xmin; SpinLockRelease(&slot->mutex); - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the conflict detection slot.xmin is set to + * InvalidTransactionId. This situation arises if the current worker is + * either a table synchronization or parallel apply worker, and the leader + * stopped retention immediately after checking the + * oldest_nonremovable_xid above. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4110,7 +4167,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to advance the non-removable * transaction ID. */ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid) && + rdt_data.phase == RDT_GET_CANDIDATE_XID && rdt_data.xid_advance_interval) wait_time = Min(wait_time, rdt_data.xid_advance_interval); @@ -4325,6 +4383,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + return false; + return true; } @@ -4350,6 +4412,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_WAIT_FOR_LOCAL_FLUSH: wait_for_local_flush(rdt_data); break; + case RDT_STOP_CONFLICT_INFO_RETENTION: + stop_conflict_info_retention(rdt_data); + break; } } @@ -4468,6 +4533,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4621,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4583,9 +4676,9 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * transactions up to that position on the publisher have been applied and * flushed locally. So, we can advance the non-removable transaction ID. */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u", LSN_FORMAT_ARGS(rdt_data->remote_lsn), @@ -4594,12 +4687,67 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase. + */ +static void +stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* + * Do not update the catalog during an active transaction. The transaction + * may be started during change application, leading to a possible + * rollback of catalog updates if the application fails subsequently. + */ + if (IsTransactionState()) + return; + + StartTransactionCommand(); + /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Set pg_subscription.subretentionactive to false */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("The retention duration for information used in conflict detection has exceeded the maximum limit of %u ms.", + MySubscription->maxconflretention), + errhint("You might need to increase \"%s\".", + "max_conflict_retention_duration")); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,9 +4755,56 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; + rdt_data->table_sync_wait_time = 0; +} + +/* + * Check whether conflict information retention should be stopped because the + * wait time has exceeded the maximum limit (max_conflict_retention_duration). + * + * If retention should be stopped, proceed to the + * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return + * false. + * + * Currently, the retention will not resume automatically unless user manually + * disables retain_dead_tuples and re-enables it after confirming that the + * replication slot has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxconflretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return if the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration). The time spent waiting for table + * synchronization is not counted, as it's an infrequent operation. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxconflretention + + rdt_data->table_sync_wait_time)) + return false; + + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; /* process the next phase */ process_rdt_phase_transition(rdt_data, false); + + return true; } /* @@ -4621,8 +4816,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * 3 minutes which should be sufficient to avoid using CPU or network * resources without much benefit. * - * The interval is reset to a minimum value of 100ms once there is some - * activity on the node. + * The interval is reset to the lesser of 100ms and + * max_conflict_retention_duration once there is some activities on the node. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4642,6 +4837,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, max_interval); + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxconflretention); } else { @@ -5455,7 +5654,7 @@ InitializeLogRepWorker(void) * logicalrep_worker_launch. */ if (am_leader_apply_worker() && - MySubscription->retaindeadtuples && + MySubscription->retaindeadtuples && MySubscription->retentionactive && !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fc7a6639163..8d8bcf61075 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5048,6 +5048,7 @@ getSubscriptions(Archive *fout) int i_subenabled; int i_subfailover; int i_subretaindeadtuples; + int i_submaxconflretention; int i, ntups; @@ -5127,10 +5128,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.subretaindeadtuples\n"); + " s.subretaindeadtuples,\n"); else appendPQExpBufferStr(query, - " false AS subretaindeadtuples\n"); + " false AS subretaindeadtuples,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.submaxconflretention\n"); + else + appendPQExpBuffer(query, + " 0 AS submaxconflretention\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5165,6 +5173,7 @@ getSubscriptions(Archive *fout) i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); + i_submaxconflretention = PQfnumber(res, "submaxconflretention"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5200,6 +5209,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); subinfo[i].subretaindeadtuples = (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); + subinfo[i].submaxconflretention = + atoi(PQgetvalue(res, i, i_submaxconflretention)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5461,6 +5472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subretaindeadtuples) appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (subinfo->submaxconflretention) + appendPQExpBuffer(query, ", max_conflict_retention_duration = %d", subinfo->submaxconflretention); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index dde85ed156c..e6b94422af7 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -717,6 +717,7 @@ typedef struct _SubscriptionInfo bool subrunasowner; bool subfailover; bool subretaindeadtuples; + int submaxconflretention; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7a06af48842..b6d57d02778 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false}; + false, false, false, false}; if (pset.sversion < 100000) { @@ -6815,10 +6815,20 @@ describeSubscriptions(const char *pattern, bool verbose) ", subfailover AS \"%s\"\n", gettext_noop("Failover")); if (pset.sversion >= 190000) + { appendPQExpBuffer(&buf, ", subretaindeadtuples AS \"%s\"\n", gettext_noop("Retain dead tuples")); + appendPQExpBuffer(&buf, + ", submaxconflretention AS \"%s\"\n", + gettext_noop("Max conflict retention duration")); + + appendPQExpBuffer(&buf, + ", subretentionactive AS \"%s\"\n", + gettext_noop("Dead tuple retention active")); + } + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b10f2313f3..2d7016fe717 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2321,7 +2321,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", + COMPLETE_WITH("binary", "disable_on_error", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3780,7 +3781,8 @@ match_previous_words(int pattern_id, /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", + "disable_on_error", "enabled", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 118d6da1ace..3810f3883b7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5696,9 +5696,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,dead_tuple_retention_active}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 231ef84ec9a..fcc1ad173ca 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -81,6 +81,17 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + int32 submaxconflretention; /* The maximum duration (in + * milliseconds) for which information + * useful for conflict detection can + * be retained */ + + bool subretentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded + * max_conflict_retention_duration, when + * defined */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -136,6 +147,14 @@ typedef struct Subscription * to be synchronized to the standbys. */ bool retaindeadtuples; /* True if dead tuples useful for conflict * detection are retained */ + int32 maxconflretention; /* The maximum duration (in milliseconds) + * for which information useful for + * conflict detection can be retained */ + bool retentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded + * max_conflict_retention_duration, when + * defined */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index f458447a0e5..02f97a547dd 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -92,4 +92,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); + #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..9c0c2b8050c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -64,7 +64,12 @@ typedef struct LogicalRepWorker Oid relid; char relstate; XLogRecPtr relstate_lsn; - slock_t relmutex; + + /* + * Spinlock used to protect table synchronization information and the + * oldest_nonremovable_xid. + */ + slock_t mutex; /* * Used to create the changes and subxact files for the streaming @@ -94,6 +99,9 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * This ID would be set to InvalidTransactionId if the apply worker has + * stopped retaining information useful for conflict detection. */ TransactionId oldest_nonremovable_xid; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 35e8aad7701..183fc193ad3 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2174,9 +2174,10 @@ pg_stat_subscription| SELECT su.oid AS subid, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, dead_tuple_retention_active) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..c6b0784b253 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,36 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); +ERROR: max_conflict_retention_duration requires an integer value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); +NOTICE: max_conflict_retention_duration has no effect when retain_dead_tuples is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index f0f714fe747..9b2c489adaf 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -298,6 +298,22 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 36aeb14c563..c1b8ede81cb 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -214,6 +214,10 @@ ok( $node_B->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); +my $result = $node_B->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_BA';"); +is($result, qq(t), 'worker on node B retains conflict information'); + ################################################## # Check that the retain_dead_tuples option can be enabled only for disabled # subscriptions. Validate the NOTICE message during the subscription DDL, and @@ -254,6 +258,10 @@ ok( $node_A->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); +$result = $node_A->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';"); +is($result, qq(t), 'worker on node A retains conflict information'); + ################################################## # Check the WARNING when changing the origin to ANY, if retain_dead_tuples is # enabled. This warns of the possibility of receiving changes from origins @@ -281,7 +289,7 @@ $node_A->psql('postgres', $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); $node_A->wait_for_catchup($subname_BA); -my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); is($result, qq(1|1 2|2), 'check replicated insert on node B'); -- 2.50.1.windows.1