From b463dd180ab5820ac5b4144ff22622b2a6340b09 Mon Sep 17 00:00:00 2001 From: nkey Date: Mon, 6 May 2024 01:08:49 +0200 Subject: [PATCH v1] WIP: fix d9d076222f5b "VACUUM: ignore indexing operations with CONCURRENTLY" which was reverted by e28bb8851969. Introduce a new type of visibility horizon to be used for relations with concurrently built indexes (in the case of a "safe" index). --- src/backend/catalog/index.c | 3 + src/backend/storage/ipc/procarray.c | 72 ++++++++++- src/backend/utils/cache/relcache.c | 6 + src/bin/pg_amcheck/t/006_concurrently.pl | 155 +++++++++++++++++++++++ src/include/utils/rel.h | 1 + 5 files changed, 231 insertions(+), 6 deletions(-) create mode 100644 src/bin/pg_amcheck/t/006_concurrently.pl diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 5a8568c55c..6ad9254d49 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -3320,6 +3320,9 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot) /* Open and lock the parent heap relation */ heapRelation = table_open(heapId, ShareUpdateExclusiveLock); + /* Load information about the building indexes */ + RelationGetIndexList(heapRelation); + Assert(heapRelation->rd_indexisbuilding); /* * Switch to the table owner's userid, so that any index functions are run diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 1a83c4220b..a2fe173bb6 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -236,6 +236,12 @@ typedef struct ComputeXidHorizonsResult */ TransactionId data_oldest_nonremovable; + /* + * Oldest xid for which deleted tuples need to be retained in normal user + * defined tables with index building in progress. + */ + TransactionId create_index_concurrently_oldest_nonremovable; + /* * Oldest xid for which deleted tuples need to be retained in this * session's temporary tables.
@@ -251,6 +257,7 @@ typedef enum GlobalVisHorizonKind VISHORIZON_SHARED, VISHORIZON_CATALOG, VISHORIZON_DATA, + VISHORIZON_BUILD_INDEX_CONCURRENTLY, VISHORIZON_TEMP, } GlobalVisHorizonKind; @@ -297,6 +304,7 @@ static TransactionId standbySnapshotPendingXmin; static GlobalVisState GlobalVisSharedRels; static GlobalVisState GlobalVisCatalogRels; static GlobalVisState GlobalVisDataRels; +static GlobalVisState GlobalVisBuildIndexConcurrentlyRels; static GlobalVisState GlobalVisTempRels; /* @@ -1727,9 +1735,6 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) bool in_recovery = RecoveryInProgress(); TransactionId *other_xids = ProcGlobal->xids; - /* inferred after ProcArrayLock is released */ - h->catalog_oldest_nonremovable = InvalidTransactionId; - LWLockAcquire(ProcArrayLock, LW_SHARED); h->latest_completed = TransamVariables->latestCompletedXid; @@ -1749,7 +1754,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->oldest_considered_running = initial; h->shared_oldest_nonremovable = initial; + h->catalog_oldest_nonremovable = initial; h->data_oldest_nonremovable = initial; + h->create_index_concurrently_oldest_nonremovable = initial; /* * Only modifications made by this backend affect the horizon for @@ -1847,11 +1854,28 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) (statusFlags & PROC_AFFECTS_ALL_HORIZONS) || in_recovery) { - h->data_oldest_nonremovable = - TransactionIdOlder(h->data_oldest_nonremovable, xmin); + h->create_index_concurrently_oldest_nonremovable = + TransactionIdOlder(h->create_index_concurrently_oldest_nonremovable, xmin); + + if (!(statusFlags & PROC_IN_SAFE_IC)) + h->data_oldest_nonremovable = + TransactionIdOlder(h->data_oldest_nonremovable, xmin); + + /* Catalog tables need to consider all backends in this db */ + h->catalog_oldest_nonremovable = + TransactionIdOlder(h->catalog_oldest_nonremovable, xmin); + } } + /* catalog horizon should never be later than data */ + Assert(TransactionIdPrecedesOrEquals(h->catalog_oldest_nonremovable, + 
h->data_oldest_nonremovable)); + + /* index building horizon should never be later than data horizon */ + Assert(TransactionIdPrecedesOrEquals(h->create_index_concurrently_oldest_nonremovable, + h->data_oldest_nonremovable)); + /* * If in recovery fetch oldest xid in KnownAssignedXids, will be applied * after lock is released. @@ -1873,6 +1897,10 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) TransactionIdOlder(h->shared_oldest_nonremovable, kaxmin); h->data_oldest_nonremovable = TransactionIdOlder(h->data_oldest_nonremovable, kaxmin); + h->create_index_concurrently_oldest_nonremovable = + TransactionIdOlder(h->create_index_concurrently_oldest_nonremovable, kaxmin); + h->catalog_oldest_nonremovable = + TransactionIdOlder(h->catalog_oldest_nonremovable, kaxmin); /* temp relations cannot be accessed in recovery */ } @@ -1880,6 +1908,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->shared_oldest_nonremovable)); Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable, h->data_oldest_nonremovable)); + Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable, + h->create_index_concurrently_oldest_nonremovable)); /* * Check whether there are replication slots requiring an older xmin.
@@ -1888,6 +1918,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) TransactionIdOlder(h->shared_oldest_nonremovable, h->slot_xmin); h->data_oldest_nonremovable = TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin); + h->create_index_concurrently_oldest_nonremovable = + TransactionIdOlder(h->create_index_concurrently_oldest_nonremovable, h->slot_xmin); /* * The only difference between catalog / data horizons is that the slot's @@ -1900,7 +1932,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->shared_oldest_nonremovable = TransactionIdOlder(h->shared_oldest_nonremovable, h->slot_catalog_xmin); - h->catalog_oldest_nonremovable = h->data_oldest_nonremovable; + h->catalog_oldest_nonremovable = + TransactionIdOlder(h->catalog_oldest_nonremovable, + h->slot_xmin); h->catalog_oldest_nonremovable = TransactionIdOlder(h->catalog_oldest_nonremovable, h->slot_catalog_xmin); @@ -1918,6 +1952,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->oldest_considered_running = TransactionIdOlder(h->oldest_considered_running, h->data_oldest_nonremovable); + h->oldest_considered_running = + TransactionIdOlder(h->oldest_considered_running, + h->create_index_concurrently_oldest_nonremovable); /* * shared horizons have to be at least as old as the oldest visible in @@ -1925,6 +1962,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) */ Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable, h->data_oldest_nonremovable)); + Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable, + h->create_index_concurrently_oldest_nonremovable)); Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable, h->catalog_oldest_nonremovable)); @@ -1938,6 +1977,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->catalog_oldest_nonremovable)); Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running, h->data_oldest_nonremovable)); + Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running, + 
h->create_index_concurrently_oldest_nonremovable)); Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running, h->temp_oldest_nonremovable)); Assert(!TransactionIdIsValid(h->slot_xmin) || @@ -1972,6 +2013,8 @@ GlobalVisHorizonKindForRel(Relation rel) else if (IsCatalogRelation(rel) || RelationIsAccessibleInLogicalDecoding(rel)) return VISHORIZON_CATALOG; + else if (rel != NULL && ((rel->rd_indexvalid && rel->rd_indexisbuilding) || IsToastRelation(rel))) + return VISHORIZON_BUILD_INDEX_CONCURRENTLY; else if (!RELATION_IS_LOCAL(rel)) return VISHORIZON_DATA; else @@ -2004,6 +2047,8 @@ GetOldestNonRemovableTransactionId(Relation rel) return horizons.catalog_oldest_nonremovable; case VISHORIZON_DATA: return horizons.data_oldest_nonremovable; + case VISHORIZON_BUILD_INDEX_CONCURRENTLY: + return horizons.create_index_concurrently_oldest_nonremovable; case VISHORIZON_TEMP: return horizons.temp_oldest_nonremovable; } @@ -2454,6 +2499,9 @@ GetSnapshotData(Snapshot snapshot) GlobalVisDataRels.definitely_needed = FullTransactionIdNewer(def_vis_fxid_data, GlobalVisDataRels.definitely_needed); + GlobalVisBuildIndexConcurrentlyRels.definitely_needed = + FullTransactionIdNewer(def_vis_fxid_data, + GlobalVisBuildIndexConcurrentlyRels.definitely_needed); /* See temp_oldest_nonremovable computation in ComputeXidHorizons() */ if (TransactionIdIsNormal(myxid)) GlobalVisTempRels.definitely_needed = @@ -2478,6 +2526,9 @@ GetSnapshotData(Snapshot snapshot) GlobalVisCatalogRels.maybe_needed = FullTransactionIdNewer(GlobalVisCatalogRels.maybe_needed, oldestfxid); + GlobalVisBuildIndexConcurrentlyRels.maybe_needed = + FullTransactionIdNewer(GlobalVisBuildIndexConcurrentlyRels.maybe_needed, + oldestfxid); GlobalVisDataRels.maybe_needed = FullTransactionIdNewer(GlobalVisDataRels.maybe_needed, oldestfxid); @@ -4106,6 +4157,9 @@ GlobalVisTestFor(Relation rel) case VISHORIZON_DATA: state = &GlobalVisDataRels; break; + case VISHORIZON_BUILD_INDEX_CONCURRENTLY: + state = 
&GlobalVisBuildIndexConcurrentlyRels; + break; case VISHORIZON_TEMP: state = &GlobalVisTempRels; break; @@ -4158,6 +4212,9 @@ GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons) GlobalVisDataRels.maybe_needed = FullXidRelativeTo(horizons->latest_completed, horizons->data_oldest_nonremovable); + GlobalVisBuildIndexConcurrentlyRels.maybe_needed = + FullXidRelativeTo(horizons->latest_completed, + horizons->create_index_concurrently_oldest_nonremovable); GlobalVisTempRels.maybe_needed = FullXidRelativeTo(horizons->latest_completed, horizons->temp_oldest_nonremovable); @@ -4176,6 +4233,9 @@ GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons) GlobalVisDataRels.definitely_needed = FullTransactionIdNewer(GlobalVisDataRels.maybe_needed, GlobalVisDataRels.definitely_needed); + GlobalVisBuildIndexConcurrentlyRels.definitely_needed = + FullTransactionIdNewer(GlobalVisBuildIndexConcurrentlyRels.maybe_needed, + GlobalVisBuildIndexConcurrentlyRels.definitely_needed); GlobalVisTempRels.definitely_needed = GlobalVisTempRels.maybe_needed; ComputeXidHorizonsResultLastXmin = RecentXmin; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 262c9878dd..677ba61205 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -4769,6 +4769,7 @@ RelationGetIndexList(Relation relation) Oid pkeyIndex = InvalidOid; Oid candidateIndex = InvalidOid; bool pkdeferrable = false; + bool indexisbuilding = false; MemoryContext oldcxt; /* Quick exit if we already computed the list. */ @@ -4809,6 +4810,10 @@ RelationGetIndexList(Relation relation) /* add index's OID to result list */ result = lappend_oid(result, index->indexrelid); + /* consider index as building if it is ready but not yet valid */ + if (index->indisready && !index->indisvalid) + indexisbuilding = true; + /* * Non-unique or predicate indexes aren't interesting for either oid * indexes or replication identity indexes, so don't check them. 
@@ -4869,6 +4874,7 @@ RelationGetIndexList(Relation relation) relation->rd_indexlist = list_copy(result); relation->rd_pkindex = pkeyIndex; relation->rd_ispkdeferrable = pkdeferrable; + relation->rd_indexisbuilding = indexisbuilding; if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex) && !pkdeferrable) relation->rd_replidindex = pkeyIndex; else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex)) diff --git a/src/bin/pg_amcheck/t/006_concurrently.pl b/src/bin/pg_amcheck/t/006_concurrently.pl new file mode 100644 index 0000000000..7b8afeead5 --- /dev/null +++ b/src/bin/pg_amcheck/t/006_concurrently.pl @@ -0,0 +1,155 @@ + +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Test REINDEX CONCURRENTLY with concurrent modifications and HOT updates +use strict; +use warnings; + +use Config; +use Errno; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Time::HiRes qw(usleep); +use IPC::SysV; +use threads; +use Test::More; +use Test::Builder; + +if ($@ || $windows_os) +{ + plan skip_all => 'Fork and shared memory are not supported by this platform'; +} + +# TODO: refactor to https://metacpan.org/pod/IPC%3A%3AShareable +my ($pid, $shmem_id, $shmem_key, $shmem_size); +eval 'sub IPC_CREAT {0001000}' unless defined &IPC_CREAT; +$shmem_size = 4; +$shmem_key = rand(1000000); +$shmem_id = shmget($shmem_key, $shmem_size, &IPC_CREAT | 0777) or die "Can't shmget: $!"; +shmwrite($shmem_id, "wait", 0, $shmem_size) or die "Can't shmwrite: $!"; + +my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); +# +# Test set-up +# +my ($node, $result); +$node = PostgreSQL::Test::Cluster->new('RC_test'); +$node->init; +$node->append_conf('postgresql.conf', + 'lock_timeout = ' . 
(1000 * $PostgreSQL::Test::Utils::timeout_default)); +$node->append_conf('postgresql.conf', 'fsync = off'); +$node->start; +$node->safe_psql('postgres', q(CREATE EXTENSION amcheck)); +$node->safe_psql('postgres', q(CREATE TABLE tbl(i int primary key, + c1 money default 0,c2 money default 0, + c3 money default 0, updated_at timestamp))); +$node->safe_psql('postgres', q(CREATE INDEX idx ON tbl(i))); + +my $builder = Test::More->builder; +$builder->use_numbers(0); +$builder->no_plan(); + +my $child = $builder->child("pg_bench"); + +if(!defined($pid = fork())) { + # fork returned undef, so unsuccessful + die "Cannot fork a child: $!"; +} elsif ($pid == 0) { + + $node->pgbench( + '--no-vacuum --client=5 --transactions=25000', + 0, + [qr{actually processed}], + [qr{^$}], + 'concurrent INSERTs, UPDATES and RC', + { + '002_pgbench_concurrent_transaction_inserts' => q( + BEGIN; + INSERT INTO tbl VALUES(random()*10000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*10000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*10000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*10000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + + INSERT INTO tbl VALUES(random()*10000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + COMMIT; + ), + # Ensure some HOT updates happen + '002_pgbench_concurrent_transaction_updates' => q( + BEGIN; + INSERT INTO tbl VALUES(random()*1000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*1000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*1000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + INSERT INTO tbl VALUES(random()*1000,0,0,0,now()) + on conflict(i) do update set updated_at = now(); + + INSERT INTO tbl VALUES(random()*1000,0,0,0,now()) + on conflict(i) 
do update set updated_at = now(); + COMMIT; + ) + }); + + if ($child->is_passing()) { + shmwrite($shmem_id, "done", 0, $shmem_size) or die "Can't shmwrite: $!"; + } else { + shmwrite($shmem_id, "fail", 0, $shmem_size) or die "Can't shmwrite: $!"; + } + + sleep(1); +} else { + my $pg_bench_fork_flag; + shmread($shmem_id, $pg_bench_fork_flag, 0, $shmem_size) or die "Can't shmread: $!"; + + subtest 'reindex run subtest' => sub { + is($pg_bench_fork_flag, "wait", "pg_bench_fork_flag is correct"); + + my %psql = (stdin => '', stdout => '', stderr => ''); + $psql{run} = IPC::Run::start( + [ 'psql', '-XA', '-f', '-', '-d', $node->connstr('postgres') ], + '<', + \$psql{stdin}, + '>', + \$psql{stdout}, + '2>', + \$psql{stderr}, + $psql_timeout); + + my ($result, $stdout, $stderr); + while (1) + { + + ($result, $stdout, $stderr) = $node->psql('postgres', q(REINDEX INDEX CONCURRENTLY idx;)); + is($result, '0', 'REINDEX is correct'); + + ($result, $stdout, $stderr) = $node->psql('postgres', q(SELECT bt_index_parent_check('idx', true, true);)); + is($result, '0', 'bt_index_check is correct'); + if ($result) + { + diag($stderr); + } + + shmread($shmem_id, $pg_bench_fork_flag, 0, $shmem_size) or die "Can't shmread: $!"; + last if $pg_bench_fork_flag ne "wait"; + } + + # explicitly shut down psql instances gracefully + $psql{stdin} .= "\\q\n"; + $psql{run}->finish; + + is($pg_bench_fork_flag, "done", "pg_bench_fork_flag is correct"); + }; + + + $child->finalize(); + $child->summary(); + $node->stop; + done_testing(); +} diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 8700204953..a9e2d1beab 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -152,6 +152,7 @@ typedef struct RelationData List *rd_indexlist; /* list of OIDs of indexes on relation */ Oid rd_pkindex; /* OID of (deferrable?) primary key, if any */ bool rd_ispkdeferrable; /* is rd_pkindex a deferrable PK? 
*/ + bool rd_indexisbuilding; /* is an index build in progress for this relation? */ Oid rd_replidindex; /* OID of replica identity index, if any */ /* data managed by RelationGetStatExtList: */ -- 2.34.1