From f533b4bb5e2de11db6c1bdbff03c6232ec7321ac Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 12 Jan 2026 11:49:41 -0500 Subject: [PATCH v12 8/8] Eagerly flush buffer successors When flushing a dirty buffer, check whether the two blocks following it are in shared buffers and whether or not they are dirty. If they are, flush them together with the victim buffer. --- src/backend/storage/buffer/bufmgr.c | 145 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 122 insertions(+), 23 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 4ae70915089..4dc13c7d972 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -649,7 +649,10 @@ static bool BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc * static BufferDesc *PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, BatchBlockRequirements *require, + LWLock *buftable_lock, XLogRecPtr *max_lsn); +static void FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *batch_start, + BufferWriteBatch *batch); static void FindStrategyFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end, BufferDesc *batch_start, uint32 max_batch_size, @@ -2621,15 +2624,22 @@ again: next_bufdesc = PrepareOrRejectEagerFlushBuffer(strategy, next_buf, NULL, + NULL, &max_lsn); } } else { - DoFlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context, max_lsn); - LWLockRelease(BufferDescriptorGetContentLock(buf_hdr)); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &buf_hdr->tag); + BufferWriteBatch batch; + + /* Pin victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(buf_hdr)); + + FindFlushAdjacents(strategy, buf_hdr, &batch); + FlushBufferBatch(&batch, io_context); + CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext); } } @@ -4740,6 
+4750,35 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } + +static BlockNumber +WriteBatchInit(BufferDesc *batch_start, uint32 max_batch_size, + BufferWriteBatch *batch) +{ + BlockNumber limit; + + Assert(batch_start); + batch->bufdescs[0] = batch_start; + + LockBufHdr(batch_start); + batch->max_lsn = BufferGetLSN(batch_start); + UnlockBufHdr(batch_start); + + batch->start = batch->bufdescs[0]->tag.blockNum; + Assert(BlockNumberIsValid(batch->start)); + batch->n = 1; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + + return limit; +} + + /* * Quick check to see if the buffer contains the required block from the right * fork of the right relation. If you don't hold the buffer header spinlock, @@ -4774,6 +4813,9 @@ BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc *bufdesc) * accept it, they will provide the required block number and its * RelFileLocator and fork. * + * If the caller is holding the buftable_lock, it will be released after + * acquiring a pin on the buffer. + * * max_lsn may be updated if the provided buffer LSN exceeds the current max * LSN. 
*/ @@ -4781,6 +4823,7 @@ static BufferDesc * PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, BatchBlockRequirements *require, + LWLock *buftable_lock, XLogRecPtr *max_lsn) { BufferDesc *bufdesc; @@ -4841,6 +4884,12 @@ PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, if (!PinBuffer(bufdesc, strategy, /* skip_if_not_valid */ true)) goto reject_buffer; + if (buftable_lock) + { + LWLockRelease(buftable_lock); + buftable_lock = NULL; + } + CheckBufferIsPinnedOnce(bufnum); /* Now that we have the buffer pinned, recheck it's got the right block */ @@ -4881,6 +4930,8 @@ reject_buffer_unpin: UnpinBuffer(bufdesc); reject_buffer: + if (buftable_lock) + LWLockRelease(buftable_lock); return NULL; } @@ -4909,26 +4960,8 @@ FindStrategyFlushAdjacents(BufferAccessStrategy strategy, BufferWriteBatch *batch, int *sweep_cursor) { - BlockNumber limit; BatchBlockRequirements require; - - Assert(batch_start); - batch->bufdescs[0] = batch_start; - - LockBufHdr(batch_start); - batch->max_lsn = BufferGetLSN(batch_start); - UnlockBufHdr(batch_start); - - batch->start = batch->bufdescs[0]->tag.blockNum; - Assert(BlockNumberIsValid(batch->start)); - batch->n = 1; - batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); - batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); - batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); - - limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); - limit = Min(max_batch_size, limit); - limit = Min(GetAdditionalPinLimit(), limit); + BlockNumber limit = WriteBatchInit(batch_start, max_batch_size, batch); /* * It's possible we're not allowed any more pins or there aren't more @@ -4963,6 +4996,72 @@ FindStrategyFlushAdjacents(BufferAccessStrategy strategy, if ((batch->bufdescs[batch->n] = PrepareOrRejectEagerFlushBuffer(strategy, bufnum, &require, + NULL, + &batch->max_lsn)) == NULL) + break; + } +} + + +/* + * Check if the blocks after my block are in shared buffers and 
dirty and if + * it is, write them out too + */ +static void +FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *batch_start, + BufferWriteBatch *batch) +{ + BufferTag newTag; /* identity of requested block */ + uint32 newHash; /* hash value for newTag */ + LWLock *newPartitionLock; /* buffer partition lock for it */ + int buf_id; + BlockNumber limit; + BlockNumber max_batch_size = 3; /* we only look for two successors */ + BatchBlockRequirements require; + + limit = WriteBatchInit(batch_start, max_batch_size, batch); + + /* + * It's possible we're not allowed any more pins or there aren't more + * blocks in the target relation. In this case, just return. Our batch + * will have only one buffer. + */ + if (limit <= 1) + return; + + require.rlocator = &batch->rlocator; + require.forkno = batch->forkno; + + for (; batch->n < limit; batch->n++) + { + require.block = batch->start + batch->n; + + Assert(BlockNumberIsValid(require.block)); + + /* create a tag so we can lookup the buffer */ + InitBufferTag(&newTag, &batch->rlocator, batch->forkno, + require.block); + + /* determine its hash code and partition lock ID */ + newHash = BufTableHashCode(&newTag); + newPartitionLock = BufMappingPartitionLock(newHash); + + /* see if the block is in the buffer pool already */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = BufTableLookup(&newTag, newHash); + + /* The block may not even be in shared buffers. */ + if (buf_id < 0) + { + LWLockRelease(newPartitionLock); + break; + } + + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(strategy, + buf_id + 1, + &require, + newPartitionLock, &batch->max_lsn)) == NULL) break; } -- 2.43.0