From cf40167f621111c61489d9640769ccbc8885d0f2 Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Wed, 1 Jan 2025 15:25:20 +0100 Subject: [PATCH v24 04/12] Support snapshot resets in parallel concurrent index builds Extend periodic snapshot reset support to parallel builds, previously limited to non-parallel operations. This allows the xmin horizon to advance during parallel concurrent index builds as well. The main obstacle to applying that technique to parallel builds was the need to wait until worker processes have restored their initial snapshot from the leader. To address this, the following changes are applied: - add infrastructure to track snapshot restoration in parallel workers - extend parallel scan initialization to support periodic snapshot resets - wait for parallel workers to restore their initial snapshots before proceeding with the scan - relax the limitation that prevented parallel workers from calling GetLatestSnapshot --- src/backend/access/brin/brin.c | 50 +++++++++------- src/backend/access/gin/gininsert.c | 50 +++++++++------- src/backend/access/heap/heapam_handler.c | 12 ++-- src/backend/access/nbtree/nbtsort.c | 57 ++++++++++++++----- src/backend/access/table/tableam.c | 37 ++++++++++-- src/backend/access/transam/parallel.c | 50 ++++++++++++++-- src/backend/catalog/index.c | 2 +- src/backend/executor/nodeSeqscan.c | 3 +- src/backend/utils/time/snapmgr.c | 8 --- src/include/access/parallel.h | 3 +- src/include/access/relscan.h | 1 + src/include/access/tableam.h | 9 +-- .../expected/cic_reset_snapshots.out | 25 +++++++- .../sql/cic_reset_snapshots.sql | 7 ++- 14 files changed, 225 insertions(+), 89 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 186edd0d229..5554cfa6f4d 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -143,7 +143,6 @@ typedef struct BrinLeader */ BrinShared *brinshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } BrinLeader; @@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _brin_parallel_estimate_shared(Relation heap); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -1221,7 +1220,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) reltuples = _brin_parallel_merge(state); _brin_end_parallel(state->bs_leader, state); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } else /* no parallel index build */ { @@ -1254,7 +1252,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) brin_fill_empty_ranges(state, state->bs_currRangeStart, state->bs_maxRangeStart); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } /* release resources */ @@ -1269,6 +1266,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = idxtuples; + Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid)); return result; } @@ -2368,7 +2366,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { ParallelContext *pcxt; int 
scantuplesortstates; - Snapshot snapshot; Size estbrinshared; Size estsort; BrinShared *brinshared; @@ -2399,25 +2396,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot is reset periodically. */ if (!isconcurrent) { Assert(ActiveSnapshotSet()); - snapshot = SnapshotAny; need_pop_active_snapshot = false; } else { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + Assert(!ActiveSnapshotSet()); PushActiveSnapshot(GetTransactionSnapshot()); } /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. */ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = _brin_parallel_estimate_shared(heap); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -2457,8 +2454,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) - UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; @@ -2483,7 +2478,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromBrinShared(brinshared), - snapshot); + isconcurrent ? InvalidSnapshot : SnapshotAny, + isconcurrent); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2529,7 +2525,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->nparticipanttuplesorts++; brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; - brinleader->snapshot = snapshot; brinleader->walusage = walusage; brinleader->bufferusage = bufferusage; @@ -2545,6 +2540,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = brinleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * We need to wait until all workers imported initial snapshot. + */ + if (isconcurrent) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _brin_leader_participate_as_worker(buildstate, heap, index); @@ -2553,7 +2555,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!isconcurrent) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); } @@ -2576,9 +2579,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); DestroyParallelContext(brinleader->pcxt); ExitParallelMode(); } @@ -2778,14 +2778,14 @@ _brin_parallel_merge(BrinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. + * brin index build. */ static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_brin_parallel_estimate_shared(Relation heap) { /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* @@ -2807,6 +2807,7 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re /* Perform work common to all participants */ _brin_parallel_scan_and_build(buildstate, brinleader->brinshared, brinleader->sharedsort, heap, index, sortmem, true); + Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xid)); } /* @@ -2947,6 +2948,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); + if (brinshared->isconcurrent) + { + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xid)); + PushActiveSnapshot(GetTransactionSnapshot()); + } /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 2f947d36619..bf26106aa5e 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -132,7 +132,6 @@ typedef struct GinLeader */ GinBuildShared *ginshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } GinLeader; @@ -180,7 +179,7 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _gin_parallel_estimate_shared(Relation heap); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -717,7 +716,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) reltuples = _gin_parallel_merge(state); _gin_end_parallel(state->bs_leader, state); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } else /* no parallel index build */ { @@ -741,7 +739,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) list, nlist, &buildstate.buildStats); } MemoryContextSwitchTo(oldCtx); - Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); } MemoryContextDelete(buildstate.funcCtx); @@ -771,6 +768,7 
@@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo) result->heap_tuples = reltuples; result->index_tuples = buildstate.indtuples; + Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); return result; } @@ -905,7 +903,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, { ParallelContext *pcxt; int scantuplesortstates; - Snapshot snapshot; Size estginshared; Size estsort; GinBuildShared *ginshared; @@ -935,25 +932,25 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot is reset periodically. */ if (!isconcurrent) { Assert(ActiveSnapshotSet()); - snapshot = SnapshotAny; need_pop_active_snapshot = false; } else { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + Assert(!ActiveSnapshotSet()); PushActiveSnapshot(GetTransactionSnapshot()); } /* * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. */ - estginshared = _gin_parallel_estimate_shared(heap, snapshot); + estginshared = _gin_parallel_estimate_shared(heap); shm_toc_estimate_chunk(&pcxt->estimator, estginshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -993,8 +990,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) - UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; @@ -1018,7 +1013,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + isconcurrent ? InvalidSnapshot : SnapshotAny, + isconcurrent); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1060,7 +1056,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, ginleader->nparticipanttuplesorts++; ginleader->ginshared = ginshared; ginleader->sharedsort = sharedsort; - ginleader->snapshot = snapshot; ginleader->walusage = walusage; ginleader->bufferusage = bufferusage; @@ -1076,6 +1071,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = ginleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * We need to wait until all workers imported initial snapshot. + */ + if (isconcurrent) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _gin_leader_participate_as_worker(buildstate, heap, index); @@ -1084,7 +1086,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!isconcurrent) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); } @@ -1107,9 +1110,6 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) for (i = 0; i < ginleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) - UnregisterSnapshot(ginleader->snapshot); DestroyParallelContext(ginleader->pcxt); ExitParallelMode(); } @@ -1790,14 +1790,14 @@ _gin_parallel_merge(GinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. + * gin index build. */ static Size -_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_gin_parallel_estimate_shared(Relation heap) { /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(GinBuildShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* @@ -1820,6 +1820,7 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela _gin_parallel_scan_and_build(buildstate, ginleader->ginshared, ginleader->sharedsort, heap, index, sortmem, true); + Assert(!ginleader->ginshared->isconcurrent || !TransactionIdIsValid(MyProc->xid)); } /* @@ -2179,6 +2180,13 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); + if (ginshared->isconcurrent) + { + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xid)); + PushActiveSnapshot(GetTransactionSnapshot()); + } /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index e32ee739733..a7e16871af6 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1235,14 +1235,13 @@ heapam_index_build_range_scan(Relation heapRelation, * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, or during bootstrap, we take a regular MVCC snapshot - * and index whatever's live according to that. + * and index whatever's live according to that while that snapshot is reset + * every so often (in case of non-unique index). */ OldestXmin = InvalidTransactionId; /* * For unique index we need consistent snapshot for the whole scan. - * In case of parallel scan some additional infrastructure required - * to perform scan with SO_RESET_SNAPSHOT which is not yet ready. 
*/ reset_snapshots = indexInfo->ii_Concurrent && !indexInfo->ii_Unique && @@ -1304,8 +1303,11 @@ heapam_index_build_range_scan(Relation heapRelation, Assert(!IsBootstrapProcessingMode()); Assert(allow_sync); snapshot = scan->rs_snapshot; - PushActiveSnapshot(snapshot); - need_pop_active_snapshot = true; + if (!reset_snapshots) + { + PushActiveSnapshot(snapshot); + need_pop_active_snapshot = true; + } } hscan = (HeapScanDesc) scan; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 7b09ad878b7..53b7ddfff0e 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -322,22 +322,20 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo) RelationGetRelationName(index)); reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Finish the build by (1) completing the sort of the spool file, (2) * inserting the sorted tuples into btree pages and (3) building the upper * levels. Finally, it may also be necessary to end use of parallelism. */ - _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_ParallelWorkers && indexInfo->ii_Concurrent); + _bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_Unique && indexInfo->ii_Concurrent); _bt_spooldestroy(buildstate.spool); if (buildstate.spool2) _bt_spooldestroy(buildstate.spool2); if (buildstate.btleader) _bt_end_parallel(buildstate.btleader); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult)); @@ -486,8 +484,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate, reltuples = _bt_parallel_heapscan(buildstate, &indexInfo->ii_BrokenHotChain); InvalidateCatalogSnapshot(); - Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique || - !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin)); + Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* * Set the progress target for the next phase. Reset the block number @@ -1421,6 +1418,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -1438,12 +1436,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) scantuplesortstates = leaderparticipates ? request + 1 : request; + /* + * For concurrent non-unique index builds, we can periodically reset snapshots + * to allow the xmin horizon to advance. This is safe since these builds don't + * require a consistent view across the entire scan. Unique indexes still need + * a stable snapshot to properly enforce uniqueness constraints. + */ + reset_snapshot = isconcurrent && !btspool->isunique; + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). 
In a * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * live according to that, while that snapshot may be reset periodically in + * case of non-unique index. */ if (!isconcurrent) { @@ -1451,6 +1458,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); @@ -1511,7 +1523,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -1538,7 +1550,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btshared->brokenhotchain = false; table_parallelscan_initialize(btspool->heap, ParallelTableScanFromBTShared(btshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1614,6 +1627,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Save leader state now that it's clear build will be parallel */ buildstate->btleader = btleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * Wait until all workers imported initial snapshot. + */ + if (reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _bt_leader_participate_as_worker(buildstate); @@ -1622,7 +1642,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. 
*/ - WaitForParallelWorkersToAttach(pcxt); + if (!reset_snapshot) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); } @@ -1646,7 +1667,7 @@ _bt_end_parallel(BTLeader *btleader) InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) + if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot)) UnregisterSnapshot(btleader->snapshot); DestroyParallelContext(btleader->pcxt); ExitParallelMode(); @@ -1896,6 +1917,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, SortCoordinate coordinate; BTBuildState buildstate; TableScanDesc scan; + ParallelTableScanDesc pscan; double reltuples; IndexInfo *indexInfo; @@ -1950,11 +1972,15 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); indexInfo->ii_Concurrent = btshared->isconcurrent; - scan = table_beginscan_parallel(btspool->heap, - ParallelTableScanFromBTShared(btshared)); + pscan = ParallelTableScanFromBTShared(btshared); + scan = table_beginscan_parallel(btspool->heap, pscan); reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo, true, progress, _bt_build_callback, &buildstate, scan); + InvalidateCatalogSnapshot(); + if (pscan->phs_reset_snapshot) + PopActiveSnapshot(); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); /* Execute this worker's part of the sort */ if (progress) @@ -1990,4 +2016,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, tuplesort_end(btspool->sortstate); if (btspool2) tuplesort_end(btspool2->sortstate); + Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin)); + if (pscan->phs_reset_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); } diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 5e41404937e..8b33b6278ce 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) { Size sz = 0; - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) sz = add_size(sz, EstimateSnapshotSpace(snapshot)); else - Assert(snapshot == SnapshotAny); + Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot); sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel)); @@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot) + Snapshot snapshot, bool reset_snapshot) { Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); pscan->phs_snapshot_off = snapshot_off; - if (IsMVCCSnapshot(snapshot)) + /* + * Initialize parallel scan description. For normal scans with a regular + * MVCC snapshot, serialize the snapshot info. For scans that use periodic + * snapshot resets, mark the scan accordingly. 
+ */ + if (reset_snapshot) + { + Assert(snapshot == InvalidSnapshot); + pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = true; + INJECTION_POINT("table_parallelscan_initialize", NULL); + } + else if (IsMVCCSnapshot(snapshot)) { SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off); pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = false; } else { Assert(snapshot == SnapshotAny); + Assert(!reset_snapshot); pscan->phs_snapshot_any = true; + pscan->phs_reset_snapshot = false; } } @@ -171,7 +186,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan) Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator)); - if (!pscan->phs_snapshot_any) + /* + * For scans that + * use periodic snapshot resets, mark the scan accordingly and use the active + * snapshot as the initial state. + */ + if (pscan->phs_reset_snapshot) + { + Assert(ActiveSnapshotSet()); + flags |= SO_RESET_SNAPSHOT; + /* Start with current active snapshot. */ + snapshot = GetActiveSnapshot(); + } + else if (!pscan->phs_snapshot_any) { /* Snapshot was serialized -- restore it */ snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 94db1ec3012..065ea9d26f6 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -77,6 +77,7 @@ #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D) #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E) #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F) +#define PARALLEL_KEY_SNAPSHOT_RESTORED UINT64CONST(0xFFFFFFFFFFFF0010) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -305,6 +306,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool), + pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate how much we'll need for the entrypoint info. */ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + strlen(pcxt->function_name) + 2); @@ -376,6 +381,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *entrypointstate; char *uncommittedenumsspace; char *clientconninfospace; + bool *snapshot_set_flag_space; Size lnamelen; /* Serialize shared libraries we have loaded. */ @@ -491,6 +497,19 @@ InitializeParallelDSM(ParallelContext *pcxt) strcpy(entrypointstate, pcxt->library_name); strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); + + /* + * Establish dynamic shared memory to pass information about importing + * of snapshot. + */ + snapshot_set_flag_space = + shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers)); + for (i = 0; i < pcxt->nworkers; ++i) + { + pcxt->worker[i].snapshot_restored = snapshot_set_flag_space + i * sizeof(bool); + *pcxt->worker[i].snapshot_restored = false; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space); } /* Update nworkers_to_launch, in case we changed nworkers above. */ @@ -546,6 +565,17 @@ ReinitializeParallelDSM(ParallelContext *pcxt) pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); } } + + /* Set snapshot restored flag to false. 
*/ + if (pcxt->nworkers > 0) + { + bool *snapshot_restored_space; + int i; + snapshot_restored_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + for (i = 0; i < pcxt->nworkers; ++i) + snapshot_restored_space[i] = false; + } } /* @@ -661,6 +691,10 @@ LaunchParallelWorkers(ParallelContext *pcxt) * Wait for all workers to attach to their error queues, and throw an error if * any worker fails to do this. * + * wait_for_snapshot: track whether each parallel worker has successfully restored + * its snapshot. This is needed when using periodic snapshot resets to ensure all + * workers have a valid initial snapshot before proceeding with the scan. + * * Callers can assume that if this function returns successfully, then the * number of workers given by pcxt->nworkers_launched have initialized and * attached to their error queues. Whether or not these workers are guaranteed @@ -690,7 +724,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) * call this function at all. */ void -WaitForParallelWorkersToAttach(ParallelContext *pcxt) +WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot) { int i; @@ -734,9 +768,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) mq = shm_mq_get_queue(pcxt->worker[i].error_mqh); if (shm_mq_get_sender(mq) != NULL) { - /* Yes, so it is known to be attached. */ - pcxt->known_attached_workers[i] = true; - ++pcxt->nknown_attached_workers; + if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored)) + { + /* Yes, so it is known to be attached. */ + pcxt->known_attached_workers[i] = true; + ++pcxt->nknown_attached_workers; + } } } else if (status == BGWH_STOPPED) @@ -1295,6 +1332,7 @@ ParallelWorkerMain(Datum main_arg) shm_toc *toc; FixedParallelState *fps; char *error_queue_space; + bool *snapshot_restored_space; shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; @@ -1499,6 +1537,10 @@ ParallelWorkerMain(Datum main_arg) fps->parallel_leader_pgproc); PushActiveSnapshot(asnapshot); + /* Snapshot is restored, set flag to make leader know about it. */ + snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + snapshot_restored_space[ParallelWorkerNumber] = true; + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index edc07b72018..371863895dd 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1532,7 +1532,7 @@ index_concurrently_build(Oid heapRelationId, /* Invalidate catalog snapshot just for assert */ InvalidateCatalogSnapshot(); - Assert((indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique) || !TransactionIdIsValid(MyProc->xmin)); + Assert(indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin)); /* Roll back any GUC changes executed by index functions */ AtEOXact_GUC(false, save_nestlevel); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 94047d29430..f16284d4d0d 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -371,7 +371,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel(node->ss.ss_currentRelation, pscan); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 8e1a918f130..68ea98405bb 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -353,14 +353,6 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { - /* - * We might be able to relax this, but nothing that could otherwise work - * needs it. - */ - if (IsInParallelMode()) - elog(ERROR, - "cannot update SecondarySnapshot during a parallel operation"); - /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index f37be6d5690..a7362f7b43b 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -26,6 +26,7 @@ typedef struct ParallelWorkerInfo { BackgroundWorkerHandle *bgwhandle; shm_mq_handle *error_mqh; + bool *snapshot_restored; } ParallelWorkerInfo; typedef struct ParallelContext @@ -65,7 +66,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch); extern void LaunchParallelWorkers(ParallelContext *pcxt); -extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); +extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot); extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index b5e0fb386c0..50441c58cea 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -82,6 +82,7 @@ typedef struct ParallelTableScanDescData RelFileLocator phs_locator; /* physical relation to scan */ bool phs_syncscan; /* report location to syncscan logic? */ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */ + bool phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? 
*/ Size phs_snapshot_off; /* data for snapshot */ } ParallelTableScanDescData; typedef struct ParallelTableScanDescData *ParallelTableScanDesc; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 71af14d1c31..613615c78cd 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1140,7 +1140,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot); */ extern void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot); + Snapshot snapshot, + bool reset_snapshot); /* * Begin a parallel scan. `pscan` needs to have been initialized with @@ -1762,9 +1763,9 @@ table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, * This only really makes sense for heap AM, it might need to be generalized * for other AMs later. * - * In case of non-unique index and non-parallel concurrent build - * SO_RESET_SNAPSHOT is applied for the scan. That leads for changing snapshots - * on the fly to allow xmin horizon propagate. + * In case of non-unique concurrent index build SO_RESET_SNAPSHOT is applied + * for the scan. That leads for changing snapshots on the fly to allow xmin + * horizon propagate. */ static inline double table_index_build_scan(Relation table_rel, diff --git a/src/test/modules/injection_points/expected/cic_reset_snapshots.out b/src/test/modules/injection_points/expected/cic_reset_snapshots.out index 948d1232aa0..595a4000ce0 100644 --- a/src/test/modules/injection_points/expected/cic_reset_snapshots.out +++ b/src/test/modules/injection_points/expected/cic_reset_snapshots.out @@ -17,6 +17,12 @@ SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice' (1 row) +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); + injection_points_attach +------------------------- + +(1 row) + CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); INSERT INTO cic_reset_snap.tbl SELECT i, i * I FROM generate_series(1, 200) s(i); @@ -72,30 +78,45 @@ NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + injection_points_detach +------------------------- + +(1 row) + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(MOD(i, 2), j) WHERE MOD(i, 2) = 0; +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable(i); NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective REINDEX INDEX 
CONCURRENTLY cic_reset_snap.idx; NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable_no_param(); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i DESC NULLS LAST); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl USING BRIN(i); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; DROP SCHEMA cic_reset_snap CASCADE; NOTICE: drop cascades to 3 other objects diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql index 5072535b355..2941aa7ae38 100644 --- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql +++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql @@ -3,7 +3,7 @@ CREATE EXTENSION injection_points; SELECT injection_points_set_local(); SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice'); SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'); - +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); @@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; @@ -83,4 +86,4 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; DROP SCHEMA cic_reset_snap CASCADE; -DROP EXTENSION injection_points; +DROP EXTENSION injection_points; \ No newline at end of file -- 2.43.0
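
For readers following the leader/worker handshake introduced above, the sketch below condenses the leader-side flow into one place. It is an illustrative outline only, not code from the patch: the function name leader_start_parallel_scan and the surrounding setup are invented for the example, while the calls themselves (table_parallelscan_initialize with its new reset_snapshot argument, WaitForParallelWorkersToAttach with its new wait_for_snapshot argument, and the PushActiveSnapshot/GetTransactionSnapshot pairing) are the ones this patch establishes. Error handling, the tuplesort plumbing, and the shared-memory TOC setup are omitted.

#include "postgres.h"

#include "access/parallel.h"
#include "access/tableam.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/snapshot.h"

/*
 * Illustrative sketch (not part of the patch): leader-side ordering for a
 * parallel concurrent build of a non-unique index under periodic snapshot
 * resets.
 */
static void
leader_start_parallel_scan(Relation heap, ParallelContext *pcxt,
						   ParallelTableScanDesc pscan,
						   bool isconcurrent, bool isunique)
{
	/* Resets are only safe when no single consistent snapshot is required. */
	bool		reset_snapshot = isconcurrent && !isunique;

	if (reset_snapshot)
	{
		/*
		 * No snapshot is serialized into shared memory; each participant
		 * starts from its active snapshot, and the scan is flagged so that
		 * SO_RESET_SNAPSHOT resets it periodically.
		 */
		PushActiveSnapshot(GetTransactionSnapshot());
		table_parallelscan_initialize(heap, pscan, InvalidSnapshot, true);
	}
	else
	{
		/* Unchanged behavior: a fixed MVCC snapshot or SnapshotAny is used. */
		Snapshot	snapshot = isconcurrent
			? RegisterSnapshot(GetTransactionSnapshot())
			: SnapshotAny;

		table_parallelscan_initialize(heap, pscan, snapshot, false);
	}

	LaunchParallelWorkers(pcxt);

	/*
	 * Before the leader joins the scan (and before any snapshot reset can
	 * happen), wait until every launched worker has restored its initial
	 * snapshot; ParallelWorkerMain reports this through the per-worker flag
	 * stored under PARALLEL_KEY_SNAPSHOT_RESTORED.
	 */
	if (reset_snapshot)
		WaitForParallelWorkersToAttach(pcxt, true);

	/* ... leader participates in the scan, then waits for workers ... */
}

On the worker side the patch mirrors this: ParallelWorkerMain sets its PARALLEL_KEY_SNAPSHOT_RESTORED flag right after pushing the restored snapshot as active, and the per-AM worker entry points (_bt_parallel_scan_and_sort, _brin_parallel_build_main, _gin_parallel_build_main) pop and re-acquire the active snapshot once their part of the scan completes, so a worker's xmin does not keep pinning the horizon.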