From 9b490b29103987bb236db351bcbd4e2d4ce23514 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 4 Mar 2024 14:54:51 +0000 Subject: [PATCH v13 3/3] Add Copy pointer to track data copied to WAL buffers --- src/backend/access/transam/xlog.c | 34 ++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ffbf93690e..2568d39c11 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -321,6 +321,7 @@ static bool doPageWrites; typedef struct XLogwrtAtomic { + pg_atomic_uint64 Copy; /* last byte + 1 copied to WAL buffers */ pg_atomic_uint64 Write; /* last byte + 1 written out */ pg_atomic_uint64 Flush; /* last byte + 1 flushed */ } XLogwrtAtomic; @@ -1497,6 +1498,7 @@ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto) { uint64 bytepos; + XLogRecPtr copyptr; XLogRecPtr reservedUpto; XLogRecPtr finishedUpto; XLogCtlInsert *Insert = &XLogCtl->Insert; @@ -1505,6 +1507,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (MyProc == NULL) elog(PANIC, "cannot wait without a PGPROC structure"); + /* check if there's any work to do */ + copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy); + if (upto <= copyptr) + return copyptr; + /* Read the current insert position */ SpinLockAcquire(&Insert->insertpos_lck); bytepos = Insert->CurrBytePos; @@ -1584,6 +1591,19 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) finishedUpto = insertingat; } + + pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Copy, finishedUpto); + +#ifdef USE_ASSERT_CHECKING + { + XLogRecPtr Copy = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy); + XLogRecPtr Write = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Write); + XLogRecPtr Flush = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Flush); + + Assert(Copy >= Write && Write >= Flush); + } +#endif + return finishedUpto; } @@ -1725,13 +1745,23 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, { char *pdst = dstbuf; XLogRecPtr recptr = startptr; + XLogRecPtr copyptr; Size nbytes = count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) return 0; Assert(!XLogRecPtrIsInvalid(startptr)); - Assert(startptr + count <= LogwrtResult.Write); + + /* + * Caller should ensure that the requested data has been copied to WAL + * buffers before we try to read it. + */ + copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy); + if (startptr + count > copyptr) + ereport(ERROR, + (errmsg("request to read past end of generated WAL; request %X/%X, current position %X/%X", + LSN_FORMAT_ARGS(startptr + count), LSN_FORMAT_ARGS(copyptr)))); /* * Loop through the buffers without a lock. For each buffer, atomically @@ -4901,6 +4931,7 @@ XLOGShmemInit(void) XLogCtl->InstallXLogFileSegmentActive = false; XLogCtl->WalWriterSleeping = false; + pg_atomic_init_u64(&XLogCtl->LogwrtResult.Copy, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr); @@ -5927,6 +5958,7 @@ StartupXLOG(void) * because no other process can be reading or writing WAL yet. */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + pg_atomic_write_u64(&XLogCtl->LogwrtResult.Copy, EndOfLog); pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog); pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; -- 2.34.1