From 62ab3cfa9036812384776795fa4da759e0d6c383 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Thu, 26 Aug 2021 19:12:16 +1000 Subject: [PATCH v25] ExprState cache modifications. Now the cached Node/ExprState lists are invalidated only in rel_sync_cache_relation_cb function, so the ALTER PUBLICATION for one table should not cause row-filters of other tables to also become invalidated. Now the ExprState list evaluation is deferred until first needed in pgouput_row_filter function. Changes are based on a suggestion from Amit [1]. [1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 89 ++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ef1ba91..bb6f608 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -123,6 +123,7 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; + List *rfnode_list; /* Row filters */ List *exprstate; /* ExprState for row filter */ TupleTableSlot *scantuple; /* tuple table slot for row filter */ @@ -737,14 +738,15 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R ExprContext *ecxt; ListCell *lc; bool result = true; + Oid relid = RelationGetRelid(relation); /* Bail out if there is no row filter */ - if (entry->exprstate == NIL) + if (entry->rfnode_list == NIL) return true; elog(DEBUG3, "table \"%s.%s\" has row filter", - get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), - get_rel_name(relation->rd_id)); + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)); PushActiveSnapshot(GetTransactionSnapshot()); @@ -757,6 +759,28 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); /* + * If the ExprState cache list is NIL then it means it is not yet built for this + * relation, so build it on-the-fly now. + */ + if (entry->exprstate == NIL) + { + MemoryContext oldctx; + + foreach(lc, entry->rfnode_list) + { + Node *rfnode = (Node *) lfirst(lc); + ExprState *exprstate; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + exprstate = pgoutput_row_filter_init_expr(rfnode); + entry->exprstate = lappend(entry->exprstate, exprstate); + MemoryContextSwitchTo(oldctx); + } + + Assert(entry->exprstate != NIL); + } + + /* * If the subscription has multiple publications and the same table has a * different row filter in these publications, all row filters must be * matched in order to replicate this change. @@ -1321,6 +1345,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; + entry->rfnode_list = NIL; entry->exprstate = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if @@ -1435,31 +1460,32 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* - * Cache row filter, if available. All publication-table mappings - * must be checked. If it is a partition and pubviaroot is true, - * use the row filter of the topmost partitioned table instead of - * the row filter of its own partition. + * Cache row filter, if not already done and if available. + * + * All publication-table mappings must be checked. If it is a + * partition and pubviaroot is true, use the row filter of the + * topmost partitioned table instead of the row filter of its own + * partition. */ - rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); - if (HeapTupleIsValid(rftuple)) + if (entry->rfnode_list == NIL) { - rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - - if (!rfisnull) + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) { - Node *rfnode; - ExprState *exprstate; + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - rfnode = stringToNode(TextDatumGetCString(rfdatum)); + if (!rfisnull) + { + Node *rfnode; - /* Prepare for expression execution */ - exprstate = pgoutput_row_filter_init_expr(rfnode); - entry->exprstate = lappend(entry->exprstate, exprstate); - MemoryContextSwitchTo(oldctx); - } + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + entry->rfnode_list = lappend(entry->rfnode_list, rfnode); + MemoryContextSwitchTo(oldctx); + } - ReleaseSysCache(rftuple); + ReleaseSysCache(rftuple); + } } } @@ -1567,6 +1593,21 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) free_conversion_map(entry->map); } entry->map = NULL; + + /* + * Free the row filter caches. They will be rebuilt later if needed. + */ + if (entry->rfnode_list != NIL) + { + list_free_deep(entry->rfnode_list); + entry->rfnode_list = NIL; + entry->replicate_valid = false; /* Clear flag so get_rel_syn_entry with rebuild cache. */ + } + if (entry->exprstate != NIL) + { + list_free_deep(entry->exprstate); + entry->exprstate = NIL; + } } } @@ -1607,10 +1648,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - - if (entry->exprstate != NIL) - list_free_deep(entry->exprstate); - entry->exprstate = NIL; } MemoryContextSwitchTo(oldctx); -- 1.8.3.1