From d5cabc6ea1b2c680efcf6012ec89b6da5525c8ea Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 31 Jul 2024 03:08:50 -0400 Subject: [PATCH v8 2/4] Add CONFLICT RESOLVERS into the syntax for CREATE and ALTER SUBSCRIPTION This patch provides support for configuring subscriptions with conflict resolvers Syntax for CREATE SUBSCRIPTION: CREATE SUBSCRIPTION CONNECTION PUBLICATION CONFLICT RESOLVER (conflict_type1 = resolver1, conflict_type2 = resolver2, conflict_type3 = resolver3,...); Syntax for ALTER SUBSCRIPTION: ALTER SUBSCRIPTION CONFLICT RESOLVER (conflict_type1 = resolver1, conflict_type2 = resolver2, conflict_type3 = resolver3,...); --- src/backend/commands/subscriptioncmds.c | 100 ++++++++ src/backend/parser/gram.y | 25 +- src/backend/replication/logical/conflict.c | 308 +++++++++++++++++++++++++ src/include/catalog/Makefile | 3 +- src/include/catalog/meson.build | 1 + src/include/catalog/pg_subscription_conflict.h | 55 +++++ src/include/nodes/parsenodes.h | 3 + src/include/parser/kwlist.h | 1 + src/include/replication/conflict.h | 49 ++++ src/test/regress/expected/oidjoins.out | 1 + src/test/regress/expected/subscription.out | 101 ++++++++ src/test/regress/sql/subscription.sql | 48 ++++ 12 files changed, 691 insertions(+), 4 deletions(-) create mode 100644 src/include/catalog/pg_subscription_conflict.h diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b2bc095..5c18c45 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -28,6 +28,7 @@ #include "catalog/pg_database_d.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_subscription_conflict.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -37,6 +38,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" @@ -453,6 +455,32 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, } /* + * Parsing function for conflict resolvers in CREATE SUBSCRIPTION command + */ +static void +parse_subscription_conflict_resolvers(List *stmtresolvers, + ConflictTypeResolver * resolvers) +{ + ListCell *lc; + + if (!stmtresolvers) + return; + + foreach(lc, stmtresolvers) + { + DefElem *defel = (DefElem *) lfirst(lc); + ConflictType type; + + /* validate the conflict type and resolver */ + type = validate_conflict_type_and_resolver(defel->defname, + defGetString(defel)); + + /* update the corresponding resolver for the given conflict type */ + resolvers[type].resolver = defGetString(defel); + } +} + +/* * Add publication names from the list to a string. */ static void @@ -596,6 +624,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bits32 supported_opts; SubOpts opts = {0}; AclResult aclresult; + ConflictTypeResolver conflictResolvers[CT_MAX + 1]; + bool skip_conflict_resolvers = false; /* * Parse and check options. @@ -611,6 +641,33 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + if (!opts.detectconflict) + { + /* + * If conflict resolvers are set but detect_conflict is not enabled, + * issue warning and ignore resolvers. + */ + if (stmt->resolvers) + ereport(WARNING, + (errmsg("Ignoring given CONFLICT RESOLVERS as detect_conflict is not enabled."))); + skip_conflict_resolvers = true; + } + else + { + /* If conflict resolvers are not set, use default values */ + if (!stmt->resolvers) + { + ereport(WARNING, + (errmsg("Will use default resolvers configuration as detect_conflict is ON but resolvers are not given"))); + } + + /* + * Parse and check conflict resolvers. Initialize with default values + */ + SetDefaultResolvers(conflictResolvers); + parse_subscription_conflict_resolvers(stmt->resolvers, conflictResolvers); + } + /* * Since creating a replication slot is not transactional, rolling back * the transaction leaves the created replication slot. So we cannot run @@ -742,6 +799,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); + /* Update the Conflict Resolvers in pg_subscription_conflict */ + if (!skip_conflict_resolvers) + SetSubConflictResolver(subid, conflictResolvers, CT_MAX + 1); + /* * Connect to remote side to execute requested commands and fetch table * info. @@ -1374,6 +1435,22 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT)) { + ConflictTypeResolver conflictResolvers[CT_MAX + 1]; + + if (!opts.detectconflict) + RemoveSubscriptionConflictBySubid(subid); + else + { + /* if detect_conflict is already set, then ignore */ + if (!sub->detectconflict) + { + ereport(WARNING, + (errmsg("Using default conflict resolvers"))); + SetDefaultResolvers(conflictResolvers); + SetSubConflictResolver(subid, conflictResolvers, CT_MAX + 1); + } + } + values[Anum_pg_subscription_subdetectconflict - 1] = BoolGetDatum(opts.detectconflict); replaces[Anum_pg_subscription_subdetectconflict - 1] = true; @@ -1604,6 +1681,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, update_tuple = true; break; } + case ALTER_SUBSCRIPTION_CONFLICT_RESOLVERS: + { + List *conflict_resolvers = NIL; + + /* make sure that detect_conflict is enabled, else throw error */ + if (!sub->detectconflict) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set conflict resolvers when detect_conflict is not enabled"))); + + /* get list of conflict types and resolvers and validate them */ + conflict_resolvers = GetAndValidateSubsConflictResolverList(stmt->resolvers); + + /* + * Update the conflict resolvers for the corresponding + * conflict types in the pg_subscription_conflict catalog + */ + UpdateSubConflictResolvers(conflict_resolvers, subid); + break; + } default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", @@ -1855,6 +1952,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); + /* Remove any associated conflict resolvers */ + RemoveSubscriptionConflictBySubid(subid); + /* Remove the origin tracking if exists. */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a043fd4..a690b46 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -426,7 +426,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); OptTableElementList TableElementList OptInherit definition OptTypedTableElementList TypedTableElementList reloptions opt_reloptions - OptWith opt_definition func_args func_args_list + OptWith opt_definition opt_resolver_definition func_args func_args_list func_args_with_defaults func_args_with_defaults_list aggr_args aggr_args_list func_as createfunc_opt_list opt_createfunc_opt_list alterfunc_opt_list @@ -772,7 +772,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); RANGE READ REAL REASSIGN RECHECK RECURSIVE REF_P REFERENCES REFERENCING REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA - RESET RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP + RESET RESOLVER RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP ROUTINE ROUTINES ROW ROWS RULE SAVEPOINT SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT @@ -8824,6 +8824,11 @@ opt_definition: | /*EMPTY*/ { $$ = NIL; } ; +opt_resolver_definition: + CONFLICT RESOLVER definition { $$ = $3; } + | /*EMPTY*/ { $$ = NIL; } + ; + table_func_column: param_name func_type { FunctionParameter *n = makeNode(FunctionParameter); @@ -10758,7 +10763,7 @@ AlterPublicationStmt: *****************************************************************************/ CreateSubscriptionStmt: - CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION name_list opt_definition + CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION name_list opt_definition opt_resolver_definition { CreateSubscriptionStmt *n = makeNode(CreateSubscriptionStmt); @@ -10766,6 +10771,7 @@ CreateSubscriptionStmt: n->conninfo = $5; n->publication = $7; n->options = $8; + n->resolvers = $9; $$ = (Node *) n; } ; @@ -10872,6 +10878,17 @@ AlterSubscriptionStmt: n->options = $5; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name opt_resolver_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_CONFLICT_RESOLVERS; + n->subname = $3; + n->resolvers = $4; + $$ = (Node *) n; + } + ; /***************************************************************************** @@ -17797,6 +17814,7 @@ unreserved_keyword: | REPLACE | REPLICA | RESET + | RESOLVER | RESTART | RESTRICT | RETURN @@ -18428,6 +18446,7 @@ bare_label_keyword: | REPLACE | REPLICA | RESET + | RESOLVER | RESTART | RESTRICT | RETURN diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 4918011..720e923 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,9 +15,23 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heaptoast.h" +#include "access/heapam.h" +#include "access/table.h" +#include "access/tableam.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_conflict.h" +#include "catalog/pg_subscription_conflict_d.h" +#include "catalog/pg_inherits.h" +#include "commands/defrem.h" #include "replication/conflict.h" #include "replication/origin.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" #include "utils/rel.h" const char *const ConflictTypeNames[] = { @@ -29,6 +43,55 @@ const char *const ConflictTypeNames[] = { [CT_DELETE_DIFFER] = "delete_differ" }; +const char *const ConflictResolverNames[] = { + [CR_REMOTE_APPLY] = "remote_apply", + [CR_KEEP_LOCAL] = "keep_local", + [CR_APPLY_OR_SKIP] = "apply_or_skip", + [CR_APPLY_OR_ERROR] = "apply_or_error", + [CR_SKIP] = "skip", + [CR_ERROR] = "error" +}; + +#define CONFLICT_TYPE_MAX_RESOLVERS 4 + +/* + * Valid conflict resolvers for each conflict type. + * + * XXX: If we do not want to maintain different resolvers such as + * apply_or_skip and apply_or_error for update_missing conflict, + * then we can retain remote_apply and keep_local only. Then these + * resolvers in context of update_missing will mean: + * + * keep_local: do not apply the update as INSERT. + * remote_apply: apply the update as INSERT. If we could not apply, + * then log and skip. + * + * Similarly SKIP can be replaced with KEEP_LOCAL for both update_missing + * and delete_missing conflicts. For missing rows, 'SKIP' sounds more user + * friendly name for a resolver and thus has been added here. + */ +const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { + [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_UPDATE_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_UPDATE_MISSING] = {CR_APPLY_OR_SKIP, CR_APPLY_OR_ERROR, CR_SKIP, CR_ERROR}, + [CT_DELETE_MISSING] = {CR_SKIP, CR_ERROR}, + [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR} +}; + +/* + * Default conflict resolver for each conflict type. + */ +const int ConflictTypeDefaultResolvers[] = { + [CT_INSERT_EXISTS] = CR_REMOTE_APPLY, + [CT_UPDATE_EXISTS] = CR_REMOTE_APPLY, + [CT_UPDATE_DIFFER] = CR_REMOTE_APPLY, + [CT_UPDATE_MISSING] = CR_APPLY_OR_SKIP, + [CT_DELETE_MISSING] = CR_SKIP, + [CT_DELETE_DIFFER] = CR_REMOTE_APPLY + +}; + static char *build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot); static int errdetail_apply_conflict(ConflictType type, Oid conflictidx, TransactionId localxmin, @@ -191,3 +254,248 @@ build_index_value_desc(Oid indexoid, TupleTableSlot *conflictslot) return conflict_row; } + +/* + * Set default values for CONFLICT RESOLVERS for each conflict type + */ +void +SetDefaultResolvers(ConflictTypeResolver * conflictResolvers) +{ + ConflictType type; + + for (type = CT_MIN; type <= CT_MAX; type++) + { + conflictResolvers[type].conflict_type = ConflictTypeNames[type]; + conflictResolvers[type].resolver = + ConflictResolverNames[ConflictTypeDefaultResolvers[type]]; + } +} + +/* + * Validate the conflict type and resolver. + */ +ConflictType +validate_conflict_type_and_resolver(const char *conflict_type, + const char *conflict_resolver) +{ + ConflictType type; + ConflictResolver resolver; + bool valid = false; + int i; + + /* Check conflict type validity */ + for (type = CT_MIN; type <= CT_MAX; type++) + { + if (strcmp(ConflictTypeNames[type], conflict_type) == 0) + { + valid = true; + break; + } + } + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%s is not a valid conflict type", conflict_type)); + + /* Reset */ + valid = false; + + /* Check conflict resolver validity. */ + for (resolver = CR_MIN; resolver <= CR_MAX; resolver++) + { + if (strcmp(ConflictResolverNames[resolver], conflict_resolver) == 0) + { + valid = true; + break; + } + } + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%s is not a valid conflict resolver", conflict_resolver)); + + /* Reset */ + valid = false; + + /* Check if conflict resolver is a valid one for the given conflict type */ + for (i = 0; i < CONFLICT_TYPE_MAX_RESOLVERS; i++) + { + if (ConflictTypeResolverMap[type][i] == resolver) + { + valid = true; + break; + } + } + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%s is not a valid conflict resolver for conflict type %s", + conflict_resolver, + conflict_type)); + return type; + +} + +/* + * Extract the conflict type and conflict resolvers from the + * ALTER SUBSCRIPTION command and return a list of ConflictTypeResolver nodes. + */ +List * +GetAndValidateSubsConflictResolverList(List *stmtresolvers) +{ + ListCell *lc; + ConflictTypeResolver *CTR = NULL; + List *res = NIL; + + foreach(lc, stmtresolvers) + { + DefElem *defel = (DefElem *) lfirst(lc); + + CTR = palloc(sizeof(ConflictTypeResolver)); + CTR->conflict_type = defel->defname; + CTR->resolver = defGetString(defel); + + /* + * Validate the conflict type and that the resolver is valid for that + * conflict type + */ + validate_conflict_type_and_resolver(CTR->conflict_type, CTR->resolver); + + res = lappend(res, CTR); + } + + return res; +} + +/* + * Update the Subscription's conflict resolver for a conflict type in + * pg_subscription_conflict system catalog + */ +void +UpdateSubConflictResolvers(List *conflict_resolvers, Oid subid) +{ + ListCell *lc; + Datum values[Natts_pg_subscription_conflict]; + bool nulls[Natts_pg_subscription_conflict]; + bool replaces[Natts_pg_subscription_conflict]; + HeapTuple oldtup; + HeapTuple newtup = NULL; + Relation pg_subscription_conflict; + + /* Prepare to update a tuple. */ + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + memset(values, 0, sizeof(values)); + + pg_subscription_conflict = table_open(SubscriptionConflictId, RowExclusiveLock); + + foreach(lc, conflict_resolvers) + { + ConflictTypeResolver *CTR = (ConflictTypeResolver *) lfirst(lc); + + /* set up subid and conflict_type to search in cache */ + values[Anum_pg_subscription_conflict_confsubid - 1] = ObjectIdGetDatum(subid); + values[Anum_pg_subscription_conflict_confrtype - 1] = CStringGetTextDatum(CTR->conflict_type); + + oldtup = SearchSysCache2(SUBSCRIPTIONCONFLICTSUBOID, + values[Anum_pg_subscription_conflict_confsubid - 1], + values[Anum_pg_subscription_conflict_confrtype - 1]); + + if (HeapTupleIsValid(oldtup)) + { + /* Update the new resolver */ + values[Anum_pg_subscription_conflict_confrres - 1] = CStringGetTextDatum(CTR->resolver); + replaces[Anum_pg_subscription_conflict_confrres - 1] = true; + + newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_subscription_conflict), + values, nulls, replaces); + CatalogTupleUpdate(pg_subscription_conflict, &oldtup->t_self, newtup); + ReleaseSysCache(oldtup); + heap_freetuple(newtup); + } + else + elog(ERROR, "cache lookup failed for table conflict %s for subid %u", + CTR->conflict_type, subid); + + } + + table_close(pg_subscription_conflict, RowExclusiveLock); +} + +/* + * Set Conflict Resolvers on the subscription + */ +void +SetSubConflictResolver(Oid subId, ConflictTypeResolver * resolvers, int resolvers_cnt) +{ + Relation pg_subscription_conflict; + Datum values[Natts_pg_subscription_conflict]; + bool nulls[Natts_pg_subscription_conflict]; + bool replaces[Natts_pg_subscription_conflict]; + HeapTuple newtup = NULL; + int type; + Oid conflict_oid; + + pg_subscription_conflict = table_open(SubscriptionConflictId, RowExclusiveLock); + + for (type = 0; type < resolvers_cnt; type++) + { + /* Prepare to update a tuple. */ + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + values[Anum_pg_subscription_conflict_confsubid - 1] = ObjectIdGetDatum(subId); + values[Anum_pg_subscription_conflict_confrtype - 1] = + CStringGetTextDatum(resolvers[type].conflict_type); + values[Anum_pg_subscription_conflict_confrres - 1] = + CStringGetTextDatum(resolvers[type].resolver); + + /* Get a new oid and update the tuple into catalog */ + conflict_oid = GetNewOidWithIndex(pg_subscription_conflict, SubscriptionConflictOidIndexId, + Anum_pg_subscription_conflict_oid); + values[Anum_pg_subscription_conflict_oid - 1] = ObjectIdGetDatum(conflict_oid); + newtup = heap_form_tuple(RelationGetDescr(pg_subscription_conflict), + values, nulls); + CatalogTupleInsert(pg_subscription_conflict, newtup); + heap_freetuple(newtup); + } + + table_close(pg_subscription_conflict, RowExclusiveLock); +} + +/* + * Remove the subscription conflict resolvers for the subscription id + */ +void +RemoveSubscriptionConflictBySubid(Oid subid) +{ + Relation rel; + HeapTuple tup; + TableScanDesc scan; + ScanKeyData skey[1]; + int nkeys = 0; + + rel = table_open(SubscriptionConflictId, RowExclusiveLock); + + /* + * Search using the subid, this should return all conflict resolvers for + * this sub + */ + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_conflict_confsubid, + BTEqualStrategyNumber, + F_OIDEQ, + ObjectIdGetDatum(subid)); + + scan = table_beginscan_catalog(rel, nkeys, skey); + + /* Iterate through the tuples and delete them */ + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + CatalogTupleDelete(rel, &tup->t_self); + + table_endscan(scan); + table_close(rel, RowExclusiveLock); +} diff --git a/src/include/catalog/Makefile b/src/include/catalog/Makefile index 167f91a..f2611c1 100644 --- a/src/include/catalog/Makefile +++ b/src/include/catalog/Makefile @@ -81,7 +81,8 @@ CATALOG_HEADERS := \ pg_publication_namespace.h \ pg_publication_rel.h \ pg_subscription.h \ - pg_subscription_rel.h + pg_subscription_rel.h \ + pg_subscription_conflict.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build index f70d1da..959e1d9 100644 --- a/src/include/catalog/meson.build +++ b/src/include/catalog/meson.build @@ -69,6 +69,7 @@ catalog_headers = [ 'pg_publication_rel.h', 'pg_subscription.h', 'pg_subscription_rel.h', + 'pg_subscription_conflict.h', ] # The .dat files we need can just be listed alphabetically. diff --git a/src/include/catalog/pg_subscription_conflict.h b/src/include/catalog/pg_subscription_conflict.h new file mode 100644 index 0000000..c8b37c2 --- /dev/null +++ b/src/include/catalog/pg_subscription_conflict.h @@ -0,0 +1,55 @@ +/* ------------------------------------------------------------------------- + * + * pg_subscription_conflict.h + * definition of the "subscription conflict resolver" system + * catalog (pg_subscription_conflict) + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_subscription_conflict.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + * ------------------------------------------------------------------------- + */ +#ifndef PG_SUBSCRIPTION_CONFLICT_H +#define PG_SUBSCRIPTION_CONFLICT_H + +#include "catalog/genbki.h" +#include "catalog/pg_subscription_conflict_d.h" + +/* ---------------- + * pg_subscription_conflict definition. cpp turns this into + * typedef struct FormData_pg_subscription_conflict + * ---------------- + */ +CATALOG(pg_subscription_conflict,8881,SubscriptionConflictId) +{ + Oid oid; /* OID of the object itself */ + Oid confsubid BKI_LOOKUP(pg_subscription); /* OID of subscription */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text confrtype BKI_FORCE_NOT_NULL; /* conflict type */ + text confrres BKI_FORCE_NOT_NULL; /* conflict resolver */ +#endif +} FormData_pg_subscription_conflict; + +/* ---------------- + * Form_pg_subscription_conflict corresponds to a pointer to a row with + * the format of pg_subscription_conflict relation. + * ---------------- + */ +typedef FormData_pg_subscription_conflict * Form_pg_subscription_conflict; + +DECLARE_TOAST(pg_subscription_conflict, 8882, 8883); + +DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_conflict_oid_index, 8884, SubscriptionConflictOidIndexId, pg_subscription_conflict, btree(oid oid_ops)); + +DECLARE_UNIQUE_INDEX(pg_subscription_conflict_sub_index, 8885, SubscriptionConflictSubIndexId, pg_subscription_conflict, btree(confsubid oid_ops, confrtype text_ops)); + +MAKE_SYSCACHE(SUBSCRIPTIONCONFLICTSUBOID, pg_subscription_conflict_sub_index, 256); + +#endif /* PG_SUBSCRIPTION_CONFLICT_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 85a62b5..2e1eefc3 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4203,6 +4203,7 @@ typedef struct CreateSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *resolvers; /* List of conflict resolvers */ } CreateSubscriptionStmt; typedef enum AlterSubscriptionType @@ -4215,6 +4216,7 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, + ALTER_SUBSCRIPTION_CONFLICT_RESOLVERS, } AlterSubscriptionType; typedef struct AlterSubscriptionStmt @@ -4225,6 +4227,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *resolvers; /* List of conflict resolvers */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index f7fe834..21e5cb1 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -377,6 +377,7 @@ PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("resolver", RESOLVER, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("restart", RESTART, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("restrict", RESTRICT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("return", RETURN, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 3a7260d..c102f74 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -39,6 +39,47 @@ typedef enum CT_DELETE_DIFFER, } ConflictType; +/* Min and max conflict type */ +#define CT_MIN CT_INSERT_EXISTS +#define CT_MAX CT_DELETE_DIFFER + +/* + * Conflict resolvers that can be used to resolve various conflicts. + * + * See ConflictTypeResolverMap in conflcit.c to find out which all + * resolvers are supported for each conflict type. + */ +typedef enum ConflictResolver +{ + /* Apply the remote change */ + CR_REMOTE_APPLY = 1, + + /* Keep the local change */ + CR_KEEP_LOCAL, + + /* Apply the remote change; skip if it can not be applied */ + CR_APPLY_OR_SKIP, + + /* Apply the remote change; emit error if it can not be applied */ + CR_APPLY_OR_ERROR, + + /* Skip applying the change */ + CR_SKIP, + + /* Error out */ + CR_ERROR, +} ConflictResolver; + +/* Min and max conflict resolver */ +#define CR_MIN CR_REMOTE_APPLY +#define CR_MAX CR_ERROR + +typedef struct ConflictTypeResolver +{ + const char *conflict_type; + const char *resolver; +} ConflictTypeResolver; + extern bool GetTupleCommitTs(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts); extern void ReportApplyConflict(int elevel, ConflictType type, @@ -46,5 +87,13 @@ extern void ReportApplyConflict(int elevel, ConflictType type, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts, TupleTableSlot *conflictslot); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern void SetSubConflictResolver(Oid subId, ConflictTypeResolver * resolvers, int max_types); +extern void RemoveSubscriptionConflictById(Oid confid); +extern void RemoveSubscriptionConflictBySubid(Oid confid); +extern List *GetAndValidateSubsConflictResolverList(List *stmtresolvers); +extern void UpdateSubConflictResolvers(List *conflict_resolvers, Oid subid); +extern ConflictType validate_conflict_type_and_resolver(const char *conflict_type, + const char *conflict_resolver); +extern void SetDefaultResolvers(ConflictTypeResolver * conflictResolvers); #endif diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb89..42bf2cc 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -266,3 +266,4 @@ NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid} +NOTICE: checking pg_subscription_conflict {confsubid} => pg_subscription {oid} diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index cc9337c..7bd4e82 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -430,6 +430,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ERROR: detect_conflict requires a Boolean value -- now it works CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: Will use default resolvers configuration as detect_conflict is ON but resolvers are not given WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ @@ -439,6 +440,18 @@ HINT: To initiate replication, you must manually create the replication slot, e regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 (1 row) +-- confirm that the default conflict resolvers have been set +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +----------------+--------------- + delete_differ | remote_apply + delete_missing | skip + insert_exists | remote_apply + update_differ | remote_apply + update_exists | remote_apply + update_missing | apply_or_skip +(6 rows) + ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); \dRs+ List of subscriptions @@ -447,6 +460,94 @@ ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) +-- confirm that the conflict resolvers have been dropped +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +-----------+---------- +(0 rows) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - invalid conflict resolvers +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (insert_exists = foo); +ERROR: foo is not a valid conflict resolver +-- fail - invalid conflict types +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (foo = 'keep_local'); +ERROR: foo is not a valid conflict type +-- creating subscription with detect_conflict = false should not create conflict resolvers +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +-----------+---------- +(0 rows) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +--try setting resolvers for few types +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. + +--check if above are configured; for non specified conflict types, default resolvers should be seen +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +----------------+-------------- + delete_differ | keep_local + delete_missing | skip + insert_exists | keep_local + update_differ | remote_apply + update_exists | remote_apply + update_missing | skip +(6 rows) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- creating subscription with detect_conflict = false but conflict resolvers specified +-- will result in conflict resolvers being ignored +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = false) CONFLICT RESOLVER (insert_exists = 'keep_local'); +WARNING: Ignoring given CONFLICT RESOLVERS as detect_conflict is not enabled. +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +-----------+---------- +(0 rows) + +-- setting detect_conflict to true will set default conflict resolvers +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = true); +WARNING: Using default conflict resolvers +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +----------------+--------------- + delete_differ | remote_apply + delete_missing | skip + insert_exists | remote_apply + update_differ | remote_apply + update_exists | remote_apply + update_missing | apply_or_skip +(6 rows) + +-- fail - altering with invalid conflict type +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (foo = 'keep_local'); +ERROR: foo is not a valid conflict type +-- fail - altering with invalid conflict resolver +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (insert_exists = 'foo'); +ERROR: foo is not a valid conflict resolver +-- ok - valid conflict type and resolver +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + confrtype | confrres +----------------+-------------- + delete_differ | keep_local + delete_missing | skip + insert_exists | keep_local + update_differ | remote_apply + update_exists | remote_apply + update_missing | skip +(6 rows) + ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; -- let's do some tests with pg_create_subscription rather than superuser diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 5c740fd..bdd2118 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -295,10 +295,58 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB \dRs+ +-- confirm that the default conflict resolvers have been set +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false); \dRs+ +-- confirm that the conflict resolvers have been dropped +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + +-- fail - invalid conflict resolvers +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (insert_exists = foo); + +-- fail - invalid conflict types +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (foo = 'keep_local'); + +-- creating subscription with detect_conflict = false should not create conflict resolvers +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = false); +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + +--try setting resolvers for few types +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true) CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); + +--check if above are configured; for non specified conflict types, default resolvers should be seen +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + +-- creating subscription with detect_conflict = false but conflict resolvers specified +-- will result in conflict resolvers being ignored +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = false) CONFLICT RESOLVER (insert_exists = 'keep_local'); +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + +-- setting detect_conflict to true will set default conflict resolvers +ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = true); +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + +-- fail - altering with invalid conflict type +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (foo = 'keep_local'); + +-- fail - altering with invalid conflict resolver +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (insert_exists = 'foo'); + +-- ok - valid conflict type and resolver +ALTER SUBSCRIPTION regress_testsub CONFLICT RESOLVER (insert_exists = 'keep_local', update_missing = 'skip', delete_differ = 'keep_local' ); +SELECT confrtype, confrres FROM pg_subscription_conflict ORDER BY confrtype; + ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; -- 1.8.3.1