From 812bc196c977fb3610d3ea8320988d6bd4b00f29 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 27 Feb 2025 21:42:05 +1300 Subject: [PATCH v2 2/4] Respect pin limits accurately in read_stream.c. To avoid pinning too much of the buffer pool at once, we previously used LimitAdditionalBuffers(). The coding was naive, and only considered the available buffers at stream construction time. This commit checks at the time of use with new buffer manager APIs. The result might change dynamically due to pins acquired outside this stream by the same backend. No extra CPU cycles are added to the all-buffered fast-path code, but the I/O-starting path now considers the up-to-date remaining buffer limit when making look-ahead decisions. In practice it was very difficult to exceed the limit in v17, so no back-patch, but changes due to land soon make it easy. Per code review from Andres, in the course of testing his AIO patches. Reported-by: Andres Freund Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++---- 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 04bdb5e6d4b..6c2b4ec011d 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -115,6 +115,7 @@ struct ReadStream int16 pinned_buffers; int16 distance; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -224,7 +225,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } -static void +/* + * Start as much of the current pending read as we can. If we have to split it + * because of the per-backend buffer limit, or the buffer manager decides to + * split it, then the pending read is adjusted to hold the remaining portion. + * + * We can always start a read of at least size one if we have no progress yet. + * Otherwise it's possible that we can't start a read at all because of a lack + * of buffers, and then false is returned. Buffer shortages also reduce the + * distance to a level that prevents look-ahead until buffers are released. + */ +static bool read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { bool need_wait; @@ -233,12 +244,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) int16 io_index; int16 overflow; int16 buffer_index; + int16 buffer_limit; /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); Assert(stream->pending_read_nblocks <= io_combine_limit); - /* We had better not exceed the pin limit by starting this read. */ + /* We had better not exceed the per-stream buffer limit with this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= stream->max_pinned_buffers); @@ -259,10 +271,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else flags = 0; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* Compute the remaining portion of the per-backend buffer limit. */ + if (stream->temporary) + buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); + else + buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + if (buffer_limit == 0 && stream->pinned_buffers == 0) + buffer_limit = 1; /* guarantee progress */ + + /* Does the per-backend buffer limit affect this read? */ + nblocks = stream->pending_read_nblocks; + if (buffer_limit < nblocks) + { + int16 new_distance; + + /* Shrink distance: no more look-ahead until buffers are released. */ + new_distance = stream->pinned_buffers + buffer_limit; + if (stream->distance > new_distance) + stream->distance = new_distance; + + /* If we've already made progress, just give up and wait for buffers. */ + if (stream->pinned_buffers > 0) + return false; + + /* A short read is required to make progress. */ + nblocks = buffer_limit; + } + + /* + * We say how many blocks we want to read, but it may be smaller on return + * if the buffer manager decides it needs a short read at its level. + */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -312,19 +353,27 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + + return true; } static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice) { while (stream->ios_in_progress < stream->max_ios && - stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + ((stream->pinned_buffers == 0 && stream->distance > 0) || + stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)) { BlockNumber blocknum; int16 buffer_index; void *per_buffer_data; - if (stream->pending_read_nblocks == io_combine_limit) + /* If have a pending read that can't be extended, start it now. */ + Assert(stream->pinned_buffers + stream->pending_read_nblocks <= + stream->max_pinned_buffers); + if (stream->pending_read_nblocks == io_combine_limit || + (stream->pinned_buffers == 0 && + stream->pending_read_nblocks == stream->max_pinned_buffers)) { read_stream_start_pending_read(stream, suppress_advice); suppress_advice = false; @@ -360,14 +409,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (!read_stream_start_pending_read(stream, suppress_advice) || + stream->ios_in_progress == stream->max_ios) { - /* And we've hit the limit. Rewind, and stop here. */ + /* And we've hit a buffer or I/O limit. Rewind and wait. */ read_stream_unget_block(stream, blocknum); return; } + + suppress_advice = false; } /* This is the start of a new pending read. */ @@ -390,6 +440,14 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) stream->distance == 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream, suppress_advice); + + /* + * There should always be something pinned when we leave this function, + * whether started by this call or not, unless we've hit the end of the + * stream. In the worst case we can always make progress one buffer at a + * time. + */ + Assert(stream->pinned_buffers > 0 || stream->distance == 0); } /* @@ -418,6 +476,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -465,12 +524,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit our queue to the maximum number of pins we could possibly + * ever be allowed to acquire according to the buffer manager. We may not + * really be able to use them all due to other pins held by this backend, + * but we'll check that later in read_stream_start_pending_read(). + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetSoftLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetSoftPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The soft limit might be zero on a system configured with more + * connections than buffers. We need at least one to make progress. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -530,6 +600,7 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be @@ -658,6 +729,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], @@ -858,6 +935,9 @@ read_stream_reset(ReadStream *stream) stream->buffered_blocknum = InvalidBlockNumber; stream->fast_path = false; + /* There is no point in reading whatever was pending. */ + stream->pending_read_nblocks = 0; + /* Unpin anything that wasn't consumed. */ while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); -- 2.39.5