From cf3d5097d2dce1c0dd5d0499f84e6d840de97176 Mon Sep 17 00:00:00 2001 From: Khanna Date: Tue, 16 Jul 2024 15:24:52 +0530 Subject: [PATCH v20 4/4] Improve include generated column option handling by using bms Improve include generated column option handling by using bms. --- src/backend/replication/logical/proto.c | 72 +++------------- src/backend/replication/pgoutput/pgoutput.c | 94 ++++++++++++++------- src/include/replication/logicalproto.h | 12 +-- src/test/subscription/t/031_column_list.pl | 2 +- 4 files changed, 80 insertions(+), 100 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index cad1b76e7a..6b085e555c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,12 +30,10 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -414,8 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary, Bitmapset *columns, - bool include_generated_columns) + TupleTableSlot *newslot, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -427,8 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -461,8 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, - bool include_generated_columns) + bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -483,13 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -539,7 +532,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -559,8 +552,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } /* @@ -676,7 +668,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { char *relname; @@ -698,7 +690,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns, include_generated_columns); + logicalrep_write_attrs(out, rel, columns); } /* @@ -775,8 +767,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns, - bool include_generated_columns) + bool binary, Bitmapset *columns) { TupleDesc desc; Datum *values; @@ -793,15 +784,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attisdropped) continue; - if (att->attgenerated) - { - if (!include_generated_columns) - continue; - - if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; - } - if (!column_in_column_list(att->attnum, columns)) continue; @@ -823,15 +805,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attisdropped) continue; - if (att->attgenerated) - { - if (!include_generated_columns) - continue; - - if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; - } - if (!column_in_column_list(att->attnum, columns)) continue; @@ -950,8 +923,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, - bool include_generated_columns) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -969,15 +941,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, if (att->attisdropped) continue; - if (att->attgenerated) - { - if (!include_generated_columns) - continue; - - if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; - } - if (!column_in_column_list(att->attnum, columns)) continue; @@ -999,15 +962,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, if (att->attisdropped) continue; - if (att->attgenerated) - { - if (!include_generated_columns) - continue; - - if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; - } - if (!column_in_column_list(att->attnum, columns)) continue; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a256ab7262..5ab1235c75 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -86,8 +86,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -165,8 +164,10 @@ typedef struct RelationSyncEntry AttrMap *attrmap; /* - * Columns included in the publication, or NULL if all columns are - * included implicitly. Note that the attnums in this bitmap are not + * Columns should be publicated, or NULL if all columns are included + * implicitly. This bitmap only considers the column list of the + * publication and include_generated_columns option: other reasons should + * be checked at user side. Note that the attnums in this bitmap are not * publication and include_generated_columns option: other reasons should * be checked at user side. Note that the attnums in this bitmap are not * shifted by FirstLowInvalidHeapAttributeNumber. @@ -746,13 +747,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns, - data->include_generated_columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns, - data->include_generated_columns); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -766,7 +765,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -786,15 +785,6 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->attisdropped) continue; - if (att->attgenerated) - { - if (!include_generated_columns) - continue; - - if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; - } - if (att->atttypid < FirstGenbkiObjectId) continue; @@ -808,7 +798,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns, include_generated_columns); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -1034,6 +1024,36 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } } +/* + * Prepare new column list bitmap. This includes all the columns of the table. + */ +static Bitmapset * +prepare_all_columns_bms(PGOutputData *data, RelationSyncEntry *entry, + TupleDesc desc) +{ + Bitmapset *cols = NULL; + MemoryContext oldcxt = NULL; + + pgoutput_ensure_entry_cxt(data, entry); + oldcxt = MemoryContextSwitchTo(entry->entry_cxt); + + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* Skip if the attribute is dropped */ + if (att->attisdropped) + continue; + + /* Iterate the cols until generated columns are found. */ + cols = bms_add_member(cols, i + 1); + } + + MemoryContextSwitchTo(oldcxt); + + return cols; +} + /* * Initialize the column list. */ @@ -1072,7 +1092,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, * if there are no column lists (even if other publications have a * list). */ - if (!pub->alltables) + if (!pub->alltables || !data->include_generated_columns) { bool pub_no_list = true; @@ -1093,9 +1113,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, Anum_pg_publication_rel_prattrs, &pub_no_list); + } /* Build the column list bitmap in the per-entry context. */ - if (!pub_no_list) /* when not null */ + if (!pub_no_list || !data->include_generated_columns) /* when not null */ { int i; int nliveatts = 0; @@ -1103,19 +1124,31 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, pgoutput_ensure_entry_cxt(data, entry); - cols = pub_collist_to_bitmapset(cols, cfdatum, + if (!pub_no_list) + cols = pub_collist_to_bitmapset(cols, cfdatum, entry->entry_cxt); + else + cols = prepare_all_columns_bms(data, entry, desc); /* Get the number of live attributes. */ for (i = 0; i < desc->natts; i++) { Form_pg_attribute att = TupleDescAttr(desc, i); + /* Skip if the attribute is dropped */ if (att->attisdropped) continue; - - if (att->attgenerated && att->attgenerated != ATTRIBUTE_GENERATED_STORED) - continue; + /* + * If column list contain generated column it will not replicate + * the table to the subscriber port. + */ + if (att->attgenerated && + att->attgenerated != ATTRIBUTE_GENERATED_STORED && + !data->include_generated_columns) + { + cols = bms_del_member(cols, i + 1); + continue; + } nliveatts++; } @@ -1131,8 +1164,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, } } + if (HeapTupleIsValid(cftuple)) ReleaseSysCache(cftuple); - } } if (first) @@ -1560,18 +1593,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns, - data->include_generated_columns); + data->binary, relentry->columns); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns, - data->include_generated_columns); + new_slot, data->binary, relentry->columns); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns, - data->include_generated_columns); + data->binary, relentry->columns); break; default: Assert(false); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 34ec40b07e..b9a64d9c95 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -225,22 +225,19 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, @@ -251,8 +248,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel, Bitmapset *columns, - bool include_generated_columns); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 3bb2301b43..60ad5751bc 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -1247,7 +1247,7 @@ $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_mix_4 ORDER BY a"), qq(1|2|| -3|4||), +3|4||4), 'replication with multiple publications with the same column list'); # TEST: With a table included in multiple publications with different column -- 2.34.1