From 1c52e41cc8821b2034824e72516bccd669a75ef2 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 8 Dec 2021 05:32:44 -0500 Subject: [PATCH v45 3/5] Support updates based on old and new tuple in row filters. When applying row filter on updates, check both old_tuple and new_tuple to decide how an update needs to be transformed. UPDATE old-row (match) new-row (no match) -> DELETE old-row (no match) new row (match) -> INSERT old-row (match) new row (match) -> UPDATE old-row (no match) new-row (no match) -> (drop change) Also tuples that have been deformed will be cached in slots to avoid multiple deforming of tuples. Author: Ajin Cherian --- src/backend/replication/logical/proto.c | 38 ++++-- src/backend/replication/pgoutput/pgoutput.c | 194 +++++++++++++++++++++++++--- src/include/replication/logicalproto.h | 7 +- src/include/replication/reorderbuffer.h | 6 +- src/test/subscription/t/027_row_filter.pl | 4 +- src/tools/pgindent/typedefs.list | 1 + 6 files changed, 212 insertions(+), 38 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b..110ccff 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -19,6 +19,7 @@ #include "replication/logicalproto.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "executor/executor.h" /* * Protocol message flags. @@ -31,8 +32,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + HeapTuple tuple, TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -410,7 +411,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, NULL, binary); } /* @@ -442,7 +443,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +465,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, newslot, binary); } /* @@ -536,7 +538,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, NULL, binary); } /* @@ -749,13 +751,16 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, TupleTableSlot *slot, +bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; + Datum attr_values[MaxTupleAttributeNumber]; + bool attr_isnull[MaxTupleAttributeNumber]; desc = RelationGetDescr(rel); @@ -771,7 +776,17 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar enlargeStringInfo(out, tuple->t_len + nliveatts * (1 + 4)); - heap_deform_tuple(tuple, desc, values, isnull); + if (TupIsNull(slot)) + { + values = attr_values; + isnull = attr_isnull; + heap_deform_tuple(tuple, desc, values, isnull); + } + else + { + values = slot->tts_values; + isnull = slot->tts_isnull; + } /* Write the values */ for (i = 0; i < desc->natts; i++) @@ -832,6 +847,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar ReleaseSysCache(typtup); } + } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3b85915..0ccffa7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/tupconvert.h" +#include "access/xact.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" @@ -25,6 +26,7 @@ #include "parser/parse_coerce.h" #include "replication/logical.h" #include "replication/logicalproto.h" +#include "replication/logicalrelation.h" #include "replication/origin.h" #include "replication/pgoutput.h" #include "utils/builtins.h" @@ -132,7 +134,10 @@ typedef struct RelationSyncEntry */ bool rowfilter_valid; ExprState *exprstate; /* ExprState for row filter(s) */ - TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *scantuple; /* tuple table slot for row filter */ + TupleTableSlot *new_tuple; /* slot for storing deformed new tuple during updates */ + TupleTableSlot *old_tuple; /* slot for storing deformed old tuple during updates */ + TupleTableSlot *tmp_new_tuple; /* slot for temporary new tuple used for expression evaluation */ /* * OID of the relation to publish changes as. For a partition, this may @@ -167,10 +172,15 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, /* row filter routines */ static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry); static ExprState *pgoutput_row_filter_init_expr(Node *rfnode); static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext); -static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, - HeapTuple newtuple, RelationSyncEntry *entry); +static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, TupleTableSlot *slot, + RelationSyncEntry *entry); +static bool pgoutput_row_filter_update_check(Relation relation, HeapTuple oldtuple, + HeapTuple newtuple, RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -734,18 +744,112 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) } /* - * Change is checked against the row filter, if any. - * - * If it returns true, the change is replicated, otherwise, it is not. + * Update is checked against the row filter, if any. + * Updates are transformed to inserts and deletes based on the + * old_tuple and new_tuple. The new action is updated in the + * action parameter. If not updated, action remains as update. + * old-row (no match) new-row (no match) -> (drop change) + * old-row (no match) new row (match) -> INSERT + * old-row (match) new-row (no match) -> DELETE + * old-row (match) new row (match) -> UPDATE + * If the change is to be replicated returns true, else false. */ static bool -pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry) +pgoutput_row_filter_update_check(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry, ReorderBufferChangeType *action) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; + bool old_matched, new_matched; + TupleTableSlot *tmp_new_slot, *old_slot, *new_slot; + + /* Bail out if there is no row filter */ + if (!entry->exprstate) + return true; + + /* update requires a new tuple */ + Assert(newtuple); + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), + get_rel_name(relation->rd_id)); + + /* + * If no old_tuple, then none of the replica identity columns changed + * and this would reduce to a simple update. + */ + if (!oldtuple) + { + *action = REORDER_BUFFER_CHANGE_UPDATE; + return pgoutput_row_filter(relation, NULL, newtuple, NULL, entry); + } + + old_slot = entry->old_tuple; + new_slot = entry->new_tuple; + tmp_new_slot = entry->tmp_new_tuple; + ExecClearTuple(old_slot); + ExecClearTuple(new_slot); + ExecClearTuple(tmp_new_slot); + + heap_deform_tuple(newtuple, desc, new_slot->tts_values, new_slot->tts_isnull); + heap_deform_tuple(oldtuple, desc, old_slot->tts_values, old_slot->tts_isnull); + + ExecStoreVirtualTuple(old_slot); + ExecStoreVirtualTuple(new_slot); + + tmp_new_slot = ExecCopySlot(tmp_new_slot, new_slot); + + /* + * For updates, both the newtuple and oldtuple needs to be checked + * against the row-filter. The newtuple might not have all the + * replica identity columns, in which case it needs to be + * copied over from the oldtuple. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* if the column in the new_tuple is null, nothing to do */ + if (tmp_new_slot->tts_isnull[i]) + continue; + + /* + * Unchanged toasted replica identity columns are + * only detoasted in the old tuple, copy this over to the newtuple. + */ + if ((att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(tmp_new_slot->tts_values[i])) && + (!old_slot->tts_isnull[i] && + !(VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])))) + { + Assert(!old_slot->tts_isnull[i] && + !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])); + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + } + + } + + old_matched = pgoutput_row_filter(relation, NULL, NULL, old_slot, entry); + new_matched = pgoutput_row_filter(relation, NULL, NULL, tmp_new_slot, entry); + + if (!old_matched && !new_matched) + return false; + + if (!old_matched && new_matched) + *action = REORDER_BUFFER_CHANGE_INSERT; + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + else if (new_matched && old_matched) + *action = REORDER_BUFFER_CHANGE_UPDATE; + + return true; +} + +/* + * Initialize the row filter, the first time. + */ +static void +pgoutput_row_filter_init(PGOutputData *data, Relation relation, RelationSyncEntry *entry) { - EState *estate; - ExprContext *ecxt; ListCell *lc; - bool result = true; - Oid relid = RelationGetRelid(relation); List *rfnodes = NIL; int n_filters; @@ -857,16 +961,34 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H entry->exprstate = pgoutput_row_filter_init_expr(rfnode); /* - * Create a tuple table slot for row filter. TupleDesc must live as - * long as the cache remains. + * Create tuple table slots for row filter. Create a copy of the + * TupleDesc as it needs to live as long as the cache remains. */ tupdesc = CreateTupleDescCopy(tupdesc); entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple); + entry->old_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + entry->tmp_new_tuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); MemoryContextSwitchTo(oldctx); } entry->rowfilter_valid = true; } +} + +/* + * Change is checked against the row filter, if any. + * + * If it returns true, the change is replicated, otherwise, it is not. + */ +static bool +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, TupleTableSlot *slot, +RelationSyncEntry *entry) +{ + EState *estate; + ExprContext *ecxt; + bool result = true; + Oid relid = RelationGetRelid(relation); /* Bail out if there is no row filter */ if (!entry->exprstate) @@ -885,7 +1007,12 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H ecxt = GetPerTupleExprContext(estate); ecxt->ecxt_scantuple = entry->scantuple; - ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); + if (newtuple || oldtuple) + ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false); + else + { + ecxt->ecxt_scantuple = slot; + } /* * NOTE: Multiple publication row-filters have already been combined to a @@ -898,7 +1025,6 @@ pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, H } /* Cleanup allocated resources */ - ResetExprContext(ecxt); FreeExecutorState(estate); PopActiveSnapshot(); @@ -956,6 +1082,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); + /* Initialize the row_filter */ + pgoutput_row_filter_init(data, relation, relentry); + /* Send the data */ switch (change->action) { @@ -964,7 +1093,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple tuple = &change->data.tp.newtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry)) + if (!pgoutput_row_filter(relation, NULL, tuple, NULL, relentry)) break; /* @@ -995,9 +1124,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; HeapTuple newtuple = &change->data.tp.newtuple->tuple; + ReorderBufferChangeType modified_action = REORDER_BUFFER_CHANGE_UPDATE; - /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry)) + if (!pgoutput_row_filter_update_check(relation, oldtuple, newtuple, relentry, + &modified_action)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1020,8 +1150,27 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + + switch (modified_action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, relation, newtuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, relation, + oldtuple, newtuple, relentry->old_tuple, + relentry->new_tuple, + data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + break; + default: + Assert(false); + } + OutputPluginWrite(ctx, true); break; } @@ -1031,7 +1180,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; /* Check row filter. */ - if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry)) + if (!pgoutput_row_filter(relation, oldtuple, NULL, NULL, relentry)) break; maybe_send_schema(ctx, change, relation, relentry); @@ -1449,6 +1598,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->scantuple = NULL; + entry->new_tuple = NULL; + entry->old_tuple = NULL; + entry->tmp_new_tuple = NULL; entry->exprstate = NULL; entry->publish_as_relid = InvalidOid; entry->map = NULL; /* will be set by maybe_send_schema() if diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dc..427c40a 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -16,6 +16,7 @@ #include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" +#include "executor/executor.h" /* * Protocol capabilities @@ -211,7 +212,11 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid, extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary); +extern void logicalrep_write_update_cached(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldtuple, TupleTableSlot *newtuple, + bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5b40ff7..aec0059 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. */ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -65,7 +65,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -82,7 +82,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/test/subscription/t/027_row_filter.pl b/src/test/subscription/t/027_row_filter.pl index de6b73d..a2f25f6 100644 --- a/src/test/subscription/t/027_row_filter.pl +++ b/src/test/subscription/t/027_row_filter.pl @@ -277,7 +277,8 @@ is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4'); # # - 1001, 1002, 1980 already exist from initial data copy # - INSERT (800, 'test 800') NO, because 800 is not > 1000 -# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered' +# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered', +# but row deleted after the update below. # - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered' # - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered' # - UPDATE (1600, NULL) NO, row filter evaluates to false because NULL is not <> 'filtered' @@ -289,7 +290,6 @@ $result = "SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2"); is($result, qq(1001|test 1001 1002|test 1002 -1600|test 1600 1601|test 1601 updated 1980|not filtered), 'check replicated rows to table tab_rowfilter_1'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 575969c..e8dc5ad 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2198,6 +2198,7 @@ ReorderBufferApplyChangeCB ReorderBufferApplyTruncateCB ReorderBufferBeginCB ReorderBufferChange +ReorderBufferChangeType ReorderBufferCommitCB ReorderBufferCommitPreparedCB ReorderBufferDiskChange -- 1.8.3.1