From ac8da6fa047037ba1c2f6514e3481ce2afce0fea Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 31 May 2024 14:20:45 +0530 Subject: [PATCH v1] Detect update and delete conflict in logical replication This patch adds a new parameter detect_conflict for CREATE and ALTER subscription commands. This new parameter will decide if subscription will go for confict detection. By default, conflict detection will be off for a subscription. When conflict detection is enabled, additional logging is triggered in the following conflict scenarios: * updating a row that was previously modified by another origin. * The tuple to be updated is not found. * The tuple to be deleted is not found. While there exist other conflict types in logical replication, such as an incoming insert conflicting with an existing row due to a primary key or unique index, these cases already result in constraint violation errors. Therefore, additional conflict detection for these cases is currently omitted to minimize potential overhead. However, the pre-detection for conflict in these error cases is still essential to support automatic conflict resolution in the future. --- doc/src/sgml/catalogs.sgml | 9 ++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 43 +++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 31 +++- src/backend/replication/logical/worker.c | 145 ++++++++++++++--- src/bin/pg_dump/pg_dump.c | 17 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.c | 14 +- src/include/catalog/pg_subscription.h | 4 + src/test/regress/expected/subscription.out | 176 ++++++++++++--------- src/test/regress/sql/subscription.sql | 15 ++ src/test/subscription/t/001_rep_changes.pl | 15 +- src/test/subscription/t/013_partition.pl | 48 +++--- src/test/subscription/t/030_origin.pl | 26 +++ src/tools/pgindent/typedefs.list | 1 + 18 files changed, 419 insertions(+), 141 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 15f6255d86..495a6ea479 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8038,6 +8038,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subdetectconflict bool + + + If true, the subscription is enabled for conflict detection. + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 476f195622..5f6b83e415 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -228,8 +228,9 @@ ALTER SUBSCRIPTION name RENAME TO < disable_on_error, password_required, run_as_owner, - origin, and - failover. + origin, + failover, and + detect_conflict. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d9421..45f71d6386 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,49 @@ CREATE SUBSCRIPTION subscription_name + + + detect_conflict (boolean) + + + Specifies whether the subscription is enabled for conflict detection. + The default is false. + + + When conflict detection is enabled, additional logging is triggered + in the following scenarios: + + + update_differ + + + Updating a row that was previously modified by another origin. Note that this + conflict only be detected when + track_commit_timestamp + is enabled. + + + + + update_missing + + + The tuple to be updated is not found. + + + + + delete_missing + + + The tuple to be deleted is not found. + + + + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..5a423f4fb0 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->detectconflict = subform->subdetectconflict; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 53047cab5f..49ac738f26 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1359,7 +1359,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subdetectconflict, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e407428dbc..e670d72708 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -70,8 +70,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_DETECT_CONFLICT 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool detectconflict; char *origin; XLogRecPtr lsn; } SubOpts; @@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT)) + opts->detectconflict = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT) && + strcmp(defel->defname, "detect_conflict") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DETECT_CONFLICT; + opts->detectconflict = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -603,7 +616,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1146,7 +1162,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1256,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT)) + { + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); + replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..80d6d02ecb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -147,6 +147,7 @@ #include #include +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -274,6 +275,31 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * Conflict types that could be encountered when applying remote changes. + * + * For now, this list includes conflict types that will prompt additional logging + * only if conflict detection is turned on. Other conflicts that already + * lead to constraint violation errors are excluded from this enumeration. + */ +typedef enum +{ + /* The row to be updated was modified by a different origin */ + CT_UPDATE_DIFFER, + + /* The row to be updated is missing */ + CT_UPDATE_MISSING, + + /* The row to be deleted is missing */ + CT_DELETE_MISSING, +} ConflictType; + +const char *const ConflictTypeNames[] = { + [CT_UPDATE_DIFFER] = "update_differ", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_MISSING] = "delete_missing" +}; + /* errcontext tracker */ ApplyErrorCallbackArg apply_error_callback_arg = { @@ -416,6 +442,14 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static bool get_tuple_commit_ts(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, + TimestampTz *localts); +static void report_apply_conflict(ConflictType type, Relation localrel, + TransactionId localxmin, + RepOriginId localorigin, + TimestampTz localts); + /* * Form the origin name for the subscription. * @@ -2664,6 +2698,20 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { + RepOriginId localorigin; + TransactionId localxmin; + TimestampTz localts; + + /* + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. + */ + if (MySubscription->detectconflict && + get_tuple_commit_ts(localslot, &localxmin, &localorigin, &localts) && + localorigin != replorigin_session_origin) + report_apply_conflict(CT_UPDATE_DIFFER, localrel, localxmin, + localorigin, localts); + /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_modify_data(remoteslot, localslot, relmapentry, newtup); @@ -2681,13 +2729,10 @@ apply_handle_update_internal(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + report_apply_conflict(CT_UPDATE_MISSING, localrel, + InvalidTransactionId, InvalidRepOriginId, 0); } /* Cleanup. */ @@ -2821,13 +2866,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be deleted " - "in replication target relation \"%s\"", - RelationGetRelationName(localrel)); + if (MySubscription->detectconflict) + report_apply_conflict(CT_DELETE_MISSING, localrel, + InvalidTransactionId, InvalidRepOriginId, 0); } /* Cleanup. */ @@ -3005,13 +3047,13 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* * The tuple to be updated could not be found. Do nothing * except for emitting a log message. - * - * XXX should this be promoted to ereport(LOG) perhaps? */ - elog(DEBUG1, - "logical replication did not find row to be updated " - "in replication target relation's partition \"%s\"", - RelationGetRelationName(partrel)); + if (MySubscription->detectconflict) + report_apply_conflict(CT_UPDATE_MISSING, + partrel, + InvalidTransactionId, + InvalidRepOriginId, 0); + return; } @@ -5088,3 +5130,70 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_APPLY; } } + +/* + * Get the xmin and commit timestamp data (origin and timestamp) associated + * with the provided local tuple. + * + * Returns true if the commit timestamp data was found, false otherwise. + */ +static bool +get_tuple_commit_ts(TupleTableSlot *localslot, TransactionId *xmin, + RepOriginId *localorigin, TimestampTz *localts) +{ + Datum xminDatum; + bool isnull; + + xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber, + &isnull); + *xmin = DatumGetTransactionId(xminDatum); + Assert(!isnull); + + /* + * The commit timestamp data is not available if track_commit_timestamp is + * disabled. + */ + if (!track_commit_timestamp) + { + *localorigin = InvalidRepOriginId; + *localts = 0; + return false; + } + + return TransactionIdGetCommitTsData(*xmin, localts, localorigin); +} + +/* + * Report a conflict when applying remote changes. + */ +static void +report_apply_conflict(ConflictType type, Relation localrel, + TransactionId localxmin, RepOriginId localorigin, + TimestampTz localts) +{ + switch (type) + { + case CT_UPDATE_DIFFER: + ereport(LOG, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s\"", + ConflictTypeNames[type], RelationGetRelationName(localrel)), + errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.", + localorigin, localxmin, timestamptz_to_str(localts))); + break; + case CT_UPDATE_MISSING: + ereport(LOG, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s\"", + ConflictTypeNames[type], RelationGetRelationName(localrel)), + errdetail("Did not find the row to be updated.")); + break; + case CT_DELETE_MISSING: + ereport(LOG, + errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("conflict %s detected on relation \"%s\"", + ConflictTypeNames[type], RelationGetRelationName(localrel)), + errdetail("Did not find the row to be deleted.")); + break; + } +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e324070828..c6b67c692d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4739,6 +4739,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subdetectconflict; int i, ntups; @@ -4811,11 +4812,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " s.subdetectconflict\n"); + else + appendPQExpBuffer(query, + " false AS subdetectconflict\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4854,6 +4861,7 @@ getSubscriptions(Archive *fout) i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); i_subenabled = PQfnumber(res, "subenabled"); i_subfailover = PQfnumber(res, "subfailover"); + i_subdetectconflict = PQfnumber(res, "subdetectconflict"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4900,6 +4908,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subenabled)); subinfo[i].subfailover = pg_strdup(PQgetvalue(res, i, i_subfailover)); + subinfo[i].subdetectconflict = + pg_strdup(PQgetvalue(res, i, i_subdetectconflict)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5140,6 +5150,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subfailover, "t") == 0) appendPQExpBufferStr(query, ", failover = true"); + if (strcmp(subinfo->subdetectconflict, "t") == 0) + appendPQExpBufferStr(query, ", detect_conflict = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 865823868f..02aa4a6f32 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *suboriginremotelsn; char *subfailover; + char *subdetectconflict; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index f67bf0b892..0472fe2e87 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6529,7 +6529,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6597,6 +6597,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subdetectconflict AS \"%s\"\n", + gettext_noop("Detect conflict")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d453e224d9..219fac7e71 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1946,9 +1946,10 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + COMPLETE_WITH("binary", "detect_conflict", "disable_on_error", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3363,9 +3364,10 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "detect_conflict", "disable_on_error", "enabled", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..aad4907d43 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subdetectconflict; /* True if replication should perform + * conflict detection */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -151,6 +154,7 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool detectconflict; /* True if conflict detection is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 0f2a25cdc1..a8b0086dd9 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,42 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); +ERROR: detect_conflict requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 3e5ba4cb8c..a77b196704 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -290,6 +290,21 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 471e981962..d74f6bdabe 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -331,11 +331,12 @@ is( $result, qq(1|bar 2|baz), 'update works with REPLICA IDENTITY FULL and a primary key'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); -$node_subscriber->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (detect_conflict = true)" +); $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); @@ -352,10 +353,10 @@ $node_publisher->wait_for_catchup('tap_sub'); my $logfile = slurp_file($node_subscriber->logfile, $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + qr/conflict update_missing detected on relation "tab_full_pk".*\n.*DETAIL:.* Did not find the row to be updated./m, 'update target row is missing'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + qr/conflict delete_missing detected on relation "tab_full_pk".*\n.*DETAIL:.* Did not find the row to be deleted./m, 'delete target row is missing'); $node_subscriber->append_conf('postgresql.conf', diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 29580525a9..02afbc2ed4 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -343,12 +343,12 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = true)" +); $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); @@ -372,21 +372,21 @@ $node_publisher->wait_for_catchup('sub2'); my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + qr/conflict update_missing detected on relation "tab1_2_2".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + qr/conflict delete_missing detected on relation "tab1_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + qr/conflict delete_missing detected on relation "tab1_2_2".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_2_2'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + qr/conflict delete_missing detected on relation "tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_def'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = false)" +); # Tests for replication using root table identity and schema @@ -773,12 +773,12 @@ pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); -# Check that subscriber handles cases where update/delete target tuple -# is missing. We have to look for the DEBUG1 log messages about that, -# so temporarily bump up the log verbosity. -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = debug1"); -$node_subscriber1->reload; +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = true)" +); $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); @@ -796,15 +796,15 @@ $node_publisher->wait_for_catchup('sub2'); $logfile = slurp_file($node_subscriber1->logfile(), $log_location); ok( $logfile =~ - qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + qr/conflict update_missing detected on relation "tab2_1".*\n.*DETAIL:.* Did not find the row to be updated./, 'update target row is missing in tab2_1'); ok( $logfile =~ - qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + qr/conflict delete_missing detected on relation "tab2_1".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab2_1'); -$node_subscriber1->append_conf('postgresql.conf', - "log_min_messages = warning"); -$node_subscriber1->reload; +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = false)" +); # Test that replication continues to work correctly after altering the # partition of a partitioned target table. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 056561f008..03dabfeb72 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -26,7 +26,12 @@ my $stderr; # node_A my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); + +# Enable the track_commit_timestamp to detect the conflict when attempting to +# update a row that was previously modified by a different origin. +$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = on'); $node_A->start; + # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); @@ -89,11 +94,32 @@ is( $result, qq(11 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' ); +############################################################################### +# Check that the conflict can be detected when attempting to update a row that +# was previously modified by a different source. +############################################################################### + +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = true);"); + +$node_B->safe_psql('postgres', "UPDATE tab SET a = 10 WHERE a = 11;"); + +$node_A->wait_for_log( + qr/Updating a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/ +); + $node_A->safe_psql('postgres', "DELETE FROM tab;"); $node_A->wait_for_catchup($subname_BA); $node_B->wait_for_catchup($subname_AB); +# The remaining tests no longer test conflict detection. +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = false);"); + +$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = off'); +$node_A->restart; + ############################################################################### # Check that remote data of node_B (that originated from node_C) is not # published to node_A. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d427a1c16a..3b12be42ad 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -465,6 +465,7 @@ ConditionVariableMinimallyPadded ConditionalStack ConfigData ConfigVariable +ConflictType ConnCacheEntry ConnCacheKey ConnParams -- 2.30.0.windows.2