From 4a488d4bb9a7d0b97e14edde7854fdf0246de249 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Fri, 15 Aug 2025 12:47:52 +0800 Subject: [PATCH vApproach2] delete the unused hash entry on invalidation --- src/backend/replication/pgoutput/pgoutput.c | 209 +++++++++++++------- 1 file changed, 139 insertions(+), 70 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 80540c017bd..feb07529bf1 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -131,6 +131,8 @@ typedef struct RelationSyncEntry bool schema_sent; + int in_use; + /* * This will be PUBLISH_GENCOLS_STORED if the relation contains generated * columns and the 'publish_generated_columns' parameter is set to @@ -223,6 +225,9 @@ static void init_rel_sync_cache(MemoryContext cachectx); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation); +static void close_rel_sync_entry(RelationSyncEntry *entry); +static void cleanup_rel_sync_entry(RelationSyncEntry *entry); +static void delete_rel_sync_entry(RelationSyncEntry *entry); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry); @@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + close_rel_sync_entry(relentry); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + close_rel_sync_entry(relentry); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + close_rel_sync_entry(relentry); return; + } /* * This is only possible if deletes are allowed even when replica @@ -1621,6 +1635,8 @@ cleanup: ExecDropSingleTupleTableSlot(new_slot); } + close_rel_sync_entry(relentry); + 
MemoryContextSwitchTo(old); MemoryContextReset(data->context); } @@ -1658,7 +1674,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) + { + close_rel_sync_entry(relentry); continue; + } /* * Don't send partitions if the publication wants to send only the @@ -1666,7 +1685,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ if (relation->rd_rel->relispartition && relentry->publish_as_relid != relid) + { + close_rel_sync_entry(relentry); continue; + } relids[nrelids++] = relid; @@ -1675,6 +1697,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, pgoutput_send_begin(ctx, txn); maybe_send_schema(ctx, change, relation, relentry); + + close_rel_sync_entry(relentry); } if (nrelids > 0) @@ -2048,6 +2072,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { entry->replicate_valid = false; entry->schema_sent = false; + entry->in_use = false; entry->include_gencols_type = PUBLISH_GENCOLS_NONE; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = @@ -2061,6 +2086,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->attrmap = NULL; } + Assert(!entry->in_use); + + entry->in_use = true; + /* Validate the entry */ if (!entry->replicate_valid) { @@ -2091,71 +2120,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) publications_valid = true; } - /* - * Reset schema_sent status as the relation definition may have - * changed. Also reset pubactions to empty in case rel was dropped - * from a publication. Also free any objects that depended on the - * earlier definition. 
- */ - entry->schema_sent = false; - entry->include_gencols_type = PUBLISH_GENCOLS_NONE; - list_free(entry->streamed_txns); - entry->streamed_txns = NIL; - bms_free(entry->columns); - entry->columns = NULL; - entry->pubactions.pubinsert = false; - entry->pubactions.pubupdate = false; - entry->pubactions.pubdelete = false; - entry->pubactions.pubtruncate = false; - - /* - * Tuple slots cleanups. (Will be rebuilt later if needed). - */ - if (entry->old_slot) - { - TupleDesc desc = entry->old_slot->tts_tupleDescriptor; - - Assert(desc->tdrefcount == -1); - - ExecDropSingleTupleTableSlot(entry->old_slot); - - /* - * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so - * do it now to avoid any leaks. - */ - FreeTupleDesc(desc); - } - if (entry->new_slot) - { - TupleDesc desc = entry->new_slot->tts_tupleDescriptor; - - Assert(desc->tdrefcount == -1); - - ExecDropSingleTupleTableSlot(entry->new_slot); - - /* - * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so - * do it now to avoid any leaks. - */ - FreeTupleDesc(desc); - } - - entry->old_slot = NULL; - entry->new_slot = NULL; - - if (entry->attrmap) - free_attrmap(entry->attrmap); - entry->attrmap = NULL; - - /* - * Row filter cache cleanups. - */ - if (entry->entry_cxt) - MemoryContextDelete(entry->entry_cxt); - - entry->entry_cxt = NULL; - entry->estate = NULL; - memset(entry->exprstate, 0, sizeof(entry->exprstate)); + /* Cleanup existing data */ + cleanup_rel_sync_entry(entry); /* * Build publication cache. We can't use one provided by relcache as @@ -2311,6 +2277,101 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) return entry; } +/* + * Mark the given entry as unused. + */ +static void +close_rel_sync_entry(RelationSyncEntry *relentry) +{ + Assert(relentry->in_use); + relentry->in_use = false; +} + +/* + * Cleanup attributes in the given entry. 
+ */ +static void +cleanup_rel_sync_entry(RelationSyncEntry *entry) +{ + /* + * Reset schema_sent status as the relation definition may have changed. + * Also reset pubactions to empty in case rel was dropped from a + * publication. Also free any objects that depended on the earlier + * definition. + */ + entry->schema_sent = false; + entry->include_gencols_type = PUBLISH_GENCOLS_NONE; + list_free(entry->streamed_txns); + entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; + entry->pubactions.pubinsert = false; + entry->pubactions.pubupdate = false; + entry->pubactions.pubdelete = false; + entry->pubactions.pubtruncate = false; + + /* + * Tuple slots cleanups. (Will be rebuilt later if needed). + */ + if (entry->old_slot) + { + TupleDesc desc = entry->old_slot->tts_tupleDescriptor; + + Assert(desc->tdrefcount == -1); + + ExecDropSingleTupleTableSlot(entry->old_slot); + + /* + * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do + * it now to avoid any leaks. + */ + FreeTupleDesc(desc); + } + if (entry->new_slot) + { + TupleDesc desc = entry->new_slot->tts_tupleDescriptor; + + Assert(desc->tdrefcount == -1); + + ExecDropSingleTupleTableSlot(entry->new_slot); + + /* + * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do + * it now to avoid any leaks. + */ + FreeTupleDesc(desc); + } + + entry->old_slot = NULL; + entry->new_slot = NULL; + + if (entry->attrmap) + free_attrmap(entry->attrmap); + entry->attrmap = NULL; + + /* + * Row filter cache cleanups. 
+ */ + if (entry->entry_cxt) + MemoryContextDelete(entry->entry_cxt); + + entry->entry_cxt = NULL; + entry->estate = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); +} + +static void +delete_rel_sync_entry(RelationSyncEntry *entry) +{ + Assert(!entry->in_use); + + cleanup_rel_sync_entry(entry); + + /* Remove the entry from the cache */ + if (hash_search(RelationSyncCache, &entry->relid, HASH_REMOVE, NULL) == NULL) + elog(ERROR, "hash table corrupted"); +} + /* * Cleanup list of streamed transactions and update the schema_sent flag. * @@ -2374,9 +2435,9 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) * Nobody keeps pointers to entries in this hash table around outside * logical decoding callback calls - but invalidation events can come in * *during* a callback if we do any syscache access in the callback. - * Because of that we must mark the cache entry as invalid but not damage - * any of its substructure here. The next get_rel_sync_entry() call will - * rebuild it all. + * Because of that, if the hash entry is being used, we must mark the cache + * entry as invalid but not damage any of its substructure here. The next + * get_rel_sync_entry() call will rebuild it all. */ if (OidIsValid(relid)) { @@ -2387,7 +2448,12 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid, HASH_FIND, NULL); if (entry != NULL) - entry->replicate_valid = false; + { + if (entry->in_use) + entry->replicate_valid = false; + else + delete_rel_sync_entry(entry); + } } else { @@ -2397,7 +2463,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) hash_seq_init(&status, RelationSyncCache); while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) { - entry->replicate_valid = false; + if (entry->in_use) + entry->replicate_valid = false; + else + delete_rel_sync_entry(entry); } } } -- 2.50.1.windows.1