From 1c26dd85a987b99c7fca71fadf90b8bccccd06ec Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Tue, 9 Sep 2025 02:24:49 -0700 Subject: [PATCH v5 4/6] Use Instrumentation stack for parallel query aggregation in workers --- src/backend/access/brin/brin.c | 6 ++++-- src/backend/access/gin/gininsert.c | 6 ++++-- src/backend/access/nbtree/nbtsort.c | 6 ++++-- src/backend/commands/vacuumparallel.c | 6 ++++-- src/backend/executor/execParallel.c | 6 ++++-- src/backend/executor/instrument.c | 19 +++++++++---------- src/include/executor/instrument.h | 4 ++-- 7 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 6887e421442..c1c3d03b6ed 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2885,6 +2885,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -2934,7 +2935,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) tuplesort_attach_shared(sharedsort, seg); /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Might as well use reliable figure when doling out maintenance_work_mem @@ -2949,7 +2950,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 0d63fb4ba27..9149d735d59 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -2108,6 +2108,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -2176,7 +2177,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) tuplesort_attach_shared(sharedsort, seg); /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Might as well use reliable figure when doling out maintenance_work_mem @@ -2191,7 +2192,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 90ab4e91b56..44d4bbf0b59 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1750,6 +1750,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -1825,7 +1826,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) } /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* Perform sorting of spool, and possibly a spool2 */ sortmem = maintenance_work_mem / btshared->scantuplesortstates; @@ -1835,7 +1836,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index c3b3c9ea21a..10ba717bb6b 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -994,6 +994,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) PVIndStats *indstats; PVShared *shared; TidStore *dead_items; + Instrumentation *instr; BufferUsage *buffer_usage; WalUsage *wal_usage; int nindexes; @@ -1083,7 +1084,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) error_context_stack = &errcallback; /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* Process indexes to perform vacuum/cleanup */ parallel_vacuum_process_safe_indexes(&pvs); @@ -1091,7 +1092,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); /* Report any remaining cost-based vacuum delay time */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 43286d27663..bc4baa07347 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1455,6 +1455,7 @@ void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; + Instrumentation *instr; BufferUsage *buffer_usage; WalUsage *wal_usage; DestReceiver *receiver; @@ -1515,7 +1516,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) * leader, which also doesn't count buffer accesses and WAL activity that * occur during executor startup. */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Run the plan. If we specified a tuple bound, be careful not to demand @@ -1531,7 +1532,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. */ diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 8fca1ddbe64..9131b24f643 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -20,10 +20,8 @@ #include "utils/resowner.h" BufferUsage pgBufferUsage; -static BufferUsage save_pgBufferUsage; WalUsage pgWalUsage; InstrStack *pgInstrStack = NULL; -static WalUsage save_pgWalUsage; static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); static void WalUsageAdd(WalUsage *dst, WalUsage *add); @@ -380,21 +378,22 @@ InstrAggNode(NodeInstrumentation *dst, NodeInstrumentation *add) } /* start instrumentation during parallel executor startup */ -void +Instrumentation * InstrStartParallelQuery(void) { - save_pgBufferUsage = pgBufferUsage; - save_pgWalUsage = pgWalUsage; + Instrumentation *instr = InstrAlloc(INSTRUMENT_BUFFERS | INSTRUMENT_WAL); + + InstrStart(instr); + return instr; } /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrEndParallelQuery(Instrumentation *instr, BufferUsage *bufusage, WalUsage *walusage) { - memset(bufusage, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); - memset(walusage, 0, sizeof(WalUsage)); - WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); + InstrStop(instr, true); + memcpy(bufusage, &instr->stack->bufusage, sizeof(BufferUsage)); + memcpy(walusage, &instr->stack->walusage, sizeof(WalUsage)); } /* accumulate work done by workers in leader's stats */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 0bf1ef548d6..998d151c56a 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -186,8 +186,8 @@ extern void InstrUpdateTupleCount(NodeInstrumentation *instr, double nTuples); extern void InstrEndLoop(NodeInstrumentation *instr); extern void InstrAggNode(NodeInstrumentation *dst, NodeInstrumentation *add); -extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +pg_nodiscard extern Instrumentation *InstrStartParallelQuery(void); +extern void InstrEndParallelQuery(Instrumentation *instr, BufferUsage *bufusage, WalUsage *walusage); extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); -- 2.47.1