From 8749a13ef54bbb38f4900413625331f1e9869c8c Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 25 Jun 2024 17:20:47 +0900 Subject: [PATCH] Fix possibility of logical decoding partial of transaction changes. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch-through: --- contrib/test_decoding/Makefile | 3 +- .../expected/skip_snapshot_restore.out | 81 +++++++++++++++++++ .../specs/skip_snapshot_restore.spec | 58 +++++++++++++ src/backend/replication/logical/logical.c | 7 +- src/backend/replication/logical/snapbuild.c | 18 +++-- src/include/replication/logical.h | 6 ++ 6 files changed, 165 insertions(+), 8 deletions(-) create mode 100644 contrib/test_decoding/expected/skip_snapshot_restore.out create mode 100644 contrib/test_decoding/specs/skip_snapshot_restore.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce603706..a4ba1a509a 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 0000000000..cb0040660b --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,81 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit 
s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert_cat s0_commit s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp0; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert_cat: INSERT INTO tbl VALUES (1); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? 
+-------- +init +(1 row) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 0000000000..982fea5a6c --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,58 @@ +# Test that a slot creation skips to restore serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + DROP TABLE IF EXISTS user_cat; + CREATE TABLE tbl (val1 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_insert_cat" { INSERT INTO tbl VALUES (1); } +step "s0_savepoint" { SAVEPOINT sp0; } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, 
NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the +# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot0" +# serializes consistent snapshots to the disk at LSNs that are before +# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but +# must not restore any serialized snapshots and will reach the consistent state +# when decoding a RUNNING_XACTS record generated after s0-transaction's commit. +# We check that the get_changes on 'slot1' does not return any s0-transaction's +# changes as its confirmed_flush_lsn will be after the s0-transaction's commit +# record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top and the sub transaction, as the first entry. This breaks +# an assumption in AssertTXNLsnOrder() that the first_lsn of entries must be +# strictly higher than the previous one. 
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert_cat" "s0_commit" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..f4ff00bead 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -151,6 +151,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, + bool in_create, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -296,6 +297,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->fast_forward = fast_forward; + ctx->in_create = in_create; + MemoryContextSwitchTo(old_context); return ctx; @@ -437,7 +440,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, - need_full_snapshot, false, + need_full_snapshot, false, true, xl_routine, prepare_write, do_write, update_progress); @@ -573,7 +576,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, + fast_forward, false, xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7a7aba33e1..1f088dd2f2 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1314,6 +1314,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { + LogicalDecodingContext *ctx = (LogicalDecodingContext *) builder->reorder->private_data; + /* 
--- * Build catalog decoding snapshot incrementally using information about * the currently running transactions. There are several ways to do that: @@ -1323,10 +1325,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * state while waiting on c)'s sub-states. * * b) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. Can't use this method for the - * initial snapshot when slot is being created and needs full snapshot - * for export or direct use, as that snapshot will only contain catalog - * modifying transactions. + * snapshot to disk that we can use. Can't use this method while creating + * a slot since in this case the restart LSN is an arbitrary LSN and we + * need to find the start point to extract changes where we won't see + * the data for partial transactions. Also, we cannot use this method + * when the slot needs a full snapshot for export or direct use, as that + * snapshot will only contain catalog modifying transactions. * * c) First incrementally build a snapshot for catalog tuples * (BUILDING_SNAPSHOT), that requires all, already in-progress, @@ -1391,8 +1395,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* b) valid on disk state and not building full snapshot */ + /* + * b) valid on disk state and neither building full snapshot nor while + * creating a slot. + */ else if (!builder->building_full_snapshot && + !ctx->in_create && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea0..6dc2f4f004 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -100,6 +100,12 @@ typedef struct LogicalDecodingContext */ bool twophase_opt_given; + /* + * True if the logical decoding context is being used for the initial + * creation of a logical replication slot. 
+ */ + bool in_create; + /* * State for writing output. */ -- 2.39.3