From 7e2f610e53cfe097dbc041e09915b8fc5da98c4e Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz
Date: Tue, 5 Nov 2024 11:40:14 +0300
Subject: [PATCH v6 1/2] Optimize autoprewarm with read streams

We've measured 10% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.
---
 contrib/pg_prewarm/autoprewarm.c | 131 +++++++++++++++++++++++++++++--
 1 file changed, 126 insertions(+), 5 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..a21d9571dff 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -422,6 +423,78 @@ apw_load_buffers(void)
 					apw_state->prewarmed_blocks, num_elements)));
 }
 
+/*
+ * State shared between autoprewarm_database_main() and the read stream
+ * callback apw_read_stream_next_block().
+ */
+struct apw_read_stream_private
+{
+	bool		first_block;	/* true until the first block is returned */
+	int			max_pos;		/* end of block_info range (exclusive) */
+	int			pos;			/* next block_info entry to consider */
+	BlockInfoRecord *block_info;	/* blocks to prewarm */
+	BlockNumber nblocks_in_fork;	/* fork size in blocks */
+};
+
+/*
+ * Read stream callback: return the next block number to read from the
+ * current fork, or InvalidBlockNumber to end the stream.
+ *
+ * The stream ends when no free buffer is left, when the block list is
+ * exhausted, when a block lies past the end of the fork, or when the next
+ * entry belongs to a different database, relation or fork.
+ */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	BlockInfoRecord *old_blk;
+	BlockInfoRecord *cur_blk;
+
+	/*
+	 * There may still be queued blocks in the stream even when no free
+	 * buffers are available in the buffer pool. This can lead to unnecessary
+	 * I/O operations and buffer evictions. One possible solution is to
+	 * compare the number of free buffers in the buffer pool with the number
+	 * of queued blocks in the stream. However, this approach is considered a
+	 * workaround and would add complexity with minimal benefit, as only a few
+	 * unnecessary I/O operations and buffer evictions are expected.
+	 * Therefore, this solution has not been implemented.
+	 */
+	if (!have_free_buffer())
+		return InvalidBlockNumber;
+
+	if (p->pos == p->max_pos)
+		return InvalidBlockNumber;
+
+	cur_blk = &(p->block_info[p->pos]);
+
+	if (cur_blk->blocknum >= p->nblocks_in_fork)
+		return InvalidBlockNumber;
+
+	if (p->first_block)
+	{
+		p->first_block = false;
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+
+	Assert(p->pos > 0 && p->pos < p->max_pos);
+	old_blk = &(p->block_info[p->pos - 1]);
+
+	if (old_blk->database == cur_blk->database &&
+		old_blk->forknum == cur_blk->forknum &&
+		old_blk->filenumber == cur_blk->filenumber)
+	{
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
  * those got grouped with this database).
@@ -435,6 +508,8 @@ autoprewarm_database_main(Datum main_arg)
 	BlockNumber nblocks = 0;
 	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	ReadStream *stream = NULL;
+	struct apw_read_stream_private p;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -451,13 +526,16 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
+	p.block_info = block_info;
+	p.max_pos = apw_state->prewarm_stop_idx;
+
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	for (; pos < apw_state->prewarm_stop_idx && have_free_buffer(); pos++)
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
+		BlockInfoRecord *blk = &block_info[pos];
 		Buffer		buf;
 
 		CHECK_FOR_INTERRUPTS();
@@ -470,6 +548,21 @@ autoprewarm_database_main(Datum main_arg)
 			old_blk->database != 0)
 			break;
 
+		/*
+		 * If stream needs to be created again, end it before closing the old
+		 * relation.  Also reset the pointer: the next relation may fail to
+		 * open or its fork may not exist, in which case no new stream is
+		 * created and the ended stream must not be ended again later.
+		 */
+		if (stream && (old_blk == NULL ||
+					   old_blk->filenumber != blk->filenumber ||
+					   old_blk->forknum != blk->forknum))
+		{
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+			stream = NULL;
+		}
+
 		/*
 		 * As soon as we encounter a block of a new relation, close the old
 		 * relation. Note that rel will be NULL if try_relation_open failed
@@ -506,7 +599,10 @@ autoprewarm_database_main(Datum main_arg)
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
+		/*
+		 * Once per fork, check for fork existence and size. Then create read
+		 * stream if it is suitable.
+		 */
 		if (old_blk == NULL ||
 			old_blk->filenumber != blk->filenumber ||
 			old_blk->forknum != blk->forknum)
@@ -518,7 +614,27 @@ autoprewarm_database_main(Datum main_arg)
 			if (blk->forknum > InvalidForkNumber &&
 				blk->forknum <= MAX_FORKNUM &&
 				smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
 				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+
+				/* Create read stream. */
+				p.nblocks_in_fork = nblocks;
+				p.pos = pos;
+
+				/*
+				 * There is a special case for the first block in the
+				 * relation. We can't compare it with the previous block as
+				 * there is no previous block yet.
+				 */
+				p.first_block = true;
+				stream = read_stream_begin_relation(READ_STREAM_FULL,
+													NULL,
+													rel,
+													blk->forknum,
+													apw_read_stream_next_block,
+													&p,
+													0);
+			}
 			else
 				nblocks = 0;
 		}
@@ -532,8 +648,7 @@ autoprewarm_database_main(Datum main_arg)
 		}
 
 		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
+		buf = read_stream_next_buffer(stream, NULL);
 		if (BufferIsValid(buf))
 		{
 			apw_state->prewarmed_blocks++;
@@ -543,6 +658,12 @@ autoprewarm_database_main(Datum main_arg)
 		old_blk = blk;
 	}
 
+	if (stream)
+	{
+		Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+		read_stream_end(stream);
+	}
+
 	dsm_detach(seg);
 
 	/* Release lock on previous relation. */
-- 
2.43.0