From 447c4acbc0157af1dfde84970d19f8de5ac7d67a Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 30 Jul 2024 20:43:55 +0800 Subject: [PATCH v8 2/3] Add a detect_conflict option to subscriptions This patch adds a new parameter detect_conflict for CREATE and ALTER subscription commands. This new parameter will decide if subscription will go for confict detection and provide additional logging information. To avoid the potential overhead introduced by conflict detection, detect_conflict will be off for a subscription by default. --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/logical-replication.sgml | 115 +++---------- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 89 ++++++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 54 +++++- src/backend/replication/logical/worker.c | 58 ++++--- src/bin/pg_dump/pg_dump.c | 17 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.c | 14 +- src/include/catalog/pg_subscription.h | 4 + src/test/regress/expected/subscription.out | 188 ++++++++++++--------- src/test/regress/sql/subscription.sql | 19 +++ src/test/subscription/t/001_rep_changes.pl | 7 + src/test/subscription/t/013_partition.pl | 23 +++ src/test/subscription/t/029_on_error.pl | 2 +- src/test/subscription/t/030_origin.pl | 7 +- 19 files changed, 413 insertions(+), 209 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b654fae1b2..b042a5a94a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8035,6 +8035,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subdetectconflict bool + + + If true, the subscription is enabled for conflict detection. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 8f69844b6a..5489b400ee 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1580,87 +1580,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id; stop. This is referred to as a conflict. When replicating UPDATE or DELETE operations, missing data will not produce a conflict and such operations - will simply be skipped. - - - - Additional logging is triggered in the following conflict - scenarios: - - - insert_exists - - - Inserting a row that violates a NOT DEFERRABLE - unique constraint. Note that to obtain the origin and commit - timestamp details of the conflicting key in the log, ensure that - track_commit_timestamp - is enabled. In this scenario, an error will be raised until the - conflict is resolved manually. - - - - - update_exists - - - The updated value of a row violates a NOT DEFERRABLE - unique constraint. Note that to obtain the origin and commit - timestamp details of the conflicting key in the log, ensure that - track_commit_timestamp - is enabled. In this scenario, an error will be raised until the - conflict is resolved manually. Note that when updating a - partitioned table, if the updated row value satisfies another - partition constraint resulting in the row being inserted into a - new partition, the insert_exists conflict may - arise if the new row violates a NOT DEFERRABLE - unique constraint. - - - - - update_differ - - - Updating a row that was previously modified by another origin. - Note that this conflict can only be detected when - track_commit_timestamp - is enabled. Currenly, the update is always applied regardless of - the origin of the local row. - - - - - update_missing - - - The tuple to be updated was not found. The update will simply be - skipped in this scenario. - - - - - delete_missing - - - The tuple to be deleted was not found. The delete will simply be - skipped in this scenario. - - - - - delete_differ - - - Deleting a row that was previously modified by another origin. Note that this - conflict can only be detected when - track_commit_timestamp - is enabled. Currenly, the delete is always applied regardless of - the origin of the local row. - - - - + will simply be skipped. Please refer to + detect_conflict + for all the conflicts that will be logged when enabling detect_conflict. @@ -1689,8 +1611,8 @@ test_sub=# SELECT * FROM t1 ORDER BY id; an error, the replication won't proceed, and the logical replication worker will emit the following kind of message to the subscriber's server log: -ERROR: conflict insert_exists detected on relation "public.test" -DETAIL: Key (c)=(1) already exists in unique index "t_pkey", which was modified locally in transaction 740 at 2024-06-26 10:47:04.727375+08. +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (c)=(1) already exists. CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378 The LSN of the transaction that contains the change violating the constraint and @@ -1716,15 +1638,6 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER Please note that skipping the whole transaction includes skipping changes that might not violate any constraint. This can easily make the subscriber inconsistent. - The additional details regarding conflicting rows, such as their origin and - commit timestamp can be seen in the DETAIL line of the - log. But note that this information is only available when - track_commit_timestamp - is enabled. Users can use these information to make decisions on whether to - retain the local change or adopt the remote alteration. For instance, the - origin in above log indicates that the existing row was modified by a local - change, users can manually perform a remote-change-win resolution by - deleting the local row. @@ -1738,6 +1651,24 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER linkend="sql-altersubscription">ALTER SUBSCRIPTION ... SKIP. + + + Enabling both detect_conflict + and track_commit_timestamp + on the subscriber can provide additional details regarding conflicting + rows, such as their origin and commit timestamp, in case of a unique + constraint violation conflict: + +ERROR: conflict insert_exists detected on relation "public.test" +DETAIL: Key (c)=(1) already exists in unique index "t_pkey", which was modified locally in transaction 740 at 2024-06-26 10:47:04.727375+08. +CONTEXT: processing remote data for replication origin "pg_16389" during message type "INSERT" for replication target relation "public.test" in transaction 740, finished at 0/14F7EC0 + + Users can use these information to make decisions on whether to retain + the local change or adopt the remote alteration. For instance, the + origin in above log indicates that the existing row was modified by a + local change, users can manually perform a remote-change-win resolution + by deleting the local row. + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007..dfbe25b59e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION name RENAME TO < password_required, run_as_owner, origin, - failover, and - two_phase. + failover, + two_phase, and + detect_conflict. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d9421..d07201abec 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,95 @@ CREATE SUBSCRIPTION subscription_name + + + detect_conflict (boolean) + + + Specifies whether the subscription is enabled for conflict detection. + The default is false. + + + When conflict detection is enabled, additional logging is triggered + in the following scenarios: + + + insert_exists + + + Inserting a row that violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. + + + + + update_exists + + + The updated value of a row violates a NOT DEFERRABLE + unique constraint. Note that to obtain the origin and commit + timestamp details of the conflicting key in the log, ensure that + track_commit_timestamp + is enabled. In this scenario, an error will be raised until the + conflict is resolved manually. Note that when updating a + partitioned table, if the updated row value satisfies another + partition constraint resulting in the row being inserted into a + new partition, the insert_exists conflict may + arise if the new row violates a NOT DEFERRABLE + unique constraint. + + + + + update_differ + + + Updating a row that was previously modified by another origin. + Note that this conflict can only be detected when + track_commit_timestamp + is enabled. Currenly, the update is always applied regardless of + the origin of the local row. + + + + + update_missing + + + The tuple to be updated was not found. The update will simply be + skipped in this scenario. + + + + + delete_missing + + + The tuple to be deleted was not found. The delete will simply be + skipped in this scenario. + + + + + delete_differ + + + Deleting a row that was previously modified by another origin. Note that this + conflict can only be detected when + track_commit_timestamp + is enabled. Currenly, the delete is always applied regardless of + the origin of the local row. + + + + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..5a423f4fb0 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->detectconflict = subform->subdetectconflict; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9a47..d084bfc48a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1356,7 +1356,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subdetectconflict, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d124bfe55c..a949d246df 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_DETECT_CONFLICT 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool detectconflict; char *origin; XLogRecPtr lsn; } SubOpts; @@ -112,6 +115,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); +static void check_conflict_detection(void); /* @@ -162,6 +166,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT)) + opts->detectconflict = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -307,6 +313,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT) && + strcmp(defel->defname, "detect_conflict") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DETECT_CONFLICT; + opts->detectconflict = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -594,7 +609,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -639,6 +655,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, errmsg("password_required=false is superuser-only"), errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser."))); + if (opts.detectconflict) + check_conflict_detection(); + /* * If built with appropriate switch, whine when regression-testing * conventions for subscription names are violated. @@ -701,6 +720,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1196,7 +1217,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1356,6 +1377,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT)) + { + values[Anum_pg_subscription_subdetectconflict - 1] = + BoolGetDatum(opts.detectconflict); + replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + + if (opts.detectconflict) + check_conflict_detection(); + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -2536,3 +2567,18 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Report a warning about incomplete conflict detection if + * track_commit_timestamp is disabled. + */ +static void +check_conflict_detection(void) +{ + if (!track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict detection could be incomplete due to disabled track_commit_timestamp"), + errdetail("Conflicts update_differ and delete_differ cannot be detected, " + "and the origin and commit timestamp for the local row will not be logged.")); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0541e8a165..6575b040a7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2459,8 +2459,10 @@ apply_handle_insert_internal(ApplyExecutionData *edata, EState *estate = edata->estate; /* We must open indexes here. */ - ExecOpenIndices(relinfo, true); - InitConflictIndexes(relinfo); + ExecOpenIndices(relinfo, MySubscription->detectconflict); + + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); /* Do the insert. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); @@ -2648,7 +2650,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, MemoryContext oldctx; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(relinfo, true); + ExecOpenIndices(relinfo, MySubscription->detectconflict); found = FindReplTupleInLocalRel(edata, localrel, &relmapentry->remoterel, @@ -2668,10 +2670,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, TimestampTz localts; /* - * Check whether the local tuple was modified by a different origin. - * If detected, report the conflict. + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. */ - if (GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) ReportApplyConflict(LOG, CT_UPDATE_DIFFER, localrel, InvalidOid, localxmin, localorigin, localts, NULL); @@ -2683,7 +2686,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, EvalPlanQualSetSlot(&epqstate, remoteslot); - InitConflictIndexes(relinfo); + if (MySubscription->detectconflict) + InitConflictIndexes(relinfo); /* Do the actual update. */ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); @@ -2696,8 +2700,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, * The tuple to be updated could not be found. Do nothing except for * emitting a log message. */ - ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -2825,14 +2830,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata, TimestampTz localts; /* - * Check whether the local tuple was modified by a different origin. - * If detected, report the conflict. + * If conflict detection is enabled, check whether the local tuple was + * modified by a different origin. If detected, report the conflict. * * For cross-partition update, we skip detecting the delete_differ * conflict since it should have been done in * apply_handle_tuple_routing(). */ - if ((!edata->mtstate || edata->mtstate->operation != CMD_UPDATE) && + if (MySubscription->detectconflict && + (!edata->mtstate || edata->mtstate->operation != CMD_UPDATE) && GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) ReportApplyConflict(LOG, CT_DELETE_DIFFER, localrel, InvalidOid, @@ -2850,8 +2856,9 @@ apply_handle_delete_internal(ApplyExecutionData *edata, * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. */ - ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, - InvalidTransactionId, InvalidRepOriginId, 0, NULL); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_DELETE_MISSING, localrel, InvalidOid, + InvalidTransactionId, InvalidRepOriginId, 0, NULL); } /* Cleanup. */ @@ -3033,19 +3040,22 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * The tuple to be updated could not be found. Do nothing * except for emitting a log message. */ - ReportApplyConflict(LOG, CT_UPDATE_MISSING, - partrel, InvalidOid, - InvalidTransactionId, - InvalidRepOriginId, 0, NULL); + if (MySubscription->detectconflict) + ReportApplyConflict(LOG, CT_UPDATE_MISSING, + partrel, InvalidOid, + InvalidTransactionId, + InvalidRepOriginId, 0, NULL); return; } /* - * Check whether the local tuple was modified by a different - * origin. If detected, report the conflict. + * If conflict detection is enabled, check whether the local + * tuple was modified by a different origin. If detected, + * report the conflict. */ - if (GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && + if (MySubscription->detectconflict && + GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) ReportApplyConflict(LOG, CT_UPDATE_DIFFER, partrel, InvalidOid, localxmin, localorigin, @@ -3078,8 +3088,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, EPQState epqstate; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); - ExecOpenIndices(partrelinfo, true); - InitConflictIndexes(partrelinfo); + ExecOpenIndices(partrelinfo, MySubscription->detectconflict); + + if (MySubscription->detectconflict) + InitConflictIndexes(partrelinfo); EvalPlanQualSetSlot(&epqstate, remoteslot_part); TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index b8b1888bd3..829f9b3e88 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4760,6 +4760,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subdetectconflict; int i, ntups; @@ -4832,11 +4833,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + if (fout->remoteVersion >= 170000) + appendPQExpBufferStr(query, + " s.subdetectconflict\n"); + else + appendPQExpBuffer(query, + " false AS subdetectconflict\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4875,6 +4882,7 @@ getSubscriptions(Archive *fout) i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); i_subenabled = PQfnumber(res, "subenabled"); i_subfailover = PQfnumber(res, "subfailover"); + i_subdetectconflict = PQfnumber(res, "subdetectconflict"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4921,6 +4929,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subenabled)); subinfo[i].subfailover = pg_strdup(PQgetvalue(res, i, i_subfailover)); + subinfo[i].subdetectconflict = + pg_strdup(PQgetvalue(res, i, i_subdetectconflict)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5161,6 +5171,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subfailover, "t") == 0) appendPQExpBufferStr(query, ", failover = true"); + if (strcmp(subinfo->subdetectconflict, "t") == 0) + appendPQExpBufferStr(query, ", detect_conflict = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 4b2e5870a9..bbd7cbeff6 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *suboriginremotelsn; char *subfailover; + char *subdetectconflict; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7c9a1f234c..fef1ad0d70 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6539,7 +6539,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6607,6 +6607,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subdetectconflict AS \"%s\"\n", + gettext_noop("Detect conflict")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 891face1b6..086e135f65 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1946,9 +1946,10 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + COMPLETE_WITH("binary", "detect_conflict", "disable_on_error", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3363,9 +3364,10 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "detect_conflict", "disable_on_error", "enabled", + "failover", "origin", "password_required", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..17daf11dc7 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subdetectconflict; /* True if replication should perform + * conflict detection */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -151,6 +154,7 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool detectconflict; /* True if conflict detection is enabled */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 17d48b1685..118f207df5 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,54 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); +ERROR: detect_conflict requires a Boolean value +-- now it works, but will report a warning due to disabled track_commit_timestamp +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: conflict detection could be incomplete due to disabled track_commit_timestamp +DETAIL: Conflicts update_differ and delete_differ cannot be detected, and the origin and commit timestamp for the local row will not be logged. +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = true); +WARNING: conflict detection could be incomplete due to disabled track_commit_timestamp +DETAIL: Conflicts update_differ and delete_differ cannot be detected, and the origin and commit timestamp for the local row will not be logged. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e7037..b3f2ab1684 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,25 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - detect_conflict must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo); + +-- now it works, but will report a warning due to disabled track_commit_timestamp +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 79cbed2e5b..78c0307165 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -331,6 +331,13 @@ is( $result, qq(1|bar 2|baz), 'update works with REPLICA IDENTITY FULL and a primary key'); +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (detect_conflict = true)" +); + $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); # Note that the current location of the log file is not grabbed immediately diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 896985d85b..a3effed937 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -343,6 +343,13 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); is($result, qq(), 'truncate of tab1 replicated'); +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = true)" +); + $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); @@ -377,6 +384,10 @@ ok( $logfile =~ qr/conflict delete_missing detected on relation "public.tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./, 'delete target row is missing in tab1_def'); +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub1 SET (detect_conflict = false)" +); + # Tests for replication using root table identity and schema # publisher @@ -762,6 +773,13 @@ pub_tab2|3|yyy pub_tab2|5|zzz xxx_c|6|aaa), 'inserts into tab2 replicated'); +# To check that subscriber handles cases where update/delete target tuple +# is missing, detect_conflict is temporarily enabled to log conflicts +# related to missing tuples. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = true)" +); + $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); # Note that the current location of the log file is not grabbed immediately @@ -800,6 +818,11 @@ ok( $logfile =~ qr/conflict update_differ detected on relation "public.tab2_1".*\n.*DETAIL:.* Updating a row that was modified locally in transaction [0-9]+ at .*/, 'updating a tuple that was modified by a different origin'); +# The remaining tests no longer test conflict detection. +$node_subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = false)" +); + $node_subscriber1->append_conf('postgresql.conf', 'track_commit_timestamp = off'); $node_subscriber1->restart; diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index ac7c8bbe7c..b71d0c3400 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -111,7 +111,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE tbl"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" ); # Initial synchronization failure causes the subscription to be disabled. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 5a22413464..6bc4474d15 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -149,7 +149,9 @@ is($result, qq(), # delete a row that was previously modified by a different source. ############################################################################### -$node_B->safe_psql('postgres', "DELETE FROM tab;"); +$node_B->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_BC SET (detect_conflict = true); + DELETE FROM tab;"); $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); @@ -183,6 +185,9 @@ $node_B->wait_for_log( ); # The remaining tests no longer test conflict detection. +$node_B->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_BC SET (detect_conflict = false);"); + $node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off'); $node_B->restart; -- 2.30.0.windows.2