From 173e74279c22193346b91226dce9d6a8273b9f60 Mon Sep 17 00:00:00 2001
From: Jacob Champion
Date: Fri, 7 Apr 2023 09:55:27 -0700
Subject: [PATCH v4 1/2] pgoutput: refactor publication cache construction

Breaking this logic into a standalone helper should help expose the
behavior for testing. Additionally, move the implementation under the
pg_publication catalog helpers, since it seems like it's inherent to the
publication settings (and it must match the behavior of the initial
sync, which doesn't seem to be controlled by the output plugin at all).
---
 src/backend/catalog/pg_publication.c        | 236 ++++++++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c | 137 +-----------
 src/include/catalog/pg_proc.dat             |   8 +
 src/include/catalog/pg_publication.h        |   4 +
 src/test/regress/expected/publication.out   |  26 +++
 src/test/regress/sql/publication.sql        |  15 ++
 6 files changed, 293 insertions(+), 133 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index c488b6370b..df9abf09c3 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1259,3 +1259,239 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * process_relation_publications
+ *		Work out how changes to a relation are published via 'publications'.
+ *
+ * Each publication in the list that publishes 'relid' has its publish flags
+ * OR'ed into *pubactions, so the caller must initialize *pubactions first.
+ * *publish_as_relid is set to the OID under which changes are published:
+ * 'relid' itself, or the top-most ancestor selected across the publications.
+ *
+ * Returns a List of the Publication pointers (not copies) that publish the
+ * relation as *publish_as_relid.
+ */
+List *
+process_relation_publications(Oid relid, const List *publications,
+							  PublicationActions *pubactions,
+							  Oid *publish_as_relid)
+{
+	Oid			schemaId = get_rel_namespace(relid);
+	List	   *pubids = GetRelationPublications(relid);
+
+	/*
+	 * We don't acquire a lock on the namespace system table as we build
+	 * the cache entry using a historic snapshot and all the later changes
+	 * are absorbed while decoding WAL.
+	 */
+	List	   *schemaPubids = GetSchemaPublications(schemaId);
+	ListCell   *lc;
+	int			publish_ancestor_level = 0;
+	bool		am_partition = get_rel_relispartition(relid);
+	char		relkind = get_rel_relkind(relid);
+	List	   *rel_publications = NIL;
+
+	*publish_as_relid = relid;
+
+	foreach(lc, publications)
+	{
+		Publication *pub = lfirst(lc);
+		bool		publish = false;
+
+		/*
+		 * Under what relid should we publish changes in this publication?
+		 * We'll use the top-most relid across all publications. Also
+		 * track the ancestor level for this publication.
+		 */
+		Oid			pub_relid = relid;
+		int			ancestor_level = 0;
+
+		/*
+		 * If this is a FOR ALL TABLES publication, pick the partition
+		 * root and set the ancestor level accordingly.
+		 */
+		if (pub->alltables)
+		{
+			publish = true;
+			if (pub->pubviaroot && am_partition)
+			{
+				List	   *ancestors = get_partition_ancestors(relid);
+
+				pub_relid = llast_oid(ancestors);
+				ancestor_level = list_length(ancestors);
+			}
+		}
+
+		if (!publish)
+		{
+			bool		ancestor_published = false;
+
+			/*
+			 * For a partition, check if any of the ancestors are
+			 * published. If so, note down the topmost ancestor that is
+			 * published via this publication, which will be used as the
+			 * relation via which to publish the partition's changes.
+			 */
+			if (am_partition)
+			{
+				Oid			ancestor;
+				int			level;
+				List	   *ancestors = get_partition_ancestors(relid);
+
+				ancestor = GetTopMostAncestorInPublication(pub->oid,
+														   ancestors,
+														   &level);
+
+				if (ancestor != InvalidOid)
+				{
+					ancestor_published = true;
+					if (pub->pubviaroot)
+					{
+						pub_relid = ancestor;
+						ancestor_level = level;
+					}
+				}
+			}
+
+			if (list_member_oid(pubids, pub->oid) ||
+				list_member_oid(schemaPubids, pub->oid) ||
+				ancestor_published)
+				publish = true;
+		}
+
+		/*
+		 * If the relation is to be published, determine actions to
+		 * publish, and list of columns, if appropriate.
+		 *
+		 * Don't publish changes for partitioned tables, because
+		 * publishing those of its partitions suffices, unless partition
+		 * changes won't be published due to pubviaroot being set.
+		 */
+		if (publish &&
+			(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
+		{
+			pubactions->pubinsert |= pub->pubactions.pubinsert;
+			pubactions->pubupdate |= pub->pubactions.pubupdate;
+			pubactions->pubdelete |= pub->pubactions.pubdelete;
+			pubactions->pubtruncate |= pub->pubactions.pubtruncate;
+
+			/*
+			 * We want to publish the changes as the top-most ancestor
+			 * across all publications. So we need to check if the already
+			 * calculated level is higher than the new one. If yes, we can
+			 * ignore the new value (as it's a child). Otherwise the new
+			 * value is an ancestor, so we keep it.
+			 */
+			if (publish_ancestor_level > ancestor_level)
+				continue;
+
+			/*
+			 * If we found an ancestor higher up in the tree, discard the
+			 * list of publications through which we replicate it, and use
+			 * the new ancestor.
+			 */
+			if (publish_ancestor_level < ancestor_level)
+			{
+				*publish_as_relid = pub_relid;
+				publish_ancestor_level = ancestor_level;
+
+				/* reset the publication list for this relation */
+				rel_publications = NIL;
+			}
+			else
+			{
+				/* Same ancestor level, has to be the same OID. */
+				Assert(*publish_as_relid == pub_relid);
+			}
+
+			/* Track publications for this ancestor. */
+			rel_publications = lappend(rel_publications, pub);
+		}
+	}
+
+	list_free(pubids);
+	list_free(schemaPubids);
+
+	return rel_publications;
+}
+
+/*
+ * pg_get_relation_publishing_info
+ *		SQL-callable wrapper around process_relation_publications().
+ *
+ * Arguments are a relation OID and a text array of publication names.  The
+ * result row holds an array of the publication OIDs returned by the helper,
+ * the publish-as relation OID, and the four combined publish flags.
+ */
+Datum
+pg_get_relation_publishing_info(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	List	   *publications = NIL;
+	ArrayType  *arr;
+	Datum	   *elems;
+	int			nelems,
+				i;
+	TupleDesc	tupdesc;
+	HeapTuple	htup;
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	arr = PG_GETARG_ARRAYTYPE_P(1);
+	deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, &elems, NULL,
+					  &nelems);
+
+	/* Resolve each publication name to its Publication struct. */
+	for (i = 0; i < nelems; i++)
+	{
+		char	   *pubname = TextDatumGetCString(elems[i]);
+		Publication *pub = GetPublicationByName(pubname, false);
+
+		publications = lappend(publications, pub);
+	}
+
+	{
+		List	   *rel_publications;
+		/* The helper only ORs flags into pubactions, so start from zero. */
+		PublicationActions pubactions = {0};
+		Oid			publish_as_relid;
+		ListCell   *lc;
+		Datum	   *puboids;
+		ArrayType  *puboidarray;
+		Datum		values[6];
+		bool		nulls[6];
+
+		rel_publications =
+			process_relation_publications(relid, publications, &pubactions,
+										  &publish_as_relid);
+
+		/* Translate the rel_publications List into an OID array. */
+		puboids = palloc(sizeof(Datum) * list_length(rel_publications));
+
+		foreach(lc, rel_publications)
+		{
+			Publication *pub = lfirst(lc);
+
+			i = foreach_current_index(lc);
+			puboids[i] = ObjectIdGetDatum(pub->oid);
+		}
+
+		puboidarray = construct_array(puboids, list_length(rel_publications),
+									  OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+
+		values[0] = PointerGetDatum(puboidarray);
+		values[1] = ObjectIdGetDatum(publish_as_relid);
+		values[2] = BoolGetDatum(pubactions.pubinsert);
+		values[3] = BoolGetDatum(pubactions.pubupdate);
+		values[4] = BoolGetDatum(pubactions.pubdelete);
+		values[5] = BoolGetDatum(pubactions.pubtruncate);
+
+		memset(nulls, false, sizeof(nulls));
+
+		htup = heap_form_tuple(tupdesc, values, nulls);
+	}
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b08ca55041..640676e7dc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1986,20 +1986,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
-		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublications(relid);
-
-		/*
-		 * We don't acquire a lock on the namespace system table as we build
-		 * the cache entry using a historic snapshot and all the later changes
-		 * are absorbed while decoding WAL.
-		 */
-		List	   *schemaPubids = GetSchemaPublications(schemaId);
-		ListCell   *lc;
-		Oid			publish_as_relid = relid;
-		int			publish_ancestor_level = 0;
-		bool		am_partition = get_rel_relispartition(relid);
-		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
 
 		/* Reload publications if needed before use. */
@@ -2063,123 +2049,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		 * but here we only need to consider ones that the subscriber
 		 * requested.
 		 */
-		foreach(lc, data->publications)
-		{
-			Publication *pub = lfirst(lc);
-			bool		publish = false;
-
-			/*
-			 * Under what relid should we publish changes in this publication?
-			 * We'll use the top-most relid across all publications. Also
-			 * track the ancestor level for this publication.
-			 */
-			Oid			pub_relid = relid;
-			int			ancestor_level = 0;
-
-			/*
-			 * If this is a FOR ALL TABLES publication, pick the partition
-			 * root and set the ancestor level accordingly.
-			 */
-			if (pub->alltables)
-			{
-				publish = true;
-				if (pub->pubviaroot && am_partition)
-				{
-					List	   *ancestors = get_partition_ancestors(relid);
-
-					pub_relid = llast_oid(ancestors);
-					ancestor_level = list_length(ancestors);
-				}
-			}
-
-			if (!publish)
-			{
-				bool		ancestor_published = false;
-
-				/*
-				 * For a partition, check if any of the ancestors are
-				 * published. If so, note down the topmost ancestor that is
-				 * published via this publication, which will be used as the
-				 * relation via which to publish the partition's changes.
-				 */
-				if (am_partition)
-				{
-					Oid			ancestor;
-					int			level;
-					List	   *ancestors = get_partition_ancestors(relid);
-
-					ancestor = GetTopMostAncestorInPublication(pub->oid,
-															   ancestors,
-															   &level);
-
-					if (ancestor != InvalidOid)
-					{
-						ancestor_published = true;
-						if (pub->pubviaroot)
-						{
-							pub_relid = ancestor;
-							ancestor_level = level;
-						}
-					}
-				}
-
-				if (list_member_oid(pubids, pub->oid) ||
-					list_member_oid(schemaPubids, pub->oid) ||
-					ancestor_published)
-					publish = true;
-			}
-
-			/*
-			 * If the relation is to be published, determine actions to
-			 * publish, and list of columns, if appropriate.
-			 *
-			 * Don't publish changes for partitioned tables, because
-			 * publishing those of its partitions suffices, unless partition
-			 * changes won't be published due to pubviaroot being set.
-			 */
-			if (publish &&
-				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
-			{
-				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
-				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
-				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
-				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
-
-				/*
-				 * We want to publish the changes as the top-most ancestor
-				 * across all publications. So we need to check if the already
-				 * calculated level is higher than the new one. If yes, we can
-				 * ignore the new value (as it's a child). Otherwise the new
-				 * value is an ancestor, so we keep it.
-				 */
-				if (publish_ancestor_level > ancestor_level)
-					continue;
-
-				/*
-				 * If we found an ancestor higher up in the tree, discard the
-				 * list of publications through which we replicate it, and use
-				 * the new ancestor.
-				 */
-				if (publish_ancestor_level < ancestor_level)
-				{
-					publish_as_relid = pub_relid;
-					publish_ancestor_level = ancestor_level;
-
-					/* reset the publication list for this relation */
-					rel_publications = NIL;
-				}
-				else
-				{
-					/* Same ancestor level, has to be the same OID. */
-					Assert(publish_as_relid == pub_relid);
-				}
-
-				/* Track publications for this ancestor. */
-				rel_publications = lappend(rel_publications, pub);
-			}
-		}
-
-		entry->publish_as_relid = publish_as_relid;
+		rel_publications =
+			process_relation_publications(relid, data->publications,
+										  &entry->pubactions,
+										  &entry->publish_as_relid);
 
 		/*
 		 * Initialize the tuple slot, map, and row filter. These are only used
@@ -2198,8 +2071,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			pgoutput_column_list_init(data, rel_publications, entry);
 		}
 
-		list_free(pubids);
-		list_free(schemaPubids);
 		list_free(rel_publications);
 
 		entry->replicate_valid = true;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..0598af0cf9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11852,6 +11852,14 @@
   proname => 'pg_relation_is_publishable', provolatile => 's',
   prorettype => 'bool', proargtypes => 'regclass',
   prosrc => 'pg_relation_is_publishable' },
+{ oid => '8139',
+  descr => 'get information on how a relation will be published via a list of publications',
+  proname => 'pg_get_relation_publishing_info', provariadic => 'text',
+  provolatile => 's', prorettype => 'record', proargtypes => 'regclass _text',
+  proallargtypes => '{regclass,_text,_oid,oid,bool,bool,bool,bool}',
+  proargmodes => '{i,v,o,o,o,o,o,o}',
+  proargnames => '{relid,pubnames,pubids,pubasrelid,pubinsert,pubupdate,pubdelete,pubtruncate}',
+  prosrc => 'pg_get_relation_publishing_info' },
 
 # rls
 { oid => '3298',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 6ecaa2a01e..d4e3488917 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -155,4 +155,8 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
 										   MemoryContext mcxt);
 
+extern List *process_relation_publications(Oid relid, const List *publications,
+										   PublicationActions *pubactions,
+										   Oid *publish_as_relid);
+
 #endif							/* PG_PUBLICATION_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 16361a91f9..85c217dadb 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -5,6 +5,17 @@ CREATE ROLE regress_publication_user LOGIN SUPERUSER;
 CREATE ROLE regress_publication_user2;
 CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
 SET SESSION AUTHORIZATION 'regress_publication_user';
+CREATE FUNCTION published_stream (VARIADIC pubnames text[])
+  RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
+    -- For each publication, show each published root alongside the tables which
+    -- are published via its OID.
+    SELECT p.pubname, rpi.pubasrelid::regclass, pr.prrelid::regclass
+      FROM pg_publication_rel pr
+      JOIN pg_publication p ON (p.oid = pr.prpubid),
+           pg_get_relation_publishing_info(pr.prrelid, VARIADIC pubnames) rpi
+     WHERE p.pubname = ANY (pubnames)
+     ORDER BY p.oid, 2, 3;
+$$ LANGUAGE sql;
 -- suppress warning that depends on wal_level
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_default;
@@ -1692,6 +1703,13 @@ SELECT * FROM pg_publication_tables;
  pub     | sch1       | tbl1      | {a}      | 
 (1 row)
 
+SELECT * FROM published_stream('pub');
+ pubname | published |     synced      
+---------+-----------+-----------------
+ pub     | sch1.tbl1 | sch1.tbl1
+ pub     | sch1.tbl1 | sch2.tbl1_part1
+(2 rows)
+
 DROP PUBLICATION pub;
 -- Schema publication that does not include the schema that has the parent table
 CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
@@ -1718,6 +1736,13 @@ SELECT * FROM pg_publication_tables;
  pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
+SELECT * FROM published_stream('pub');
+ pubname |    published    |     synced      
+---------+-----------------+-----------------
+ pub     | sch1.tbl1       | sch1.tbl1
+ pub     | sch2.tbl1_part1 | sch2.tbl1_part1
+(2 rows)
+
 DROP PUBLICATION pub;
 DROP TABLE sch2.tbl1_part1;
 DROP TABLE sch1.tbl1;
@@ -1738,6 +1763,7 @@ DROP PUBLICATION pub;
 DROP TABLE sch1.tbl1;
 DROP SCHEMA sch1 cascade;
 DROP SCHEMA sch2 cascade;
+DROP FUNCTION published_stream;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_publication_user, regress_publication_user2;
 DROP ROLE regress_publication_user_dummy;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index d5051a5e74..6bf9554d48 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -6,6 +6,18 @@ CREATE ROLE regress_publication_user2;
 CREATE ROLE regress_publication_user_dummy LOGIN NOSUPERUSER;
 SET SESSION AUTHORIZATION 'regress_publication_user';
 
+CREATE FUNCTION published_stream (VARIADIC pubnames text[])
+  RETURNS TABLE (pubname text, published regclass, synced regclass) AS $$
+    -- For each publication, show each published root alongside the tables which
+    -- are published via its OID.
+    SELECT p.pubname, rpi.pubasrelid::regclass, pr.prrelid::regclass
+      FROM pg_publication_rel pr
+      JOIN pg_publication p ON (p.oid = pr.prpubid),
+           pg_get_relation_publishing_info(pr.prrelid, VARIADIC pubnames) rpi
+     WHERE p.pubname = ANY (pubnames)
+     ORDER BY p.oid, 2, 3;
+$$ LANGUAGE sql;
+
 -- suppress warning that depends on wal_level
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_default;
@@ -1064,6 +1076,7 @@ SELECT * FROM pg_publication_tables;
 -- Table publication that includes both the parent table and the child table
 ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
 SELECT * FROM pg_publication_tables;
+SELECT * FROM published_stream('pub');
 
 DROP PUBLICATION pub;
 -- Schema publication that does not include the schema that has the parent table
@@ -1078,6 +1091,7 @@ SELECT * FROM pg_publication_tables;
 -- Table publication that includes both the parent table and the child table
 ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
 SELECT * FROM pg_publication_tables;
+SELECT * FROM published_stream('pub');
 
 DROP PUBLICATION pub;
 DROP TABLE sch2.tbl1_part1;
@@ -1096,6 +1110,7 @@ DROP PUBLICATION pub;
 DROP TABLE sch1.tbl1;
 DROP SCHEMA sch1 cascade;
 DROP SCHEMA sch2 cascade;
+DROP FUNCTION published_stream;
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_publication_user, regress_publication_user2;
-- 
2.34.1