From bbea43071179d26b9b0d069f942c73cf47673772 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 3 Nov 2021 06:57:08 -0400 Subject: [PATCH v36 3/6] PS - ExprState cache modifications. Now the cached row-filter caches (e.g. ExprState list) are invalidated only in rel_sync_cache_relation_cb function, so it means the ALTER PUBLICATION for one table should not cause row-filters of other tables to also become invalidated. Also all code related to caching row-filters has been removed from the get_rel_sync_entry function and is now done just before they are needed in the pgoutput_row_filter function. Changes are based on suggestions from Amit [1] [2]. [1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com [2] https://www.postgresql.org/message-id/CAA4eK1%2Btio46goUKBUfAKFsFVxtgk8nOty%3DTxKoKH-gdLzHD2g%40mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 200 +++++++++++++++++++--------- 1 file changed, 136 insertions(+), 64 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 077ae18..3dfac7d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -123,7 +123,15 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; - List *exprstate; /* ExprState for row filter */ + + /* + * Row-filter related members: + * The flag 'rowfilter_valid' only means the exprstates list is correct - + * It doesn't mean that there actually is any row filter present for the + * current relid. 
+ */ + bool rowfilter_valid; + List *exprstates; /* ExprState for row filter(s) */ TupleTableSlot *scantuple; /* tuple table slot for row filter */ /* @@ -161,7 +169,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, static EState *create_estate_for_relation(Relation rel); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); /* @@ -731,20 +739,121 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * If it returns true, the change is replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) { EState *estate; ExprContext *ecxt; ListCell *lc; bool result = true; + Oid relid = RelationGetRelid(relation); + + /* + * If the row filter caching is currently flagged "invalid" then it means we + * don't know yet whether there are any row filters for this relation. + * + * This code is usually executed only once. + */ + if (!entry->rowfilter_valid) + { + bool am_partition = get_rel_relispartition(relid); + MemoryContext oldctx; + TupleDesc tupdesc = RelationGetDescr(relation); + + /* + * Create a tuple table slot for row filter. TupleDesc must live as + * long as the cache remains. Release the tuple table slot if it + * already exists. 
+ */ + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + MemoryContextSwitchTo(oldctx); + + /* + * Find if there are any row filters for this relation. If there are, + * then prepare the necessary ExprState(s) and cache them in the + * entry->exprstates list. + * + * NOTE: All publication-table mappings must be checked. + * + * NOTE: If the relation is a partition and pubviaroot is true, use + * the row filter of the topmost partitioned table instead of the row + * filter of its own partition. + */ + foreach(lc, data->publications) + { + Publication *pub = lfirst(lc); + HeapTuple rftuple; + Datum rfdatum; + bool rfisnull; + Oid pub_relid = relid; + + if (pub->pubviaroot && am_partition) + { + if (pub->alltables) + pub_relid = llast_oid(get_partition_ancestors(relid)); + else + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc2; + + /* + * Find the "topmost" ancestor that is in this + * publication. + */ + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + if (list_member_oid(GetRelationPublications(ancestor), + pub->oid)) + { + pub_relid = ancestor; + } + } + } + } + + /* + * Lookup if there is a row-filter, and if so build the ExprState for it. 
+ */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(pub_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) + { + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); + + if (!rfisnull) + { + Node *rfnode; + ExprState *exprstate; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + exprstate = pgoutput_row_filter_init_expr(rfnode); + entry->exprstates = lappend(entry->exprstates, exprstate); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(rftuple); + } + + } /* loop all subscribed publications */ + + entry->rowfilter_valid = true; + } /* Bail out if there is no row filter */ - if (entry->exprstate == NIL) + if (entry->exprstates == NIL) return true; elog(DEBUG3, "table \"%s.%s\" has row filter", - get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), - get_rel_name(relation->rd_id)); + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)); PushActiveSnapshot(GetTransactionSnapshot()); @@ -761,7 +870,7 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R * different row filter in these publications, all row filters must be * matched in order to replicate this change. */ - foreach(lc, entry->exprstate) + foreach(lc, entry->exprstates) { ExprState *exprstate = (ExprState *) lfirst(lc); @@ -840,7 +949,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) break; /* @@ -873,7 +982,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple newtuple = &change->data.tp.newtuple->tuple; /* Check row filter. 
*/ - if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -907,7 +1016,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1321,10 +1430,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->replicate_valid = false; + entry->rowfilter_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; - entry->exprstate = NIL; + entry->exprstates = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1344,7 +1454,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; - TupleDesc tupdesc = RelationGetDescr(relation); /* Reload publications if needed before use. */ if (!publications_valid) @@ -1358,22 +1467,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* Release tuple table slot */ - if (entry->scantuple != NULL) - { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; - } - - /* - * Create a tuple table slot for row filter. TupleDesc must live as - * long as the cache remains. - */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - tupdesc = CreateTupleDescCopy(tupdesc); - entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); - MemoryContextSwitchTo(oldctx); - /* * Build publication cache. 
We can't use one provided by relcache as * relcache considers all publications given relation is in, but here @@ -1383,9 +1476,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Publication *pub = lfirst(lc); bool publish = false; - HeapTuple rftuple; - Datum rfdatum; - bool rfisnull; if (pub->alltables) { @@ -1449,33 +1539,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - /* - * Cache row filter, if available. All publication-table mappings - * must be checked. If it is a partition and pubviaroot is true, - * use the row filter of the topmost partitioned table instead of - * the row filter of its own partition. - */ - rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); - if (HeapTupleIsValid(rftuple)) - { - rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - - if (!rfisnull) - { - Node *rfnode; - ExprState *exprstate; - - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - - /* Prepare for expression execution */ - exprstate = pgoutput_row_filter_init_expr(rfnode); - entry->exprstate = lappend(entry->exprstate, exprstate); - MemoryContextSwitchTo(oldctx); - } - - ReleaseSysCache(rftuple); - } } list_free(pubids); @@ -1582,6 +1645,21 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) free_conversion_map(entry->map); } entry->map = NULL; + + /* + * Row filter cache cleanups. (Will be rebuilt later if needed). 
+ */ + entry->rowfilter_valid = false; + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + if (entry->exprstates != NIL) + { + list_free_deep(entry->exprstates); + entry->exprstates = NIL; + } } } @@ -1622,12 +1700,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - - if (entry->exprstate != NIL) - { - list_free_deep(entry->exprstate); - entry->exprstate = NIL; - } } MemoryContextSwitchTo(oldctx); -- 1.8.3.1