From bc956edc684384ed1815dbfeb72bca43268fd88b Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Wed, 7 Jan 2026 14:56:49 -0500
Subject: [PATCH v12 3/8] Eagerly flush bulkwrite strategy ring

Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse them. By
eagerly flushing the buffers in a larger run, we encourage larger writes
at the kernel level and less interleaving of WAL flushes and data file
writes. The effect is mainly noticeable with multiple parallel COPY
FROMs. In this case, client backends achieve higher write throughput and
end up spending less time waiting on acquiring the lock to flush WAL.
Larger flush operations also mean less time spent waiting on flushes at
the kernel level.

The heuristic for eager eviction is to only flush buffers in the
strategy ring which do not require a WAL flush.

This patch is also a step toward AIO writes, as it lines up multiple
buffers that can be issued asynchronously once the infrastructure
exists.

Author: Melanie Plageman
Reviewed-by: Chao Li
Reviewed-by: Nazir Bilal Yavuz
Earlier version Reviewed-by: Kirill Reshke
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 160 ++++++++++++++++++++++++++
 src/backend/storage/buffer/freelist.c |  48 ++++++++
 src/include/storage/buf_internals.h   |   4 +
 3 files changed, 212 insertions(+)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2c706682eb3..d31b6243354 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -631,6 +631,10 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
                                 IOObject io_object, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
                         IOObject io_object, IOContext io_context);
+
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy,
+                                                   Buffer bufnum,
+                                                   XLogRecPtr *max_lsn);
 static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn);
 static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln,
                           IOObject io_object, IOContext io_context,
@@ -2503,6 +2507,59 @@ again:
         /* May be nothing to do if buffer was cleaned */
         LWLockRelease(BufferDescriptorGetContentLock(buf_hdr));
     }
+    else if (from_ring && StrategySupportsEagerFlush(strategy))
+    {
+        Buffer      sweep_end = buf;
+        int         cursor = StrategyGetCurrentIndex(strategy);
+        bool        first_buffer = true;
+        BufferDesc *next_bufdesc = buf_hdr;
+
+        /*
+         * Flush the victim buffer and then loop around strategy ring one
+         * time eagerly flushing all of the eligible buffers.
+         */
+        for (;;)
+        {
+            Buffer      next_buf;
+
+            if (next_bufdesc)
+            {
+                DoFlushBuffer(next_bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+                LWLockRelease(BufferDescriptorGetContentLock(next_bufdesc));
+                ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+                                              &next_bufdesc->tag);
+                /* We leave the first buffer pinned for the caller */
+                if (!first_buffer)
+                    UnpinBuffer(next_bufdesc);
+                first_buffer = false;
+            }
+
+            next_buf = StrategyNextBuffer(strategy, &cursor);
+
+            /* Completed one sweep of the ring buffer */
+            if (next_buf == sweep_end)
+                break;
+
+            /*
+             * For strategies currently supporting eager flush
+             * (BAS_BULKWRITE, eventually BAS_VACUUM), once you hit an
+             * InvalidBuffer, the remaining buffers in the ring will be
+             * invalid.
+             * If BAS_BULKREAD is someday supported, this logic
+             * will have to change.
+             */
+            if (!BufferIsValid(next_buf))
+                break;
+
+            /*
+             * Check buffer eager flush eligibility. If the buffer is
+             * ineligible, we'll keep looking until we complete one full
+             * sweep around the ring.
+             */
+            next_bufdesc = PrepareOrRejectEagerFlushBuffer(strategy,
+                                                           next_buf,
+                                                           &max_lsn);
+        }
+    }
     else
     {
         DoFlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context, max_lsn);
@@ -4430,6 +4487,109 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
     DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, return the buffer descriptor of the buffer to eagerly flush,
+ * pinned and locked and with BM_IO_IN_PROGRESS set, or NULL if this buffer
+ * does not contain a block that should be flushed.
+ *
+ * max_lsn may be updated if the provided buffer LSN exceeds the current max
+ * LSN.
+ */
+static BufferDesc *
+PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy,
+                                Buffer bufnum,
+                                XLogRecPtr *max_lsn)
+{
+    BufferDesc *bufdesc;
+    uint32      buf_state;
+    XLogRecPtr  lsn;
+    LWLock     *content_lock;
+
+    if (!BufferIsValid(bufnum))
+        goto reject_buffer;
+
+    Assert(!BufferIsLocal(bufnum));
+
+    bufdesc = GetBufferDescriptor(bufnum - 1);
+    buf_state = pg_atomic_read_u32(&bufdesc->state);
+
+    /*
+     * Quick racy check to see if the buffer is clean, in which case we don't
+     * need to flush it. We'll recheck if it is dirty again later before
+     * actually setting BM_IO_IN_PROGRESS.
+     */
+    if (!(buf_state & BM_DIRTY))
+        goto reject_buffer;
+
+    /*
+     * Quick check to see if the buffer is pinned, in which case it is more
+     * likely to be dirtied again soon, and we don't want to eagerly flush it.
+     * We don't care if it has a non-zero usage count because we don't need to
+     * reuse it right away and a non-zero usage count doesn't necessarily mean
+     * it will be dirtied again soon.
+     */
+    if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
+        goto reject_buffer;
+
+    /*
+     * Don't eagerly flush buffers requiring WAL flush. We must check this
+     * again later while holding the buffer content lock for correctness.
+     */
+    if (BufferNeedsWALFlush(bufdesc, false, &lsn))
+        goto reject_buffer;
+
+    /*
+     * Ensure that there's a free refcount entry and resource owner slot for
+     * the pin before pinning the buffer. While this may leak a refcount and
+     * slot if we return without a buffer, that slot will be reused.
+     */
+    ResourceOwnerEnlarge(CurrentResourceOwner);
+    ReservePrivateRefCountEntry();
+
+    /* There is no need to flush the buffer if it is not BM_VALID */
+    if (!PinBuffer(bufdesc, strategy, /* skip_if_not_valid */ true))
+        goto reject_buffer;
+
+    CheckBufferIsPinnedOnce(bufnum);
+
+    content_lock = BufferDescriptorGetContentLock(bufdesc);
+    if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+        goto reject_buffer_unpin;
+
+    if (BufferNeedsWALFlush(bufdesc, false, &lsn))
+        goto reject_buffer_unlock;
+
+    /* Try to start an I/O operation */
+    if (!StartBufferIO(bufdesc, false, true))
+        goto reject_buffer_unlock;
+
+    /*
+     * Because we don't eagerly flush buffers that need WAL flushed first,
+     * this buffer's LSN should only be greater than the victim buffer LSN if
+     * the victim doesn't need WAL flushing either -- in which case, we don't
+     * really need to update max_lsn. But, it seems better to keep the max_lsn
+     * honest -- especially since doing so is cheap.
+     */
+    if (lsn > *max_lsn)
+        *max_lsn = lsn;
+
+    buf_state = LockBufHdr(bufdesc);
+    UnlockBufHdrExt(bufdesc, buf_state, 0, BM_JUST_DIRTIED, 0);
+
+    return bufdesc;
+
+reject_buffer_unlock:
+    LWLockRelease(content_lock);
+
+reject_buffer_unpin:
+    UnpinBuffer(bufdesc);
+
+reject_buffer:
+    return NULL;
+}
+
 /*
  * Prepare the buffer with bufdesc for writing. Returns true if the buffer
  * actually needs writing and false otherwise. lsn returns the buffer's LSN if
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 6a4452e2da0..dfa6b27a4af 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -155,6 +155,31 @@ ClockSweepTick(void)
     return victim;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers immediately before reusing them.
+ */
+bool
+StrategySupportsEagerFlush(BufferAccessStrategy strategy)
+{
+    Assert(strategy);
+
+    switch (strategy->btype)
+    {
+        case BAS_BULKWRITE:
+            return true;
+        case BAS_VACUUM:
+        case BAS_NORMAL:
+        case BAS_BULKREAD:
+            return false;
+        default:
+            elog(ERROR, "unrecognized buffer access strategy: %d",
+                 (int) strategy->btype);
+            return false;
+    }
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -306,6 +331,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
     }
 }
 
+/*
+ * Returns the next buffer in the ring after the one at cursor and increments
+ * cursor.
+ */
+Buffer
+StrategyNextBuffer(BufferAccessStrategy strategy, int *cursor)
+{
+    if (++(*cursor) >= strategy->nbuffers)
+        *cursor = 0;
+
+    return strategy->buffers[*cursor];
+}
+
+/*
+ * Return the current slot in the strategy ring.
+ */
+int
+StrategyGetCurrentIndex(BufferAccessStrategy strategy)
+{
+    return strategy->current;
+}
+
+
 /*
  * StrategySyncStart -- tell BgBufferSync where to start syncing
  *
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 3c774d7a1d2..09521af4bdc 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -503,6 +503,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 
 /* freelist.c */
+extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy,
+                                 int *cursor);
+extern int StrategyGetCurrentIndex(BufferAccessStrategy strategy);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
                                      uint32 *buf_state, bool *from_ring);
-- 
2.43.0
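
For readers tracing the new freelist helpers, the ring-walk contract is:
StrategyGetCurrentIndex() reports the slot of the buffer the strategy most
recently handed out, and StrategyNextBuffer() advances the cursor before
reading it, so a sweep that starts at the current index and stops upon seeing
its starting buffer again visits every other slot exactly once. The sketch
below is a minimal illustration of that contract and is not part of the patch;
walk_ring_once() and its visit() callback are hypothetical names invented only
for this example.

/*
 * Illustration only: visit each occupied slot of a strategy ring exactly
 * once.  sweep_end is expected to be the buffer sitting at the ring's
 * current slot (the victim in the bufmgr.c hunk above), so the sweep ends
 * after one full revolution, or earlier at the first invalid
 * (never-filled) slot.
 */
static void
walk_ring_once(BufferAccessStrategy strategy, Buffer sweep_end,
               void (*visit) (Buffer buf))
{
    int         cursor = StrategyGetCurrentIndex(strategy);

    for (;;)
    {
        Buffer      next_buf = StrategyNextBuffer(strategy, &cursor);

        /* Completed one sweep of the ring */
        if (next_buf == sweep_end)
            break;

        /* Remaining slots of a partially filled ring are InvalidBuffer */
        if (!BufferIsValid(next_buf))
            break;

        visit(next_buf);
    }
}

In the bufmgr.c hunk above, the per-slot step is roughly
PrepareOrRejectEagerFlushBuffer() followed by DoFlushBuffer(), so a buffer
that fails the eligibility checks simply yields NULL and the sweep moves on
to the next slot.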