From dbcc430c4b92c2a69f84fe9ab3faa94f61eb3d99 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Tue, 2 Sep 2025 12:43:24 -0400 Subject: [PATCH v4 3/9] Eagerly flush bulkwrite strategy ring Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing the buffers in a larger run, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes. The effect is mainly noticeable with multiple parallel COPY FROMs. In this case, client backends achieve higher write throughput and end up spending less time waiting on acquiring the lock to flush WAL. Larger flush operations also mean less time waiting for flush operations at the kernel level. The heuristic for eager eviction is to only flush buffers in the strategy ring which do not require a WAL flush. This patch also is a step toward AIO writes. Reviewed-by: Chao Li Reviewed-by: Nazir Bilal Yavuz Earlier version Reviewed-by: Kirill Reshke Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 189 +++++++++++++++++++++++++- src/backend/storage/buffer/freelist.c | 48 +++++++ src/include/storage/buf_internals.h | 4 + 3 files changed, 235 insertions(+), 6 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f40f57e5582..c64268f31ae 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -534,7 +534,16 @@ static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object IOContext io_context, XLogRecPtr buffer_lsn); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); -static void CleanVictimBuffer(BufferDesc *bufdesc, uint32 *buf_state, +static BufferDesc *NextStratBufToFlush(BufferAccessStrategy strategy, + Buffer sweep_end, + XLogRecPtr *lsn, + int *sweep_cursor); +static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + RelFileLocator *rlocator, + bool skip_pinned, + XLogRecPtr *max_lsn); +static void CleanVictimBuffer(BufferAccessStrategy strategy, + BufferDesc *bufdesc, uint32 *buf_state, bool from_ring, IOContext io_context); static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, @@ -2420,7 +2429,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) } /* Content lock is released inside CleanVictimBuffer */ - CleanVictimBuffer(buf_hdr, &buf_state, from_ring, io_context); + CleanVictimBuffer(strategy, buf_hdr, &buf_state, from_ring, io_context); } if (buf_state & BM_VALID) @@ -4254,6 +4263,40 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } +/* + * Returns the buffer descriptor of the buffer containing the next block we + * should eagerly flush or NULL when there are no further buffers to consider + * writing out. + */ +static BufferDesc * +NextStratBufToFlush(BufferAccessStrategy strategy, + Buffer sweep_end, + XLogRecPtr *lsn, int *sweep_cursor) +{ + Buffer bufnum; + BufferDesc *bufdesc; + + while ((bufnum = + StrategySweepNextBuffer(strategy, sweep_cursor)) != sweep_end) + { + /* + * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining + * buffers in the ring will be invalid. + */ + if (!BufferIsValid(bufnum)) + break; + + if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum, + InvalidBlockNumber, + NULL, + true, + lsn)) != NULL) + return bufdesc; + } + + return NULL; +} + /* * Prepare and write out a dirty victim buffer. * @@ -4264,12 +4307,14 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * bufdesc and buf_state may be modified. */ static void -CleanVictimBuffer(BufferDesc *bufdesc, uint32 *buf_state, +CleanVictimBuffer(BufferAccessStrategy strategy, + BufferDesc *bufdesc, uint32 *buf_state, bool from_ring, IOContext io_context) { XLogRecPtr max_lsn = InvalidXLogRecPtr; LWLock *content_lock; + bool first_buffer = true; Assert(*buf_state & BM_DIRTY); @@ -4277,11 +4322,143 @@ CleanVictimBuffer(BufferDesc *bufdesc, uint32 *buf_state, if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn)) return; - DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + if (from_ring && StrategySupportsEagerFlush(strategy)) + { + Buffer sweep_end = BufferDescriptorGetBuffer(bufdesc); + int cursor = StrategySweepStart(strategy); + + /* Clean victim buffer and find more to flush opportunistically */ + do + { + DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + content_lock = BufferDescriptorGetContentLock(bufdesc); + LWLockRelease(content_lock); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); + /* We leave the first buffer pinned for the caller */ + if (!first_buffer) + UnpinBuffer(bufdesc); + first_buffer = false; + } while ((bufdesc = NextStratBufToFlush(strategy, sweep_end, + &max_lsn, &cursor)) != NULL); + } + else + { + DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + content_lock = BufferDescriptorGetContentLock(bufdesc); + LWLockRelease(content_lock); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); + } +} + +/* + * Prepare bufdesc for eager flushing. + * + * Given bufnum, return the block -- the pointer to the block data in memory + * -- which we will opportunistically flush or NULL if this buffer does not + * contain a block that should be flushed. + * + * require is the BlockNumber required by the caller. Some callers may require + * a specific BlockNumber to be in bufnum because they are assembling a + * contiguous run of blocks. + * + * If the caller needs the block to be from a specific relation, rlocator will + * be provided. + */ +BufferDesc * +PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + RelFileLocator *rlocator, bool skip_pinned, + XLogRecPtr *max_lsn) +{ + BufferDesc *bufdesc; + uint32 buf_state; + XLogRecPtr lsn; + BlockNumber blknum; + LWLock *content_lock; + + if (!BufferIsValid(bufnum)) + return NULL; + + Assert(!BufferIsLocal(bufnum)); + + bufdesc = GetBufferDescriptor(bufnum - 1); + + /* Block may need to be in a specific relation */ + if (rlocator && + !RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag), + *rlocator)) + return NULL; + + /* Must do this before taking the buffer header spinlock */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + buf_state = LockBufHdr(bufdesc); + + if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID)) + goto except_unlock_header; + + /* We don't eagerly flush buffers used by others */ + if (skip_pinned && + (BUF_STATE_GET_REFCOUNT(buf_state) > 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) > 1)) + goto except_unlock_header; + + /* Get page LSN while holding header lock */ + lsn = BufferGetLSN(bufdesc); + + PinBuffer_Locked(bufdesc); + CheckBufferIsPinnedOnce(bufnum); + + blknum = BufferGetBlockNumber(bufnum); + Assert(BlockNumberIsValid(blknum)); + + /* If we'll have to flush WAL to flush the block, we're done */ + if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn)) + goto except_unpin_buffer; + + /* We only include contiguous blocks in the run */ + if (BlockNumberIsValid(require) && blknum != require) + goto except_unpin_buffer; + content_lock = BufferDescriptorGetContentLock(bufdesc); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + goto except_unpin_buffer; + + /* + * Now that we have the content lock, we need to recheck if we need to + * flush WAL. + */ + buf_state = LockBufHdr(bufdesc); + lsn = BufferGetLSN(bufdesc); + UnlockBufHdr(bufdesc, buf_state); + + if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn)) + goto except_unlock_content; + + /* Try to start an I/O operation */ + if (!StartBufferIO(bufdesc, false, true)) + goto except_unlock_content; + + if (lsn > *max_lsn) + *max_lsn = lsn; + buf_state = LockBufHdr(bufdesc); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(bufdesc, buf_state); + + return bufdesc; + +except_unlock_content: LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &bufdesc->tag); + +except_unpin_buffer: + UnpinBuffer(bufdesc); + return NULL; + +except_unlock_header: + UnlockBufHdr(bufdesc, buf_state); + return NULL; } /* diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 12bb7e2312e..8716109221b 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -156,6 +156,31 @@ ClockSweepTick(void) return victim; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * buffers in the ring before they are needed. This can lead to better I/O + * patterns than lazily flushing buffers immediately before reusing them. + */ +bool +StrategySupportsEagerFlush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -270,6 +295,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r } } +/* + * Return the next buffer in the ring or InvalidBuffer if the current sweep is + * over. + */ +Buffer +StrategySweepNextBuffer(BufferAccessStrategy strategy, int *sweep_cursor) +{ + if (++(*sweep_cursor) >= strategy->nbuffers) + *sweep_cursor = 0; + + return strategy->buffers[*sweep_cursor]; +} + +/* + * Return the starting buffer of a sweep of the strategy ring + */ +int +StrategySweepStart(BufferAccessStrategy strategy) +{ + return strategy->current; +} + + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index b1b81f31419..03faf80e441 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -437,6 +437,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ +extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); +extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy, + int *sweep_cursor); +extern int StrategySweepStart(BufferAccessStrategy strategy); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); -- 2.43.0