From 1cfffb1590e0efe7c7bc09315318fe7990ef6f54 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 23 Jul 2025 11:34:50 +0530 Subject: [PATCH v20250724 4/6] Introduce "REFRESH PUBLICATION SEQUENCES" for subscriptions This patch introduce a new command to synchronize the sequences of a subscription: ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES Author: Vignesh C, Tomas Vondra Reviewer: Amit Kapila, Shveta Malik, Dilip Kumar, Peter Smith, Nisha Moond Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/catalog/pg_publication.c | 82 +++++ src/backend/catalog/pg_subscription.c | 61 ++- src/backend/catalog/system_views.sql | 10 + src/backend/commands/subscriptioncmds.c | 387 +++++++++++++++----- src/backend/executor/execReplication.c | 4 +- src/backend/parser/gram.y | 11 +- src/backend/replication/logical/syncutils.c | 5 +- src/bin/psql/tab-complete.in.c | 2 +- src/include/catalog/pg_proc.dat | 5 + src/include/catalog/pg_publication.h | 1 + src/include/catalog/pg_subscription_rel.h | 4 +- src/include/nodes/parsenodes.h | 3 +- src/test/regress/expected/rules.out | 11 +- src/test/regress/expected/subscription.out | 4 +- 14 files changed, 472 insertions(+), 118 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index d7d33c8b709..f4b5b7915bb 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1065,6 +1065,42 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets list of all relations published by FOR ALL SEQUENCES publication(s). + */ +List * +GetAllSequencesPublicationRelations(void) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_SEQUENCE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + table_close(classRel, AccessShareLock); + return result; +} + /* * Get publication using oid * @@ -1337,3 +1373,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. + */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + List *sequences = NIL; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index b6ba367b877..6cef3b9c27e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -27,6 +27,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/pg_lsn.h" #include "utils/rel.h" @@ -463,7 +464,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid) * leave tablesync slots or origins in the system when the * corresponding table is dropped. */ - if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY) + if (!OidIsValid(subid) && + get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE && + subrel->srsubstate != SUBREL_STATE_READY) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -500,7 +503,8 @@ HasSubscriptionTables(Oid subid) Relation rel; ScanKeyData skey[1]; SysScanDesc scan; - bool has_subrels; + HeapTuple tup; + bool has_subrels = false; rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -512,8 +516,22 @@ HasSubscriptionTables(Oid subid) scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, skey); - /* If even a single tuple exists then the subscription has tables. */ - has_subrels = HeapTupleIsValid(systable_getnext(scan)); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + /* + * Skip sequence tuples. If even a single table tuple exists then the + * subscription has tables. + */ + if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE) + { + has_subrels = true; + break; + } + } /* Cleanup */ systable_endscan(scan); @@ -525,12 +543,22 @@ HasSubscriptionTables(Oid subid) /* * Get the relations for the subscription. * - * If not_ready is true, return only the relations that are not in a ready - * state, otherwise return all the relations of the subscription. The - * returned list is palloc'ed in the current memory context. + * get_tables: get relations for tables of the subscription. + * + * get_sequences: get relations for sequences of the subscription. + * + * not_ready: + * If getting tables and not_ready is false get all tables, otherwise, + * only get tables that have not reached READY state. + * If getting sequences and not_ready is false get all sequences, + * otherwise, only get sequences that have not reached READY state (i.e. are + * still in INIT state). + * + * The returned list is palloc'ed in the current memory context. */ List * -GetSubscriptionRelations(Oid subid, bool not_ready) +GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, + bool not_ready) { List *res = NIL; Relation rel; @@ -539,6 +567,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready) ScanKeyData skey[2]; SysScanDesc scan; + /* One or both of 'get_tables' and 'get_sequences' must be true. */ + Assert(get_tables || get_sequences); + rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[nkeys++], @@ -561,9 +592,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready) SubscriptionRelState *relstate; Datum d; bool isnull; + bool issequence; + bool istable; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + /* Relation is either a sequence or a table */ + issequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE; + istable = !issequence; + + /* Skip sequences if they were not requested */ + if (!get_sequences && issequence) + continue; + + /* Skip tables if they were not requested */ + if (!get_tables && istable) + continue; + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f6eca09ee15..a0b1a0ef56f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cd6c3684482..5e412df02f6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -27,6 +27,7 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_sequence.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" @@ -106,6 +107,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, @@ -715,6 +717,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + /* + * XXX: If the subscription is for a sequence-only publication, creating a + * replication origin is unnecessary because incremental synchronization + * of sequences is not supported, and sequence data is fully synced during + * a REFRESH, which does not rely on the origin. If the publication is + * later modified to include tables, the origin can be created during the + * ALTER SUBSCRIPTION ... REFRESH command. + */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -726,9 +736,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. */ @@ -743,6 +750,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool has_tables; + List *relations; + char table_state; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.retaindeadtuples, opts.origin, @@ -758,13 +769,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Build local relation status info. Relations are for both tables + * and sequences from the publisher. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + relations = fetch_table_list(wrconn, publications); + has_tables = relations != NIL; + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); + + foreach_ptr(RangeVar, rv, relations) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); @@ -781,6 +795,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to * export it. + * + * XXX: If the subscription is for a sequence-only publication, + * creating this slot is unnecessary. It can be created later + * during the ALTER SUBSCRIPTION ... REFRESH PUBLICATION or ALTER + * SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES command, if the + * publication is updated to include tables. */ if (opts.create_slot) { @@ -804,7 +824,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && has_tables) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -843,18 +863,55 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, return myself; } +/* + * Update the subscription to refresh both the publication and the publication + * objects associated with the subscription. + * + * Parameters: + * + * If 'copy_data' is true, the function will set the state to INIT; otherwise, + * it will set the state to READY. + * + * If 'validate_publications' is provided with a publication list, the + * function checks that the specified publications exist on the publisher. + * + * If 'refresh_tables' is true, update the subscription by adding or removing + * tables that have been added or removed since the last subscription creation + * or refresh publication. + * + * If 'refresh_sequences' is true, update the subscription by adding or removing + * sequences that have been added or removed since the last subscription + * creation or refresh publication. + * + * Note, this is a common function for handling different REFRESH commands + * according to the parameter 'resync_all_sequences' + * + * 1. ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + * (when parameter resync_all_sequences is true) + * + * The function will mark all sequences with INIT state. + * Assert copy_data is true. + * Assert refresh_tables is false. + * Assert refresh_sequences is true. + * + * 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION [WITH (copy_data=true|false)] + * (when parameter resync_all_sequences is false) + * + * The function will update only the newly added tables and/or sequences + * based on the copy_data parameter. + */ static void AlterSubscription_refresh(Subscription *sub, bool copy_data, - List *validate_publications) + List *validate_publications, bool refresh_tables, + bool refresh_sequences, bool resync_all_sequences) { char *err; - List *pubrel_names; + List *pubrel_names = NIL; List *subrel_states; Oid *subrel_local_oids; Oid *pubrel_local_oids; ListCell *lc; int off; - int remove_rel_len; int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels @@ -862,10 +919,17 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, Oid relid; char state; } SubRemoveRels; - SubRemoveRels *sub_remove_rels; + + List *sub_remove_rels = NIL; WalReceiverConn *wrconn; bool must_use_password; +#ifdef USE_ASSERT_CHECKING + /* Sanity checks for parameter values */ + if (resync_all_sequences) + Assert(copy_data && !refresh_tables && refresh_sequences); +#endif + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -885,16 +949,23 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, check_publications(wrconn, validate_publications); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + if (refresh_tables) + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* Get the sequence list from publisher. */ + if (refresh_sequences) + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, + sub->publications)); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false); + /* Get local relation list. */ + subrel_states = GetSubscriptionRelations(sub->oid, refresh_tables, refresh_sequences, false); subrel_count = list_length(subrel_states); /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup - * is important. + * Build qsorted array of local relation oids for faster lookup. This + * can potentially contain all relation in the database so speed of + * lookup is important. */ subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; @@ -907,22 +978,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->retaindeadtuples, sub->origin, - subrel_local_oids, subrel_count, sub->name); - - /* - * Rels that we want to remove from subscription and drop any slots - * and origins corresponding to them. - */ - sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); + if (refresh_tables) + check_publications_origin(wrconn, sub->publications, copy_data, + sub->retaindeadtuples, sub->origin, + subrel_local_oids, subrel_count, + sub->name); /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for - * it. + * Walk over the remote relations and try to match them to locally + * known tables. If the table is not known locally create a new state + * for it. * - * Also builds array of local oids of remote tables for the next step. + * Also builds array of local oids of remote relations for the next + * step. */ off = 0; pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); @@ -931,12 +999,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; @@ -947,28 +1016,48 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } /* - * Next remove state for tables we should not care about anymore using - * the data we collected above + * Next remove state for relations we should not care about anymore + * using the data we collected above */ qsort(pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp); - remove_rel_len = 0; for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + if (bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + /* + * The resync_all_sequences flag will only be set to true for + * the REFRESH PUBLICATION SEQUENCES command, indicating that + * the existing sequences need to be re-synchronized by + * resetting the relation to its initial state. + */ + if (resync_all_sequences) + { + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } + else { char state; XLogRecPtr statelsn; + char relkind = get_rel_relkind(relid); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -990,41 +1079,55 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); - sub_remove_rels[remove_rel_len].relid = relid; - sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); - /* - * For READY state, we would have already dropped the - * tablesync origin. + * A single sequencesync worker synchronizes all sequences, so + * only stop workers when relation kind is not sequence. */ - if (state != SUBREL_STATE_READY) + if (relkind != RELKIND_SEQUENCE) { - char originname[NAMEDATALEN]; + SubRemoveRels *rel = palloc(sizeof(SubRemoveRels)); + + rel->relid = relid; + rel->state = state; + + sub_remove_rels = lappend(sub_remove_rels, rel); + + logicalrep_worker_stop(sub->oid, relid); /* - * Drop the tablesync's origin tracking if exists. - * - * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * For READY state, we would have already dropped the + * tablesync origin. */ - ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created + * for tablesync worker, this can happen for the + * states before SUBREL_STATE_FINISHEDCOPY. The + * tablesync worker or apply worker can also + * concurrently try to drop the origin and by this + * time the origin might be already removed. For these + * reasons, passing missing_ok = true. + */ + ReplicationOriginNameForLogicalRep(sub->oid, relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); } } @@ -1033,10 +1136,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * to be at the end because otherwise if there is an error while doing * the database operations we won't be able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach_ptr(SubRemoveRels, rel, sub_remove_rels) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0}; @@ -1050,11 +1153,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * dropped slots and fail. For these reasons, we allow * missing_ok = true for the drop. */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, + ReplicationSlotNameForTablesync(sub->oid, rel->relid, syncslotname, sizeof(syncslotname)); ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } + + list_free_deep(sub_remove_rels); } PG_FINALLY(); { @@ -1538,8 +1643,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1553,7 +1658,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = stmt->publication; AlterSubscription_refresh(sub, opts.copy_data, - stmt->publication); + stmt->publication, true, true, + false); } break; @@ -1593,8 +1699,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1612,18 +1718,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data, - validate_publications); + validate_publications, true, true, + false); } break; } - case ALTER_SUBSCRIPTION_REFRESH: + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION: { if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, SUBOPT_COPY_DATA, &opts); @@ -1635,8 +1742,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * * But, having reached this two-phase commit "enabled" state * we must not allow any subsequent table initialization to - * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed - * when the user had requested two_phase = on mode. + * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is + * disallowed when the user had requested two_phase = on mode. * * The exception to this restriction is when copy_data = * false, because when copy_data is false the tablesync will @@ -1648,12 +1755,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), - errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION"); + + AlterSubscription_refresh(sub, opts.copy_data, NULL, true, true, false); + + break; + } + + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions")); - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + AlterSubscription_refresh(sub, true, NULL, false, true, true); break; } @@ -1931,7 +2052,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, true, false, true); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2246,16 +2367,16 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * it's a partitioned table), from some other publishers. This check is * required in the following scenarios: * - * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements - * with "copy_data = true" and "origin = none": + * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * statements with "copy_data = true" and "origin = none": * - Warn the user that data with an origin might have been copied. * - This check is skipped for tables already added, as incremental sync via * WAL allows origin tracking. The list of such tables is in * subrel_local_oids. * - * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements - * with "retain_dead_tuples = true" and "origin = any", and for ALTER - * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or + * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * statements with "retain_dead_tuples = true" and "origin = any", and for + * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin, or * when the publisher's status changes (e.g., due to a connection string * update): * - Warn the user that only conflict detection info for local changes on @@ -2314,24 +2435,28 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, appendStringInfoString(&cmd, ")\n"); /* - * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains - * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables if checking for table - * sync scenario. However, when handling the retain_dead_tuples scenario, - * ensure all tables are checked, as some existing tables may now include - * changes from other origins due to newly created subscriptions on the - * publisher. + * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, subrel_local_oids + * contains the list of relation oids that are already present on the + * subscriber. This check should be skipped for these tables if checking + * for table sync scenario. However, when handling the retain_dead_tuples + * scenario, ensure all tables are checked, as some existing tables may now + * include changes from other origins due to newly created subscriptions on + * the publisher. */ if (check_table_sync) { for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + { + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } } @@ -2611,6 +2736,68 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *seqlist = NIL; + int server_version = walrcv_server_version(wrconn); + + /* Skip sequence fetch if the publisher is older than version 19 */ + if (server_version < 190000) + return seqlist; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + + appendStringInfoString(&cmd, + "SELECT DISTINCT s.schemaname, s.sequencename\n" + "FROM pg_catalog.pg_publication_sequences s\n" + "WHERE s.pubname IN ("); + GetPublicationsStr(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not receive list of sequences from the publisher: %s", + res->err)); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + seqlist = lappend(seqlist, rv); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + return seqlist; +} + /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f262e7a66f7..b58e81424ab 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -877,7 +877,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 9888e33d8df..25cf78e0e30 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10958,11 +10958,20 @@ AlterSubscriptionStmt: AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_REFRESH; + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION; n->subname = $3; n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES; + n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 5109b197805..45b6d429558 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -152,8 +152,9 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); + /* Fetch tables and sequences that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, + true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 69a91826254..f452ea1542a 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2309,7 +2309,7 @@ match_previous_words(int pattern_id, "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("SEQUENCES", "WITH ("); /* ALTER SUBSCRIPTION REFRESH PUBLICATION WITH ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "(")) COMPLETE_WITH("copy_data"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9cc52a7c83f..3fe60ae82cd 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12282,6 +12282,11 @@ proargmodes => '{v,o,o,o,o}', proargnames => '{pubname,pubid,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8052', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 24e09c76649..1af265aa174 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -171,6 +171,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllSequencesPublicationRelations(void); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ea869588d84..a541f4843bd 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -90,6 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionTables(Oid subid); -extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionRelations(Oid subid, bool get_tables, + bool get_sequences, + bool not_ready); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 73e505c25b3..c2e9583cdb7 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4359,7 +4359,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, - ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index dce8c672b40..8b2c407ccdb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1462,6 +1462,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gps.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename, @@ -2175,6 +2183,7 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, + ss.sequence_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, @@ -2185,7 +2194,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sequence_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..629e2617f63 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -107,7 +107,7 @@ HINT: To initiate replication, you must manually create the replication slot, e ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions -- fail - origin must be either none or any CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); ERROR: unrecognized origin value: "foo" @@ -352,7 +352,7 @@ ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block END; BEGIN; ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION cannot run inside a transaction block END; CREATE FUNCTION func() RETURNS VOID AS $$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; -- 2.43.0