From 3e5fabc6e5a30bb16ec11fdc7fcf65880e172d0c Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Tue, 7 Nov 2023 17:04:28 +0100 Subject: [PATCH v3 1/4] parallel CREATE INDEX for BRIN v2 --- src/backend/access/brin/brin.c | 781 ++++++++++++++++++++- src/backend/access/nbtree/nbtsort.c | 2 +- src/backend/access/table/tableam.c | 49 +- src/backend/access/transam/parallel.c | 4 + src/backend/catalog/index.c | 3 +- src/backend/executor/nodeSeqscan.c | 3 +- src/backend/utils/sort/tuplesort.c | 3 + src/backend/utils/sort/tuplesortvariants.c | 211 ++++++ src/include/access/brin.h | 3 + src/include/access/relscan.h | 1 + src/include/access/tableam.h | 9 +- src/include/utils/tuplesort.h | 11 + 12 files changed, 1065 insertions(+), 15 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 25338a90e29..b7cd29c5968 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -33,6 +33,7 @@ #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "storage/freespace.h" +#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" @@ -40,7 +41,118 @@ #include "utils/index_selfuncs.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/tuplesort.h" +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xA000000000000001) +#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000003) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000004) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xA000000000000005) + +/* + * Status record for spooling/sorting phase. + */ +typedef struct BrinSpool +{ + Tuplesortstate *sortstate; /* state data for tuplesort.c */ + Relation heap; + Relation index; +} BrinSpool; + +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. + */ +typedef struct BrinShared +{ + /* + * These fields are not modified during the build. They primarily exist for + * the benefit of worker processes that need to create state corresponding + * to that used by the leader. + */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + BlockNumber pagesPerRange; + int scantuplesortstates; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can use + * results built by the workers (and before leader can write the data into + * the index). + */ + ConditionVariable workersdonecv; + + /* + * mutex protects all fields before heapdesc. + * + * These fields contain status information of interest to BRIN index + * builds that must work just the same when an index is built in parallel. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of the scans. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * ParallelTableScanDescData data follows. Can't directly embed here, as + * implementations of the parallel table scan desc interface might need + * stronger alignment. + */ +} BrinShared; + +/* + * Return pointer to a BrinShared's parallel table scan. + * + * c.f. 
shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+	/* parallel context itself */
+	ParallelContext *pcxt;
+
+	/*
+	 * nparticipanttuplesorts is the exact number of worker processes
+	 * successfully launched, plus one leader process if it participates as a
+	 * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+	 * participating as a worker).
+	 */
+	int			nparticipanttuplesorts;
+
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for the entire build.  sharedsort is
+	 * the shared, tuplesort-managed state passed to each process tuplesort.
+	 * snapshot is the snapshot used by the scan iff an MVCC snapshot is
+	 * required.
+	 */
+	BrinShared *brinshared;
+	Sharedsort *sharedsort;
+	Snapshot	snapshot;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+} BrinLeader;

 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +162,23 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
+	int			bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+
+	/*
+	 * bs_leader is only present when a parallel index build is performed,
+	 * and only in the leader process.
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BrinSpool  *bs_spool;
+	BrinSpool  *bs_spool_out;
 } BrinBuildState;

 /*
@@ -76,6 +199,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel,
						   BlockNumber pageRange, bool include_partial,
						   double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
						  BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +207,20 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
								BrinMemTuple *dtup, const Datum *values, const bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);

+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinSpool *brinspool,
+										  BrinShared *brinshared,
+										  Sharedsort *sharedsort,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -820,6 +958,67 @@ brinbuildCallback(Relation index,
 							   values, isnull);
 }

+/*
+ * A version of the callback, used by parallel index builds.  The main
+ * difference is that instead of writing the BRIN tuples into the index, we
+ * write them into a shared tuplesort, and leave the insertion up to the
+ * leader (which may reorder them a bit).
+ * The callback also does not generate empty ranges; those may be added by
+ * the leader when merging results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+{
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+
+	thisblock = ItemPointerGetBlockNumber(tid);
+
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, if they were devoid of live tuples; make sure to insert index
+	 * tuples for those too.
+	 */
+	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallback: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+		/* create the index tuple and write it into the tuplesort */
+		form_and_spill_tuple(state);
+
+		/*
+		 * Set state to correspond to the next range.
+		 *
+		 * XXX This has the issue that it skips ranges summarized by other
+		 * workers, but it also skips empty ranges that should have been
+		 * summarized.  We'd need to either make the workers aware of which
+		 * chunk they are actually processing (currently known only to the
+		 * ParallelBlockTableScan machinery), or ignore it here and resolve
+		 * it while "merging" results from workers (if there's no entry for
+		 * a range, it must have been empty, so we just add an empty one).
+		 */
+		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+			state->bs_currRangeStart += state->bs_pagesPerRange;
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
+}
+
 /*
  * brinbuild() -- build a new BRIN index.
  */
@@ -881,18 +1080,93 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);

+	state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	state->bs_spool->heap = heap;
+	state->bs_spool->index = index;
+
+	/*
+	 * Attempt to launch parallel worker scan when required.
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker.  That makes
+	 * sense for btree, but not for BRIN, which can make do with much less
+	 * memory.  So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
+
 	/*
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
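+	 *
+	 * (The parallel path gets the same guarantee differently: the parallel
+	 * scan is initialized with a chunk_factor, which makes
+	 * table_block_parallelscan_initialize disable syncscan; see the
+	 * tableam.c changes below.)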
 	 */
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
-
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/*
+	 * If a parallel build was requested and at least one worker process was
+	 * successfully launched, set up coordination state.
+	 */
+	if (state->bs_leader)
+	{
+		SortCoordinate coordinate;
+
+		coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+		coordinate->isWorker = false;
+		coordinate->nParticipants =
+			state->bs_leader->nparticipanttuplesorts;
+		coordinate->sharedsort = state->bs_leader->sharedsort;
+
+		/*
+		 * Begin serial/leader tuplesort.
+		 *
+		 * In cases where parallelism is involved, the leader receives the
+		 * same share of maintenance_work_mem as a serial sort (it is
+		 * generally treated in the same way as a serial sort once we
+		 * return).  Parallel worker Tuplesortstates will have received only
+		 * a fraction of maintenance_work_mem, though.
+		 *
+		 * We rely on the lifetime of the Leader Tuplesortstate almost not
+		 * overlapping with any worker Tuplesortstate's lifetime.  There may
+		 * be some small overlap, but that's okay because we rely on leader
+		 * Tuplesortstate only allocating a small, fixed amount of memory
+		 * here.  When its tuplesort_performsort() is called (by our caller),
+		 * and significant amounts of memory are likely to be used, all
+		 * workers must have already freed almost all memory held by their
+		 * Tuplesortstates (they are about to go away completely, too).  The
+		 * overall effect is that maintenance_work_mem always represents an
+		 * absolute high watermark on the amount of memory used by a CREATE
+		 * INDEX operation, regardless of the use of parallelism or any
+		 * other factor.
+		 */
+		state->bs_spool_out = (BrinSpool *) palloc0(sizeof(BrinSpool));
+		state->bs_spool_out->heap = heap;
+		state->bs_spool_out->index = index;
+
+		state->bs_spool_out->sortstate =
+			tuplesort_begin_index_brin(heap, index,
+									   maintenance_work_mem, coordinate,
+									   TUPLESORT_NONE);
+
+		/*
+		 * In parallel mode, wait for workers to complete, and then read all
+		 * tuples from the shared tuplesort and insert them into the index.
+		 */
+		_brin_end_parallel(state->bs_leader, state);
+	}
+	else						/* no parallel index build, just do the usual thing */
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+
+		/* process the final batch */
+		form_and_insert_tuple(state);
+
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}

 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);

@@ -1312,12 +1586,16 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,

 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
+	state->bs_spool = NULL;

 	return state;
 }
@@ -1609,6 +1887,32 @@ form_and_insert_tuple(BrinBuildState *state)
 	pfree(tup);
 }

+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a (shared) tuplesort (the leader will insert it
+ * into the index later).
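+ *
+ * Note that empty ranges are not spilled (see the bt_empty_range check
+ * below); the leader synthesizes summaries for ranges missing from the
+ * tuplesort while merging results in _brin_end_parallel().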
+ */ +static void +form_and_spill_tuple(BrinBuildState *state) +{ + BrinTuple *tup; + Size size; + + /* don't insert empty tuples in parallel build */ + if (state->bs_dtuple->bt_empty_range) + return; + + tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart, + state->bs_dtuple, &size); + + /* write the BRIN tuple to the tuplesort */ + tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size); + + state->bs_numtuples++; + + pfree(tup); +} + /* * Given two deformed tuples, adjust the first one so that it's consistent * with the summary values in both. @@ -1928,3 +2232,472 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys) return true; } + +static void +_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int scantuplesortstates; + Snapshot snapshot; + Size estbrinshared; + Size estsort; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader)); + WalUsage *walusage; + BufferUsage *bufferusage; + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of brin + * index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", + request); + + scantuplesortstates = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. + */ + estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); + estsort = tuplesort_estimate_shared(scantuplesortstates); + shm_toc_estimate_chunk(&pcxt->estimator, estsort); + + shm_toc_estimate_keys(&pcxt->estimator, 2); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE + * and PARALLEL_KEY_BUFFER_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgWalUsage or + * pgBufferUsage, so do it unconditionally. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); + /* Initialize immutable state */ + brinshared->heaprelid = RelationGetRelid(heap); + brinshared->indexrelid = RelationGetRelid(index); + brinshared->isconcurrent = isconcurrent; + brinshared->scantuplesortstates = scantuplesortstates; + brinshared->pagesPerRange = buildstate->bs_pagesPerRange; + ConditionVariableInit(&brinshared->workersdonecv); + SpinLockInit(&brinshared->mutex); + + /* Initialize mutable state */ + brinshared->nparticipantsdone = 0; + brinshared->reltuples = 0.0; + brinshared->indtuples = 0.0; + + table_parallelscan_initialize(heap, + ParallelTableScanFromBrinShared(brinshared), + snapshot, brinshared->pagesPerRange); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort); + tuplesort_initialize_shared(sharedsort, scantuplesortstates, + pcxt->seg); + + /* + * Store shared tuplesort-private state, for which we reserved space. + * Then, initialize opaque state using tuplesort routine. + */ + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. 
+ */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + brinleader->pcxt = pcxt; + brinleader->nparticipanttuplesorts = pcxt->nworkers_launched; + if (leaderparticipates) + brinleader->nparticipanttuplesorts++; + brinleader->brinshared = brinshared; + brinleader->sharedsort = sharedsort; + brinleader->snapshot = snapshot; + brinleader->walusage = walusage; + brinleader->bufferusage = bufferusage; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _brin_end_parallel(brinleader, NULL); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->bs_leader = brinleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _brin_leader_participate_as_worker(buildstate, heap, index); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. + */ +static void +_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) +{ + int i; + BrinTuple *btup; + Size tuplen; + BrinShared *brinshared = brinleader->brinshared; + BlockNumber prevblkno = InvalidBlockNumber; + BrinTuple *emptyTuple = NULL; + Size emptySize; + + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(brinleader->pcxt); + + if (!state) + return; + + /* copy the data into leader state (we have to wait for the workers ) */ + state->bs_reltuples = brinshared->reltuples; + state->bs_numtuples = brinshared->indtuples; + + tuplesort_performsort(state->bs_spool_out->sortstate); + + /* + * Read the BRIN tuples from the shared tuplesort, sorted by block number. + * That probably gives us an index that is cheaper to scan, thanks to mostly + * getting data from the same index page as before. + * + * FIXME This probably needs some memory management fixes - we're reading + * tuples from the tuplesort, we're allocating an emty tuple, and so on. + * Probably better to release this memory. + * + * XXX We can't quite free the BrinTuple, though, because that's a field + * in BrinSortTuple. + */ + while ((btup = tuplesort_getbrintuple(state->bs_spool_out->sortstate, &tuplen, true)) != NULL) + { + /* + * We should not get two summaries for the same range. The workers + * are producing ranges for non-overlapping sections of the table. + */ + Assert(btup->bt_blkno != prevblkno); + + /* Ranges should be multiples of pages_per_range for the index. */ + Assert(btup->bt_blkno % brinshared->pagesPerRange == 0); + + /* Fill empty ranges for all ranges missing in the tuplesort. */ + prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno; + while (prevblkno + state->bs_pagesPerRange < btup->bt_blkno) + { + /* the missing range */ + prevblkno += state->bs_pagesPerRange; + + /* Did we already build the empty range? If not, do it now. 
+			 */
+			if (emptyTuple == NULL)
+			{
+				BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+				emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+			}
+			else
+			{
+				/* we already have an "empty range" tuple, just set the block */
+				emptyTuple->bt_blkno = prevblkno;
+			}
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf,
+						  emptyTuple->bt_blkno, emptyTuple, emptySize);
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, btup->bt_blkno, btup, tuplen);
+
+		prevblkno = btup->bt_blkno;
+	}
+
+	tuplesort_end(state->bs_spool_out->sortstate);
+
+	/*
+	 * Next, accumulate WAL usage.  (This must wait for the workers to
+	 * finish, or we might get incomplete data.)
+	 */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(BrinShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/* Allocate memory and initialize private spool */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heap;
+	buildstate->bs_spool->index = index;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
+								  brinleader->sharedsort, heap, index, sortmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed-in brinspool.  All other spool
+ * fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */ +static void +_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool, + BrinShared *brinshared, Sharedsort *sharedsort, + Relation heap, Relation index, int sortmem, + bool progress) +{ + SortCoordinate coordinate; + TableScanDesc scan; + double reltuples; + IndexInfo *indexInfo; + + /* Initialize local tuplesort coordination state */ + coordinate = palloc0(sizeof(SortCoordinateData)); + coordinate->isWorker = true; + coordinate->nParticipants = -1; + coordinate->sharedsort = sharedsort; + + /* Begin "partial" tuplesort */ + brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap, + brinspool->index, + sortmem, coordinate, + TUPLESORT_NONE); + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(index); + indexInfo->ii_Concurrent = brinshared->isconcurrent; + + scan = table_beginscan_parallel(heap, + ParallelTableScanFromBrinShared(brinshared)); + + reltuples = table_index_build_scan(heap, index, indexInfo, true, true, + brinbuildCallbackParallel, state, scan); + + /* insert the last item */ + form_and_spill_tuple(state); + + /* sort the BRIN ranges built by this worker */ + tuplesort_performsort(brinspool->sortstate); + + state->bs_reltuples += reltuples; + + /* + * Done. Record ambuild statistics. + */ + SpinLockAcquire(&brinshared->mutex); + brinshared->nparticipantsdone++; + brinshared->reltuples += state->bs_reltuples; + brinshared->indtuples += state->bs_numtuples; + SpinLockRelease(&brinshared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&brinshared->workersdonecv); + + tuplesort_end(brinspool->sortstate); +} + +/* + * Perform work within a launched parallel process. + */ +void +_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) +{ + char *sharedquery; + BrinShared *brinshared; + Sharedsort *sharedsort; + BrinBuildState *buildstate; + Relation heapRel; + Relation indexRel; + LOCKMODE heapLockmode; + LOCKMODE indexLockmode; + WalUsage *walusage; + BufferUsage *bufferusage; + int sortmem; + + /* + * The only possible status flag that can be set to the parallel worker is + * PROC_IN_SAFE_IC. 
+ */ + Assert((MyProc->statusFlags == 0) || + (MyProc->statusFlags == PROC_IN_SAFE_IC)); + + /* Set debug_query_string for individual workers first */ + sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + + /* Report the query string from leader */ + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* Look up brin shared state */ + brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); + + /* Open relations using lock modes known to be obtained by index.c */ + if (!brinshared->isconcurrent) + { + heapLockmode = ShareLock; + indexLockmode = AccessExclusiveLock; + } + else + { + heapLockmode = ShareUpdateExclusiveLock; + indexLockmode = RowExclusiveLock; + } + + /* Open relations within worker */ + heapRel = table_open(brinshared->heaprelid, heapLockmode); + indexRel = index_open(brinshared->indexrelid, indexLockmode); + + buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange); + + /* Initialize worker's own spool */ + buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool)); + buildstate->bs_spool->heap = heapRel; + buildstate->bs_spool->index = indexRel; + + /* Look up shared state private to tuplesort.c */ + sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false); + tuplesort_attach_shared(sharedsort, seg); + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* + * Might as well use reliable figure when doling out maintenance_work_mem + * (when requested number of workers were not launched, this will be + * somewhat higher than it is for other workers). + */ + sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + + _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, + brinshared, sharedsort, + heapRel, indexRel, sortmem, false); + + /* Report WAL/buffer usage during parallel execution */ + bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); + walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); + + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index c2665fce411..6241baeea86 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btshared->brokenhotchain = false; table_parallelscan_initialize(btspool->heap, ParallelTableScanFromBTShared(btshared), - snapshot); + snapshot, InvalidBlockNumber); /* * Store shared tuplesort-private state, for which we reserved space. 
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index c6bdb7e1c68..4af0d433e9d 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot) + Snapshot snapshot, BlockNumber chunk_factor) { - Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); + Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor); pscan->phs_snapshot_off = snapshot_off; @@ -395,16 +395,21 @@ table_block_parallelscan_estimate(Relation rel) } Size -table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) +table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor) { ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan; bpscan->base.phs_relid = RelationGetRelid(rel); bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel); - /* compare phs_syncscan initialization to similar logic in initscan */ + bpscan->phs_chunk_factor = chunk_factor; + /* compare phs_syncscan initialization to similar logic in initscan + * + * Disable sync scans if the chunk factor is set (valid block number). + */ bpscan->base.phs_syncscan = synchronize_seqscans && !RelationUsesLocalBuffers(rel) && - bpscan->phs_nblocks > NBuffers / 4; + (bpscan->phs_nblocks > NBuffers / 4) && + !BlockNumberIsValid(bpscan->phs_chunk_factor); SpinLockInit(&bpscan->phs_mutex); bpscan->phs_startblock = InvalidBlockNumber; pg_atomic_init_u64(&bpscan->phs_nallocated, 0); @@ -459,6 +464,25 @@ table_block_parallelscan_startblock_init(Relation rel, pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size, PARALLEL_SEQSCAN_MAX_CHUNK_SIZE); + /* + * If the chunk size factor is set, we need to make sure the chunk size is + * a multiple of that value. We round the chunk size to the nearest chunk + * factor multiple, at least one chunk_factor. + * + * XXX Note this may override PARALLEL_SEQSCAN_MAX_CHUNK_SIZE, in case the + * chunk factor (e.g. BRIN pages_per_range) is larger. + */ + if (pbscan->phs_chunk_factor != InvalidBlockNumber) + { + /* nearest (smaller) multiple of chunk_factor */ + pbscanwork->phsw_chunk_size + = pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor); + + /* but at least one chunk_factor */ + pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size, + pbscan->phs_chunk_factor); + } + retry: /* Grab the spinlock. */ SpinLockAcquire(&pbscan->phs_mutex); @@ -575,6 +599,21 @@ table_block_parallelscan_nextpage(Relation rel, (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS)) pbscanwork->phsw_chunk_size >>= 1; + /* + * We need to make sure the new chunk_size is still a suitable multiple + * of chunk_factor. 
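+	 *
+	 * For example (values illustrative): with phsw_chunk_size = 13 and
+	 * chunk_factor = 4, the rounding below yields 4 * (13 / 4) = 12; once
+	 * the ramp-down pushes the chunk size below the factor, the Max() clamp
+	 * keeps handing out one full chunk_factor.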
+ */ + if (pbscan->phs_chunk_factor != InvalidBlockNumber) + { + /* nearest (smaller) multiple of chunk_factor */ + pbscanwork->phsw_chunk_size + = pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor); + + /* but at least one chunk_factor */ + pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size, + pbscan->phs_chunk_factor); + } + nallocated = pbscanwork->phsw_nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, pbscanwork->phsw_chunk_size); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 194a1207be6..d78314062e0 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/brin.h" #include "access/nbtree.h" #include "access/parallel.h" #include "access/session.h" @@ -145,6 +146,9 @@ static const struct { "_bt_parallel_build_main", _bt_parallel_build_main }, + { + "_brin_parallel_build_main", _brin_parallel_build_main + }, { "parallel_vacuum_main", parallel_vacuum_main } diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 143fae01ebd..37e4305d50a 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2982,7 +2982,8 @@ index_build(Relation heapRelation, * Note that planner considers parallel safety for us. */ if (parallel && IsNormalProcessingMode() && - indexRelation->rd_rel->relam == BTREE_AM_OID) + (indexRelation->rd_rel->relam == BTREE_AM_OID || + indexRelation->rd_rel->relam == BRIN_AM_OID)) indexInfo->ii_ParallelWorkers = plan_create_index_workers(RelationGetRelid(heapRelation), RelationGetRelid(indexRelation)); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 49a5933aff6..529a7ed3284 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -262,7 +262,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + InvalidBlockNumber); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel(node->ss.ss_currentRelation, pscan); diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index ab6353bdcd1..4c6b396a8a8 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -1328,6 +1328,7 @@ tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbre break; default: + Assert(false); elog(ERROR, "invalid tuplesort state"); break; } @@ -1462,6 +1463,7 @@ tuplesort_performsort(Tuplesortstate *state) break; default: + Assert(false); elog(ERROR, "invalid tuplesort state"); break; } @@ -1718,6 +1720,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, return false; default: + Assert(false); elog(ERROR, "invalid tuplesort state"); return false; /* keep compiler quiet */ } diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c index 2cd508e5130..343ed4bbc54 100644 --- a/src/backend/utils/sort/tuplesortvariants.c +++ b/src/backend/utils/sort/tuplesortvariants.c @@ -19,6 +19,7 @@ #include "postgres.h" +#include "access/brin_tuple.h" #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" @@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups, int count); static void 
removeabbrev_index(Tuplesortstate *state, SortTuple *stups,
							   int count);
+static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
+									int count);
 static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
							   int count);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -69,10 +72,16 @@ static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
								 Tuplesortstate *state);
 static int	comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
										   Tuplesortstate *state);
+static int	comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+								  Tuplesortstate *state);
 static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
						  LogicalTape *tape, unsigned int len);
+static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
+								SortTuple *stup);
+static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+							   LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
							 Tuplesortstate *state);
 static int	comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
@@ -128,6 +137,16 @@ typedef struct
 	uint32		max_buckets;
 } TuplesortIndexHashArg;

+/*
+ * Data structure pointed by "TuplesortPublic.arg" for the index_brin subcase.
+ */
+typedef struct
+{
+	TuplesortIndexArg index;
+
+	/* XXX do we need something here? */
+} TuplesortIndexBrinArg;
+
 /*
  * Data struture pointed by "TuplesortPublic.arg" for the Datum case.
  * Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -140,6 +159,22 @@ typedef struct
 	int			datumTypeLen;
 } TuplesortDatumArg;

+/*
+ * Computing BrinTuple size with only the tuple is difficult, so we want to
+ * track the length of the tuple referenced by each SortTuple.  That's what
+ * BrinSortTuple is meant to do - it's essentially a BrinTuple prefixed by
+ * its length.  We only write the BrinTuple to the logical tapes, though.
+ */
+typedef struct BrinSortTuple
+{
+	Size		tuplen;
+	BrinTuple	tuple;
+} BrinSortTuple;
+
+/* Size of the BrinSortTuple, given length of the BrinTuple. */
+#define BRINSORTTUPLE_SIZE(len)		(offsetof(BrinSortTuple, tuple) + (len))
+
 Tuplesortstate *
 tuplesort_begin_heap(TupleDesc tupDesc,
					 int nkeys, AttrNumber *attNums,
@@ -527,6 +562,47 @@ tuplesort_begin_index_gist(Relation heapRel,
 	return state;
 }

+Tuplesortstate *
+tuplesort_begin_index_brin(Relation heapRel,
+						   Relation indexRel,
+						   int workMem,
+						   SortCoordinate coordinate,
+						   int sortopt)
+{
+	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+												   sortopt);
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext;
+	TuplesortIndexBrinArg *arg;
+
+	oldcontext = MemoryContextSwitchTo(base->maincontext);
+	arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG,
+			 "begin index sort: workMem = %d, randomAccess = %c",
+			 workMem,
+			 sortopt & TUPLESORT_RANDOMACCESS ?
't' : 'f'); +#endif + + base->nKeys = 1; /* Only one sort column, the block number */ + + base->removeabbrev = removeabbrev_index_brin; + base->comparetup = comparetup_index_brin; + base->writetup = writetup_index_brin; + base->readtup = readtup_index_brin; + base->haveDatum1 = true; + base->arg = arg; + + arg->index.heapRel = heapRel; + arg->index.indexRel = indexRel; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + Tuplesortstate * tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, int workMem, @@ -707,6 +783,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, !stup.isnull1); } +/* + * Collect one BRIN tuple while collecting input data for sort. + */ +void +tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size) +{ + SortTuple stup; + BrinSortTuple *bstup; + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext); + + /* allocate space for the whole BRIN sort tuple */ + bstup = palloc(BRINSORTTUPLE_SIZE(size)); + stup.tuple = bstup; + + bstup->tuplen = size; + memcpy(&bstup->tuple, tuple, size); + + stup.datum1 = tuple->bt_blkno; + stup.isnull1 = false; + + tuplesort_puttuple_common(state, &stup, + base->sortKeys && + base->sortKeys->abbrev_converter && + !stup.isnull1); + + MemoryContextSwitchTo(oldcontext); +} + /* * Accept one Datum while collecting input data for sort. * @@ -850,6 +955,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward) return (IndexTuple) stup.tuple; } +/* + * Fetch the next BRIN tuple in either forward or back direction. + * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory + * context, and must not be freed by caller. Caller may not rely on tuple + * remaining valid after any further manipulation of tuplesort. + */ +BrinTuple * +tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext); + SortTuple stup; + BrinSortTuple *btup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + if (!stup.tuple) + return NULL; + + btup = (BrinSortTuple *) stup.tuple; + + *len = btup->tuplen; + + return &btup->tuple; +} + /* * Fetch the next Datum in either forward or back direction. * Returns false if no more datums. 
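
Note for reviewers: the intended spool/drain cycle for this new BRIN
tuplesort API, condensed from form_and_spill_tuple() and _brin_end_parallel()
in the brin.c part of this patch (variable names shortened, error handling
and empty-range filling omitted):

    /* worker: spool one summarized range into the shared sort */
    tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
                          state->bs_dtuple, &size);
    tuplesort_putbrintuple(spool->sortstate, tup, size);

    /* leader: drain the tuples back in block-number order */
    tuplesort_performsort(spool->sortstate);
    while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
        brin_doinsert(idxrel, pagesPerRange, revmap, &buf,
                      btup->bt_blkno, btup, tuplen);
    tuplesort_end(spool->sortstate);
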
@@ -1564,6 +1698,83 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 						&stup->isnull1);
 }

+/*
+ * Routines specialized for BrinTuple case
+ */
+
+static void
+removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
+{
+	int			i;
+
+	for (i = 0; i < count; i++)
+	{
+		BrinSortTuple *tuple;
+
+		tuple = stups[i].tuple;
+		stups[i].datum1 = tuple->tuple.bt_blkno;
+	}
+}
+
+static int
+comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+					  Tuplesortstate *state)
+{
+	BrinTuple  *tuple1;
+	BrinTuple  *tuple2;
+
+	tuple1 = &((BrinSortTuple *) a->tuple)->tuple;
+	tuple2 = &((BrinSortTuple *) b->tuple)->tuple;
+
+	if (tuple1->bt_blkno > tuple2->bt_blkno)
+		return 1;
+	else if (tuple1->bt_blkno < tuple2->bt_blkno)
+		return -1;
+
+	/* equal block numbers */
+	return 0;
+}
+
+static void
+writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple;
+	unsigned int tuplen = tuple->tuplen;
+
+	tuplen = tuplen + sizeof(tuplen);
+	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+}
+
+static void
+readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+				   LogicalTape *tape, unsigned int len)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	unsigned int tuplen = len - sizeof(unsigned int);
+
+	/*
+	 * Allocate space for the BRIN sort tuple, which is BrinTuple with an
+	 * extra length field.
+	 */
+	BrinSortTuple *tuple
+		= (BrinSortTuple *) tuplesort_readtup_alloc(state,
+													BRINSORTTUPLE_SIZE(tuplen));
+
+	tuple->tuplen = tuplen;
+
+	LogicalTapeReadExact(tape, &tuple->tuple, tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+	stup->tuple = (void *) tuple;
+
+	/* set up first-column key value, which is block number */
+	stup->datum1 = tuple->tuple.bt_blkno;
+}
+
 /*
  * Routines specialized for DatumTuple case
  */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H

 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"


@@ -52,4 +53,6 @@ typedef struct BrinStatsData

 extern void brinGetStats(Relation index, BrinStatsData *stats);

+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* BRIN_H */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d03360eac04..72a20d882f5 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,6 +79,7 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_chunk_factor;	/* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 } ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index dbb709b56ce..d94e4d32aa1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,7 +390,8 @@ typedef struct TableAmRoutine
 	 * relation.
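+	 *
+	 * `chunk_factor`, if not InvalidBlockNumber, requests that the chunks
+	 * of blocks handed out to parallel workers be multiples of that many
+	 * blocks (e.g. BRIN passes its pages_per_range).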
*/ Size (*parallelscan_initialize) (Relation rel, - ParallelTableScanDesc pscan); + ParallelTableScanDesc pscan, + BlockNumber chunk_factor); /* * Reinitialize `pscan` for a new scan. `rel` will be the same relation as @@ -1148,7 +1149,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot); */ extern void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot); + Snapshot snapshot, + BlockNumber chunk_factor); /* * Begin a parallel scan. `pscan` needs to have been initialized with @@ -2064,7 +2066,8 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid, extern Size table_block_parallelscan_estimate(Relation rel); extern Size table_block_parallelscan_initialize(Relation rel, - ParallelTableScanDesc pscan); + ParallelTableScanDesc pscan, + BlockNumber chunk_factor); extern void table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan); extern BlockNumber table_block_parallelscan_nextpage(Relation rel, diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index 9ed2de76cd6..357eb35311d 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -21,6 +21,7 @@ #ifndef TUPLESORT_H #define TUPLESORT_H +#include "access/brin_tuple.h" #include "access/itup.h" #include "executor/tuptable.h" #include "storage/dsm.h" @@ -282,6 +283,9 @@ typedef struct * The "index_hash" API is similar to index_btree, but the tuples are * actually sorted by their hash codes not the raw data. * + * The "index_brin" API is similar to index_btree, but the tuples are + * BrinTuple and are sorted by their block number not the raw data. + * * Parallel sort callers are required to coordinate multiple tuplesort states * in a leader process and one or more worker processes. The leader process * must launch workers, and have each perform an independent "partial" @@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel, Relation indexRel, int workMem, SortCoordinate coordinate, int sortopt); +extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel, + Relation indexRel, + int workMem, SortCoordinate coordinate, + int sortopt); extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, bool nullsFirstFlag, @@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup); extern void tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, ItemPointer self, const Datum *values, const bool *isnull); +extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len); extern void tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull); @@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev); extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward); extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward); +extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len, + bool forward); extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy, Datum *val, bool *isNull, Datum *abbrev); -- 2.41.0
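
Note for reviewers: on tape, writetup_index_brin() stores each tuple as a
length word that counts itself, the raw BrinTuple, and (only under
TUPLESORT_RANDOMACCESS) a trailing copy of the length word; this is why
readtup_index_brin() computes the BrinTuple length as len -
sizeof(unsigned int). A byte-layout sketch, not part of the patch:

    /*
     * on-tape layout of one spooled tuple (t = tuple->tuplen):
     *
     *   +---------------------+--------------------+---------------------+
     *   | unsigned int: t + 4 | BrinTuple: t bytes | unsigned int: t + 4 |
     *   +---------------------+--------------------+---------------------+
     *                                                trailing word is
     *                                                written and read only
     *                                                for RANDOMACCESS sorts
     */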