From 2b26f3370409446356d3c7a019e291b795944261 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 13 Mar 2024 02:25:30 +0000 Subject: [PATCH v3] Make the decoded transaction as subxact while decoding COMMIT record Since ReorderBufferXidSetCatalogChanges() generates ReorderBufferTXNs as top-level ones, it was possible that two transactions were decoded as top-level by the same COMMIT record. This commit adds ReorderBufferXidSetCatalogChangesEx(), which can control whether a decoded transaction is created as a top-level one. The function is used when the COMMIT is decoded and the record has invalidation messages. Author: Haiyang Li, Hayato Kuroda Reported-by: Haiyang Li Reviewed-by: Haiyang Li, Alexander Lakhin, Fei Changhong Discussion: https://www.postgresql.org/message-id/18369-ad61699bf91c5bc0%40postgresql.org Backpatch-through: 12 --- .../expected/catalog_change_snapshot.out | 44 ++++++++++++++++++- .../specs/catalog_change_snapshot.spec | 18 ++++++++ .../replication/logical/reorderbuffer.c | 26 +++++++++-- src/backend/replication/logical/snapbuild.c | 2 +- src/include/replication/reorderbuffer.h | 1 + 5 files changed, 85 insertions(+), 6 deletions(-) diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index 1d75cf5af0..6c3b8fc8b0 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -1,4 +1,4 @@ -Parsed test spec with 2 sessions +Parsed test spec with 3 sessions starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); @@ -87,3 +87,45 @@ COMMIT stop (1 row) + +starting permutation: s0_init s0_begin s0_savepoint s0_create_part1 s0_savepoint_release s2_init s1_checkpoint s1_get_changes 
s0_commit s2_get_changes s2_stop +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s2_init: SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s2_init: <... completed> +?column? +-------- +init +(1 row) + +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s2_stop: SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); +?column? +-------- +stop +(1 row) + +?column? 
+-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index 2ad1edeaa8..a4642e26db 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -3,7 +3,9 @@ setup { DROP TABLE IF EXISTS tbl1; + DROP TABLE IF EXISTS tbl1_part; CREATE TABLE tbl1 (val1 integer, val2 integer); + CREATE TABLE tbl1_part (val1 integer) PARTITION BY RANGE (val1); CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); } @@ -19,9 +21,11 @@ setup { SET synchronous_commit=on; } step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } step "s0_begin" { BEGIN; } step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } step "s0_insert2" { INSERT INTO user_cat VALUES (1); } +step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); } step "s0_commit" { COMMIT; } session "s1" @@ -29,6 +33,12 @@ setup { SET synchronous_commit=on; } step "s1_checkpoint" { CHECKPOINT; } step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +session "s2" +setup { SET synchronous_commit=on; } +step "s2_init" { SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); } +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s2_stop" { SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); } + # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted # during the first checkpoint 
execution. This transaction must be marked as @@ -53,3 +63,11 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # transaction to do timetravel since one of its subtransactions has been marked as # containing catalog changes. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# The last decoding restarts from the slot creation by session 2 and doesn't +# decode any WAL records generated by the subtransaction that performed +# s0_create_part1. While processing the commit record for the corresponding +# top-level transaction, which will be marked as containing catalog changes, +# we ensure that the corresponding subtransaction is added to the +# ReorderBuffer as a subxact. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s2_init" "s1_checkpoint" "s1_get_changes" "s0_commit" "s2_get_changes" "s2_stop" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fb323a80ec..d6e3666370 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -878,8 +878,14 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, * We already saw this transaction, but initially added it to the * list of top-level txns. Now that we know it's not top-level, * remove it from there. + * + * Note: It is possible that the transaction was created as a + * non-top-level txn, but the flag was not set. In that case, the + * transaction was never added to the list. This happens if + * sub-txns are first recognized and decoded by a COMMIT record. 
*/ - dlist_delete(&subtxn->node); + if (subtxn->node.next != NULL) + dlist_delete(&subtxn->node); } } @@ -1338,14 +1344,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Remove TXN from its containing list. + * Remove TXN from its containing list, if it was added to one. * * Note: if txn is known as subxact, we are deleting the TXN from its * parent's list of known subxacts; this leaves the parent's nsubxacts * count too high, but we don't care. Otherwise, we are deleting the TXN * from the LSN-ordered list of toplevel TXNs. */ - dlist_delete(&txn->node); + if (txn->node.next != NULL) + dlist_delete(&txn->node); /* now remove reference from buffer */ hash_search(rb->by_txn, @@ -2291,10 +2298,21 @@ ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferXidSetCatalogChangesEx(rb, xid, lsn, true); +} + +/* + * Mark a transaction as containing catalog changes. In addition, is_top + * controls whether the ReorderBufferTXN is created as a top-level transaction. 
+ */ +void +ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, bool is_top) { ReorderBufferTXN *txn; - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, is_top); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 31caad8855..417dccf1be 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -2165,6 +2165,6 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt sizeof(TransactionId), xidComparator) != NULL) { for (int i = 0; i < subxcnt; i++) - ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + ReorderBufferXidSetCatalogChangesEx(builder->reorder, subxacts[i], lsn, false); } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5347597e92..15d7446ebc 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -455,6 +455,7 @@ void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); +void ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn, bool is_top); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); -- 2.43.0