From 5b6b2ebc60100d6d062bd837aa30f5943d4212cc Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 31 Jan 2024 07:27:11 +0000 Subject: [PATCH v22 2/4] Allow WALReadFromBuffers() to wait for in-progress insertions --- src/backend/access/transam/xlog.c | 43 ++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0d87a66c59..d82557886e 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -698,7 +698,7 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); -static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); +static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog); static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); @@ -1494,7 +1494,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) * to make room for a new one, which in turn requires WALWriteLock. */ static XLogRecPtr -WaitXLogInsertionsToFinish(XLogRecPtr upto) +WaitXLogInsertionsToFinish(XLogRecPtr upto, bool emitLog) { uint64 bytepos; XLogRecPtr reservedUpto; @@ -1521,9 +1521,10 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) */ if (upto > reservedUpto) { - ereport(LOG, - (errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X", - LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto)))); + if (emitLog) + ereport(LOG, + (errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X", + LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto)))); upto = reservedUpto; } @@ -1712,7 +1713,11 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) * starting at location 'startptr' and returns total bytes read. * * The bytes read may be fewer than requested if any of the WAL buffers in the - * requested range have been evicted. + * requested range have been evicted, or if the last requested byte is beyond + * the current insert position. + * + * If reading beyond the current write position, this function will wait for + * concurrent inserters to finish. Otherwise, it does not wait at all. * * This function returns immediately if the requested data is not from the * current timeline, or if the server is in recovery. @@ -1724,6 +1729,7 @@ WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) Size nbytes = count; Size ntotal = 0; char *dst = buf; + XLogRecPtr upto = startptr + count; if (RecoveryInProgress() || tli != GetWALInsertionTimeLine()) @@ -1731,6 +1737,23 @@ WALReadFromBuffers(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli) Assert(!XLogRecPtrIsInvalid(startptr)); + /* + * Caller requested very recent WAL data. Wait for any in-progress WAL + * insertions to WAL buffers to finish. + * + * Most callers will have already updated LogwrtResult when determining + * how far to read, but it's OK if it's out of date. XXX: is it worth + * taking a spinlock to update LogwrtResult and check again before calling + * WaitXLogInsertionsToFinish()? + */ + if (upto > LogwrtResult.Write) + { + XLogRecPtr writtenUpto = WaitXLogInsertionsToFinish(upto, false); + + upto = Min(upto, writtenUpto); + nbytes = upto - startptr; + } + /* * Loop through the buffers without a lock. For each buffer, atomically * read and verify the end pointer, then copy the data out, and finally @@ -2001,7 +2024,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) */ LWLockRelease(WALBufMappingLock); - WaitXLogInsertionsToFinish(OldPageRqstPtr); + WaitXLogInsertionsToFinish(OldPageRqstPtr, true); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); @@ -2795,7 +2818,7 @@ XLogFlush(XLogRecPtr record) * Before actually performing the write, wait for all in-flight * insertions to the pages we're about to write to finish. */ - insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr); + insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, true); /* * Try to get the write lock. If we can't get it immediately, wait @@ -2846,7 +2869,7 @@ XLogFlush(XLogRecPtr record) * We're only calling it again to allow insertpos to be moved * further forward, not to actually wait for anyone. */ - insertpos = WaitXLogInsertionsToFinish(insertpos); + insertpos = WaitXLogInsertionsToFinish(insertpos, true); } /* try to write/flush later additions to XLOG as well */ @@ -3025,7 +3048,7 @@ XLogBackgroundFlush(void) START_CRIT_SECTION(); /* now wait for any in-progress insertions to finish and get write lock */ - WaitXLogInsertionsToFinish(WriteRqst.Write); + WaitXLogInsertionsToFinish(WriteRqst.Write, true); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->LogwrtResult; if (WriteRqst.Write > LogwrtResult.Write || -- 2.34.1