From 002ce81dbae3df85610f65a235b45d046af0830f Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Fri, 2 Jul 2021 16:54:45 +1000 Subject: [PATCH v17 4/5] PS POC - Implement a plan cache for pgoutput. This is a POC patch to implement plan cache which gets used inside the pgoutput_row_filter function instead of calling prepare for every row. This is intended to implement a cache like what Andes was suggesting [1] to see what difference it makes. Use #if 0/1 to toggle wihout/with caching. [1] https://www.postgresql.org/message-id/20210128022032.eq2qqc6zxkqn5syt%40alap3.anarazel.de --- src/backend/replication/pgoutput/pgoutput.c | 90 +++++++++++++++++++-- 1 file changed, 82 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 10f85365fc..86aa012505 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -35,6 +35,7 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/varlena.h" +#include "optimizer/optimizer.h" PG_MODULE_MAGIC; @@ -72,8 +73,6 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, static EState *create_estate_for_relation(Relation rel); static ExprState *pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, - HeapTuple newtuple, List *rowfilter); static bool publications_valid; static bool in_streaming; @@ -113,6 +112,7 @@ typedef struct RelationSyncEntry bool replicate_valid; PublicationActions pubactions; List *qual; + List *exprstate_list; /* * OID of the relation to publish changes as. For a partition, this may @@ -144,6 +144,8 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry); /* * Specify output plugin callbacks @@ -578,6 +580,35 @@ pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate) return exprstate; } +static ExprState * +pgoutput_row_filter_prepare_expr2(Node *rfnode) +{ + ExprState *exprstate; + Oid exprtype; + Expr *expr; + MemoryContext oldctx; + + /* Prepare expression for execution */ + exprtype = exprType(rfnode); + expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1); + + if (expr == NULL) + ereport(ERROR, + (errcode(ERRCODE_CANNOT_COERCE), + errmsg("row filter returns type %s that cannot be coerced to the expected type %s", + format_type_be(exprtype), + format_type_be(BOOLOID)), + errhint("You will need to rewrite the row filter."))); + + /* Make the exprstate long-lived by using CacheMemoryContext. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + expr = expression_planner(expr); + exprstate = ExecInitExpr(expr, NULL); + MemoryContextSwitchTo(oldctx); + + return exprstate; +} + /* * Evaluates row filter. * @@ -610,7 +641,7 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * If it returns true, the change is replicated, otherwise, it is not. */ static bool -pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, List *rowfilter) +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) { TupleDesc tupdesc; EState *estate; @@ -618,11 +649,20 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L MemoryContext oldcxt; ListCell *lc; bool result = true; +//#define RF_TIMES +#ifdef RF_TIMES + instr_time start_time; + instr_time end_time; +#endif /* Bail out if there is no row filter */ - if (rowfilter == NIL) + if (entry->qual == NIL) return true; +#ifdef RF_TIMES + INSTR_TIME_SET_CURRENT(start_time); +#endif + elog(DEBUG3, "table \"%s.%s\" has row filter", get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), get_rel_name(relation->rd_id)); @@ -646,7 +686,9 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L * different row filter in these publications, all row filters must be * matched in order to replicate this change. */ - foreach(lc, rowfilter) +#if 0 + /* Don't use cached plan. */ + foreach(lc, entry->qual) { Node *rfnode = (Node *) lfirst(lc); ExprState *exprstate; @@ -667,12 +709,34 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, L if (!result) break; } +#else + /* Use cached plan. */ + foreach(lc, entry->exprstate_list) + { + ExprState *exprstate = (ExprState *) lfirst(lc); + + /* Evaluates row filter */ + result = pgoutput_row_filter_exec_expr(exprstate, ecxt); + + elog(DEBUG3, "row filter %smatched", result ? "" : " not"); + + /* If the tuple does not match one of the row filters, bail out */ + if (!result) + break; + } +#endif /* Cleanup allocated resources */ ResetExprContext(ecxt); FreeExecutorState(estate); PopActiveSnapshot(); +#ifdef RF_TIMES + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + elog(LOG, "row filter time: %0.3f us", INSTR_TIME_GET_DOUBLE(end_time) * 1e6); +#endif + return result; } @@ -735,7 +799,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, NULL, tuple, relentry->qual)) + if (!pgoutput_row_filter(relation, NULL, tuple, relentry)) return; /* @@ -768,7 +832,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple newtuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry->qual)) + if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry)) return; maybe_send_schema(ctx, txn, change, relation, relentry); @@ -802,7 +866,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry->qual)) + if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry)) return; maybe_send_schema(ctx, txn, change, relation, relentry); @@ -1211,6 +1275,7 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->qual = NIL; + entry->exprstate_list = NIL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ @@ -1320,10 +1385,16 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) if (!rfisnull) { Node *rfnode; + ExprState *exprstate; oldctx = MemoryContextSwitchTo(CacheMemoryContext); rfnode = stringToNode(TextDatumGetCString(rfdatum)); entry->qual = lappend(entry->qual, rfnode); + + /* Cache the planned row filter */ + exprstate = pgoutput_row_filter_prepare_expr2(rfnode); + entry->exprstate_list = lappend(entry->exprstate_list, exprstate); + MemoryContextSwitchTo(oldctx); } @@ -1479,6 +1550,9 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) if (entry->qual != NIL) list_free_deep(entry->qual); entry->qual = NIL; + + /* FIXME - something to be freed here? */ + entry->exprstate_list = NIL; } MemoryContextSwitchTo(oldctx); -- 2.27.0