From 735bb47365c181c78586092fada1756c5d13dd93 Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Wed, 15 Oct 2025 15:23:16 -0400
Subject: [PATCH v12 6/8] Implement checkpointer data write combining

When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.

Author: Melanie Plageman
Reviewed-by: Chao Li
Reviewed-by: Soumya
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
---
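As a rough illustration of the batching policy BufferSync() implements
below (group runs of contiguous blocks from the same relation fork,
capped at a maximum batch size), here is a small self-contained sketch.
The ToyItem type and the MAX_BATCH constant are invented for the
example; the patch itself works with CkptSortItem, BufferWriteBatch,
and MaxWriteBatchSize(), and additionally caps each batch by
smgrmaxcombine() and the backend's pin limit.

/* Illustration only, not PostgreSQL code. */
#include <stdio.h>

#define MAX_BATCH 16            /* stand-in for the real batch size cap */

typedef struct ToyItem
{
    unsigned    relNumber;      /* which relation */
    int         forkNum;        /* which fork of that relation */
    unsigned    blockNum;       /* block within the fork */
} ToyItem;

int
main(void)
{
    /*
     * Sorted the way the checkpointer sorts its to-write list: by
     * relation, fork, and block (tablespace omitted here for brevity).
     */
    ToyItem     items[] = {
        {1, 0, 10}, {1, 0, 11}, {1, 0, 12},     /* contiguous run */
        {1, 0, 20},                             /* gap: starts a new batch */
        {2, 0, 5}, {2, 0, 6},                   /* new relation: new batch */
    };
    int         nitems = sizeof(items) / sizeof(items[0]);
    int         i = 0;

    while (i < nitems)
    {
        ToyItem     start = items[i];
        int         n = 1;

        /* Extend the batch while blocks stay contiguous in the same fork. */
        while (i + n < nitems && n < MAX_BATCH &&
               items[i + n].relNumber == start.relNumber &&
               items[i + n].forkNum == start.forkNum &&
               items[i + n].blockNum == start.blockNum + n)
            n++;

        /* One combined write instead of n single-block writes. */
        printf("write rel %u fork %d blocks %u..%u (%d blocks)\n",
               start.relNumber, start.forkNum,
               start.blockNum, start.blockNum + n - 1, n);

        i += n;
    }

    return 0;
}
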
 src/backend/storage/buffer/bufmgr.c | 225 ++++++++++++++++++++++++----
 src/backend/utils/probes.d          |   2 +-
 2 files changed, 199 insertions(+), 28 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e1f3d45b522..83cc4018d28 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3604,7 +3604,6 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn)
 static void
 BufferSync(int flags)
 {
-    uint32      buf_state;
     int         buf_id;
     int         num_to_scan;
     int         num_spaces;
@@ -3616,6 +3615,8 @@ BufferSync(int flags)
     int         i;
     uint32      mask = BM_DIRTY;
     WritebackContext wb_context;
+    uint32      max_batch_size;
+    BufferWriteBatch batch;
 
     /*
      * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3647,6 +3648,7 @@ BufferSync(int flags)
     {
         BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
         uint32      set_bits = 0;
+        uint32      buf_state;
 
         /*
          * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3789,48 +3791,217 @@
      */
     num_processed = 0;
     num_written = 0;
+    max_batch_size = MaxWriteBatchSize(NULL);
     while (!binaryheap_empty(ts_heap))
     {
+        BlockNumber limit = max_batch_size;
         BufferDesc *bufHdr = NULL;
         CkptTsStatus *ts_stat = (CkptTsStatus *)
             DatumGetPointer(binaryheap_first(ts_heap));
+        int         ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+        int         processed = 0;
 
-        buf_id = CkptBufferIds[ts_stat->index].buf_id;
-        Assert(buf_id != -1);
+        batch.start = InvalidBlockNumber;
+        batch.max_lsn = InvalidXLogRecPtr;
+        batch.n = 0;
 
-        bufHdr = GetBufferDescriptor(buf_id);
+        while (batch.n < limit)
+        {
+            uint32      buf_state;
+            XLogRecPtr  lsn = InvalidXLogRecPtr;
+            LWLock     *content_lock;
+            CkptSortItem item;
+            Buffer      buffer;
 
-        num_processed++;
+            if (ProcSignalBarrierPending)
+                ProcessProcSignalBarrier();
 
-        /*
-         * We don't need to acquire the lock here, because we're only looking
-         * at a single bit. It's possible that someone else writes the buffer
-         * and clears the flag right after we check, but that doesn't matter
-         * since SyncOneBuffer will then do nothing. However, there is a
-         * further race condition: it's conceivable that between the time we
-         * examine the bit here and the time SyncOneBuffer acquires the lock,
-         * someone else not only wrote the buffer but replaced it with another
-         * page and dirtied it. In that improbable case, SyncOneBuffer will
-         * write the buffer though we didn't need to. It doesn't seem worth
-         * guarding against this, though.
-         */
-        if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
-        {
-            if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+            /* Check if we are done with this tablespace */
+            if (ts_stat->index + processed >= ts_end)
+                break;
+
+            item = CkptBufferIds[ts_stat->index + processed];
+
+            buf_id = item.buf_id;
+            Assert(buf_id != -1);
+
+            bufHdr = GetBufferDescriptor(buf_id);
+            buffer = BufferDescriptorGetBuffer(bufHdr);
+
+            /*
+             * If this is the first block of the batch, then check if we need
+             * to open a new relation. Open the relation now because we have
+             * to determine the maximum IO size based on how many blocks
+             * remain in the file.
+             */
+            if (!BlockNumberIsValid(batch.start))
             {
-                TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-                PendingCheckpointerStats.buffers_written++;
-                num_written++;
+                Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+                batch.rlocator.spcOid = item.tsId;
+                batch.rlocator.dbOid = item.dbId;
+                batch.rlocator.relNumber = item.relNumber;
+                batch.forkno = item.forkNum;
+                batch.start = item.blockNum;
+                batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
+                limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start);
+                limit = Min(max_batch_size, limit);
+                limit = Min(GetAdditionalPinLimit(), limit);
+                /* Guarantee progress */
+                limit = Max(limit, 1);
             }
+
+            /*
+             * Once we hit blocks from the next relation or fork of the
+             * relation, break out of the loop and issue the IO we've built up
+             * so far. It is important that we don't increment processed
+             * because we want to start the next IO with this item.
+             */
+            if (item.dbId != batch.rlocator.dbOid ||
+                item.relNumber != batch.rlocator.relNumber ||
+                item.forkNum != batch.forkno)
+                break;
+
+            Assert(item.tsId == batch.rlocator.spcOid);
+
+            /*
+             * If the next block is not contiguous, we can't include it in the
+             * IO we will issue. Break out of the loop and issue what we have
+             * so far. Do not count this item as processed -- otherwise we
+             * will end up skipping it.
+             */
+            if (item.blockNum != batch.start + batch.n)
+                break;
+
+            /*
+             * We don't need to acquire the lock here, because we're only
+             * looking at a few bits. It's possible that someone else writes
+             * the buffer and clears the flag right after we check, but that
+             * doesn't matter since StartBufferIO will then return false.
+             *
+             * If the buffer doesn't need checkpointing, don't include it in
+             * the batch we are building. And if the buffer doesn't need
+             * flushing, we're done with the item, so count it as processed
+             * and break out of the loop to issue the IO so far.
+             */
+            buf_state = pg_atomic_read_u32(&bufHdr->state);
+            if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) !=
+                (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY))
+            {
+                processed++;
+                break;
+            }
+
+            ReservePrivateRefCountEntry();
+            ResourceOwnerEnlarge(CurrentResourceOwner);
+
+            /* If the buffer is not BM_VALID, nothing to do on this buffer */
+            if (!PinBuffer(bufHdr, NULL, true))
+            {
+                processed++;
+                break;
+            }
+
+            /*
+             * Now that we have a pin, we must recheck that the buffer
+             * contains the specified block. Someone may have replaced the
+             * block in the buffer with a different block. In that case, count
+             * it as processed and issue the IO so far.
+             */
+            if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+                                      batch.rlocator) ||
+                BufTagGetForkNum(&bufHdr->tag) != batch.forkno ||
+                item.blockNum != BufferGetBlockNumber(buffer))
+            {
+                UnpinBuffer(bufHdr);
+                processed++;
+                break;
+            }
+
+            /*
+             * It's conceivable that between the time we examined the buffer
+             * header for BM_CHECKPOINT_NEEDED above and now, when we acquire
+             * the lock, someone else wrote the buffer out. In that improbable
+             * case, we will write the buffer though we didn't need to. It
+             * doesn't seem worth guarding against this, though.
+             */
+            content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+            /*
+             * We are willing to wait for the content lock on the first IO in
+             * the batch. However, for subsequent IOs, waiting could lead to
+             * deadlock. We have to eventually flush all eligible buffers,
+             * though. So, if we fail to acquire the lock on a subsequent
+             * buffer, we break out and issue the IO we've built up so far.
+             * Then we come back and start a new IO with that buffer as the
+             * starting buffer. As such, we must not count the item as
+             * processed if we end up failing to acquire the content lock.
+             */
+            if (batch.n == 0)
+                LWLockAcquire(content_lock, LW_SHARED);
+            else if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+            {
+                UnpinBuffer(bufHdr);
+                break;
+            }
+
+            /*
+             * If the buffer doesn't need IO, count the item as processed,
+             * release the buffer, and break out of the loop to issue the IO
+             * we have built up so far.
+             */
+            if (!StartBufferIO(bufHdr, false, true))
+            {
+                LWLockRelease(content_lock);
+                UnpinBuffer(bufHdr);
+                processed++;
+                break;
+            }
+
+            /*
+             * Take the buffer header lock before examining the LSN because we
+             * only hold a shared lock on the buffer contents.
+             */
+            buf_state = LockBufHdr(bufHdr);
+            lsn = BufferGetLSN(bufHdr);
+            UnlockBufHdrExt(bufHdr, buf_state, 0, BM_JUST_DIRTIED, 0);
+
+            /*
+             * Keep track of the max LSN so that we can be sure to flush
+             * enough WAL before flushing data from the buffers. See comment
+             * in DoFlushBuffer() for more on why we don't consider the LSNs
+             * of unlogged relations.
+             */
+            if (buf_state & BM_PERMANENT && lsn > batch.max_lsn)
+                batch.max_lsn = lsn;
+
+            batch.bufdescs[batch.n++] = bufHdr;
+            processed++;
         }
 
         /*
         * Measure progress independent of actually having to flush the buffer
         * - otherwise writing becomes unbalanced.
         */
-        ts_stat->progress += ts_stat->progress_slice;
-        ts_stat->num_scanned++;
-        ts_stat->index++;
+        num_processed += processed;
+        ts_stat->progress += ts_stat->progress_slice * processed;
+        ts_stat->num_scanned += processed;
+        ts_stat->index += processed;
+
+        /*
+         * If we built up an IO, issue it. There's a chance we didn't find any
+         * items referencing buffers that needed flushing this time, but if we
+         * examined and processed any items, we still need to check whether
+         * the heap should be updated.
+         */
+        if (batch.n > 0)
+        {
+            FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+            CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context);
+
+            TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+            PendingCheckpointerStats.buffers_written += batch.n;
+            num_written += batch.n;
+        }
 
         /* Have all the buffers from the tablespace been processed? */
         if (ts_stat->num_scanned == ts_stat->num_to_scan)
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index e0f48c6d2d9..90169c92c26 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
     probe buffer__checkpoint__sync__start();
     probe buffer__checkpoint__done();
     probe buffer__sync__start(int, int);
-    probe buffer__sync__written(int);
+    probe buffer__batch__sync__written(BlockNumber);
     probe buffer__sync__done(int, int, int);
     probe deadlock__found();
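The renamed probe reports the number of blocks written by each combined
checkpoint write, so batch sizes can be observed from a tracing tool. As
a rough illustration (untested, and assuming a DTrace-capable platform
and a server built with --enable-dtrace), a D script along these lines,
run with the checkpointer's PID as its argument, would histogram the
batch sizes:

#!/usr/sbin/dtrace -qs

postgresql$1:::buffer-batch-sync-written
{
    /* arg0 is the BlockNumber count passed to the probe */
    @sizes["blocks per combined checkpoint write"] = quantize(arg0);
}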
-- 
2.43.0