From 7c8e8a9c89b9fb7c10dc3a3ec560e3c21c0c1a8f Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 22 Nov 2021 20:26:50 -0500 Subject: [PATCH v41 3/6] PS - ExprState cache modifications. Now the cached row-filters (e.g. ExprState *) are invalidated only in rel_sync_cache_relation_cb function, so it means the ALTER PUBLICATION for one table should not cause row-filters of other tables to also become invalidated. Also all code related to caching row-filters has been removed from the get_rel_sync_entry function and is now done just before they are needed in the pgoutput_row_filter function. If there are multiple publication filters for a given table these are are all combined into a single filter. Author: Peter Smith, Greg Nancarrow Changes are based on a suggestions from Amit [1] [2], and Houz [3] [1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com [2] https://www.postgresql.org/message-id/CAA4eK1%2Btio46goUKBUfAKFsFVxtgk8nOty%3DTxKoKH-gdLzHD2g%40mail.gmail.com [3] https://www.postgresql.org/message-id/OS0PR01MB5716090A70A73ADF58C58950948D9%40OS0PR01MB5716.jpnprd01.prod.outlook.com --- src/backend/replication/pgoutput/pgoutput.c | 214 ++++++++++++++++++---------- 1 file changed, 139 insertions(+), 75 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3643684..fd024d4 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/*------------------------------------------------------------------------ * * pgoutput.c * Logical Replication output plugin @@ -21,6 +21,7 @@ #include "executor/executor.h" #include "fmgr.h" #include "nodes/nodeFuncs.h" +#include "nodes/makefuncs.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "replication/logical.h" @@ -123,7 +124,16 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; - List *exprstate; /* ExprState for row filter */ + + /* + * Row-filter related members: + * The flag 'rowfilter_valid' indicates if the exprstate has been assigned + * yet or not. We cannot just use the exprstate value for this purpose + * because there might be no filter at all for the current relid (e.g. + * exprstate is NULL). + */ + bool rowfilter_valid; + ExprState *exprstate; /* ExprState for row filter(s) */ TupleTableSlot *scantuple; /* tuple table slot for row filter */ /* @@ -161,7 +171,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, static EState *create_estate_for_relation(Relation rel); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, +static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry); /* @@ -731,20 +741,118 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * If it returns true, the change is replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) { EState *estate; ExprContext *ecxt; ListCell *lc; bool result = true; + Oid relid = RelationGetRelid(relation); + List *rfnodes = NIL; + int n_filters; + + /* + * If the row filter caching is currently flagged "invalid" then it means we + * don't know yet if there is/isn't any row filters for this relation. + * + * This code is usually one-time execution. + * + * NOTE: The ExprState cache could have been created up-front in the + * function get_rel_sync_entry() instead of the deferred on-the-fly + * assignment below. The reason for choosing to do it here is because there + * are some scenarios where the get_rel_sync_entry() is called but where a + * row will not be published. For example, for truncate, we may not need + * any row evaluation, so there is no need to compute it. It would also be + * a waste if any error happens before actually evaluating the filter. And + * tomorrow there could be other operations (which use get_rel_sync_entry) + * but which don't need to build ExprState. So the decision was to defer + * this logic to last moment when we know it will be needed. + */ + if (!entry->rowfilter_valid) + { + MemoryContext oldctx; + TupleDesc tupdesc = RelationGetDescr(relation); + + /* + * Create a tuple table slot for row filter. TupleDesc must live as + * long as the cache remains. Release the tuple table slot if it + * already exists. + */ + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + tupdesc = CreateTupleDescCopy(tupdesc); + entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + MemoryContextSwitchTo(oldctx); + + /* + * Find if there are any row filters for this relation. If there are, + * then prepare the necessary ExprState and cache it in entry->exprstate. + * + * NOTE: All publication-table mappings must be checked. + * + * NOTE: If the relation is a partition and pubviaroot is true, use + * the row filter of the topmost partitioned table instead of the row + * filter of its own partition. + */ + foreach(lc, data->publications) + { + Publication *pub = lfirst(lc); + HeapTuple rftuple; + Datum rfdatum; + bool rfisnull; + + /* + * Lookup if there is a row-filter, and if yes remember it in a list. + * In code following this 'publications' loop we will combine all filters. + */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(entry->publish_as_relid), ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(rftuple)) + { + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); + + if (!rfisnull) + { + Node *rfnode; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + rfnodes = lappend(rfnodes, rfnode); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(rftuple); + } + + } /* loop all subscribed publications */ + + /* + * Combine all the row-filters (if any) into a single filter, and then build the ExprState for it + */ + n_filters = list_length(rfnodes); + if (n_filters > 0) + { + Node *rfnode; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rfnode = n_filters > 1 ? makeBoolExpr(AND_EXPR, rfnodes, -1) : linitial(rfnodes); + entry->exprstate = pgoutput_row_filter_init_expr(rfnode); + MemoryContextSwitchTo(oldctx); + } + + entry->rowfilter_valid = true; + } /* Bail out if there is no row filter */ - if (entry->exprstate == NIL) + if (!entry->exprstate) return true; elog(DEBUG3, "table \"%s.%s\" has row filter", - get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), - get_rel_name(relation->rd_id)); + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)); PushActiveSnapshot(GetTransactionSnapshot()); @@ -757,20 +865,13 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); /* - * If the subscription has multiple publications and the same table has a - * different row filter in these publications, all row filters must be - * matched in order to replicate this change. + * NOTE: Multiple publication row-filters have already been combined to a + * single exprstate. */ - foreach(lc, entry->exprstate) + if (entry->exprstate) { - ExprState *exprstate = (ExprState *) lfirst(lc); - /* Evaluates row filter */ - result = pgoutput_row_filter_exec_expr(exprstate, ecxt); - - /* If the tuple does not match one of the row filters, bail out */ - if (!result) - break; + result = pgoutput_row_filter_exec_expr(entry->exprstate, ecxt); } /* Cleanup allocated resources */ @@ -840,7 +941,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) break; /* @@ -873,7 +974,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple newtuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -907,7 +1008,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1321,10 +1422,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->replicate_valid = false; + entry->rowfilter_valid = false; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; - entry->exprstate = NIL; + entry->exprstate = NULL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1344,7 +1446,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; - TupleDesc tupdesc = RelationGetDescr(relation); /* Reload publications if needed before use. */ if (!publications_valid) @@ -1358,22 +1459,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* Release tuple table slot */ - if (entry->scantuple != NULL) - { - ExecDropSingleTupleTableSlot(entry->scantuple); - entry->scantuple = NULL; - } - - /* - * Create a tuple table slot for row filter. TupleDesc must live as - * long as the cache remains. - */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - tupdesc = CreateTupleDescCopy(tupdesc); - entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); - MemoryContextSwitchTo(oldctx); - /* * Build publication cache. We can't use one provided by relcache as * relcache considers all publications given relation is in, but here @@ -1383,9 +1468,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Publication *pub = lfirst(lc); bool publish = false; - HeapTuple rftuple; - Datum rfdatum; - bool rfisnull; if (pub->alltables) { @@ -1449,33 +1531,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - /* - * Cache row filter, if available. All publication-table mappings - * must be checked. If it is a partition and pubviaroot is true, - * use the row filter of the topmost partitioned table instead of - * the row filter of its own partition. - */ - rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid)); - if (HeapTupleIsValid(rftuple)) - { - rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull); - - if (!rfisnull) - { - Node *rfnode; - ExprState *exprstate; - - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - rfnode = stringToNode(TextDatumGetCString(rfdatum)); - - /* Prepare for expression execution */ - exprstate = pgoutput_row_filter_init_expr(rfnode); - entry->exprstate = lappend(entry->exprstate, exprstate); - MemoryContextSwitchTo(oldctx); - } - - ReleaseSysCache(rftuple); - } } list_free(pubids); @@ -1582,6 +1637,21 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) free_conversion_map(entry->map); } entry->map = NULL; + + /* + * Row filter cache cleanups. (Will be rebuilt later if needed). + */ + entry->rowfilter_valid = false; + if (entry->scantuple != NULL) + { + ExecDropSingleTupleTableSlot(entry->scantuple); + entry->scantuple = NULL; + } + if (entry->exprstate != NULL) + { + pfree(entry->exprstate); + entry->exprstate = NULL; + } } } @@ -1622,12 +1692,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - - if (entry->exprstate != NIL) - { - list_free_deep(entry->exprstate); - entry->exprstate = NIL; - } } MemoryContextSwitchTo(oldctx); -- 1.8.3.1