From 76f60b29bae4cd90515b7b942272858df35acaca Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 26 Mar 2024 13:03:06 +0200 Subject: [PATCH v9.heikki 9/9] Replace 'buffer_io_indexes' array with ref from IO to buffer. When the block pattern is sequential so that we manage to build large I/Os, this avoids having to repeatedly reset the buffer_io_indexes array elements to -1, when most of them already are -1. That could be slightly more efficient, although I didn't try to measure that and it probably doesn't make any difference in practice either way. --- src/backend/storage/aio/streaming_read.c | 91 +++++++++++------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c index 0ed81017b78..00b92284a47 100644 --- a/src/backend/storage/aio/streaming_read.c +++ b/src/backend/storage/aio/streaming_read.c @@ -44,30 +44,32 @@ * The main data structure is a circular queue of buffers of size * max_pinned_buffers, ready to be returned by streaming_read_buffer_next(). * Each buffer also has an optional variable sized object that is passed from - * the callback to the consumer of buffers. A third array records whether - * WaitReadBuffers() must be called before returning the buffer, and if so, - * points to the relevant ReadBuffersOperation object. + * the callback to the consumer of buffers. + * + * Parallel to the queue of buffers, there is a circular queue of in-progress + * I/Os that have been started with StartReadBuffers(), and for which + * WaitReadBuffers() must be called before returning the buffer. * * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in * successive calls, then these data structures might appear as follows: * - * buffers buf/data buf/io ios + * buffers buf/data ios * - * +----+ +-----+ +---+ +--------+ - * | | | | | | +---->| 42..44 | - * +----+ +-----+ +---+ | +--------+ - * oldest_buffer_index -> | 10 | | ? | | | | +-->| 60..60 | - * +----+ +-----+ +---+ | | +--------+ - * | 42 | | ? | | 0 +--+ | | | - * +----+ +-----+ +---+ | +--------+ - * | 43 | | ? | | | | | | - * +----+ +-----+ +---+ | +--------+ - * | 44 | | ? | | | | | | - * +----+ +-----+ +---+ | +--------+ - * | 60 | | ? | | 1 +----+ - * +----+ +-----+ +---+ - * next_buffer_index -> | | | | | | - * +----+ +-----+ +---+ + * +----+ +-----+ +--------+ + * | | | | +-----+ 42..44 | <- oldest_io_index + * +----+ +-----+ | +--------+ + * oldest_buffer_index -> | 10 | | ? | | +-->| 60..60 | + * +----+ +-----+ | | +--------+ + * | 42 | | ? | <------+ | | | <- next_io_index + * +----+ +-----+ | +--------+ + * | 43 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 44 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 60 | | ? | <--------+ + * +----+ +-----+ + * next_buffer_index -> | | | | + * +----+ +-----+ * * In the example, 5 buffers are pinned, and the next buffer to be streamed to * the client is block 10. Block 10 was a hit and has no associated I/O, but @@ -94,6 +96,12 @@ #include "utils/rel.h" #include "utils/spccache.h" +typedef struct +{ + int16 buffer_index; + ReadBuffersOperation op; +} InProgressIO; + /* * Streaming read object. */ @@ -135,10 +143,10 @@ struct StreamingRead Buffer *buffers; size_t per_buffer_data_size; void *per_buffer_data; - int16 *buffer_io_indexes; /* Read operations that have been started but not waited for yet. */ - ReadBuffersOperation *ios; + InProgressIO *ios; + int16 oldest_io_index; int16 next_io_index; /* Head and tail of the circular queue of buffers. */ @@ -211,7 +219,6 @@ streaming_read_start_pending_read(StreamingRead *stream) { bool need_wait; int nblocks; - int16 io_index; int16 overflow; int flags; @@ -254,14 +261,12 @@ streaming_read_start_pending_read(StreamingRead *stream) &nblocks, stream->strategy, flags, - &stream->ios[stream->next_io_index]); + &stream->ios[stream->next_io_index].op); stream->pinned_buffers += nblocks; /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - io_index = -1; - /* Look-ahead distance decays, no I/O necessary (behavior A). */ if (stream->distance > 1) stream->distance--; @@ -272,7 +277,7 @@ streaming_read_start_pending_read(StreamingRead *stream) * Remember to call WaitReadBuffers() before returning head buffer. * Look-ahead distance will be adjusted after waiting. */ - io_index = stream->next_io_index; + stream->ios[stream->next_io_index].buffer_index = stream->next_buffer_index; if (++stream->next_io_index == stream->max_ios) stream->next_io_index = 0; @@ -280,9 +285,6 @@ streaming_read_start_pending_read(StreamingRead *stream) stream->ios_in_progress++; } - /* Set up the pointer to the I/O for the head buffer, if there is one. */ - stream->buffer_io_indexes[stream->next_buffer_index] = io_index; - /* * We gave a contiguous range of buffer space to StartReadBuffers(), but * we want it to wrap around at max_pinned_buffers. Move values that @@ -296,15 +298,6 @@ streaming_read_start_pending_read(StreamingRead *stream) memmove(&stream->buffers[0], &stream->buffers[stream->max_pinned_buffers], sizeof(stream->buffers[0]) * overflow); - for (int i = 0; i < overflow; ++i) - stream->buffer_io_indexes[i] = -1; - for (int i = 1; i < nblocks - overflow; ++i) - stream->buffer_io_indexes[stream->next_buffer_index + i] = -1; - } - else - { - for (int i = 1; i < nblocks; ++i) - stream->buffer_io_indexes[stream->next_buffer_index + i] = -1; } /* @@ -528,8 +521,7 @@ streaming_read_buffer_begin(int flags, buffer_io_size - 1)); /* Space for the IOs that we might run. */ - stream->buffer_io_indexes = palloc(max_pinned_buffers * sizeof(stream->buffer_io_indexes[0])); - stream->ios = palloc(max_ios * sizeof(ReadBuffersOperation)); + stream->ios = palloc(max_ios * sizeof(InProgressIO)); return stream; } @@ -547,7 +539,6 @@ Buffer streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data) { Buffer buffer; - int16 io_index; int16 oldest_buffer_index; if (unlikely(stream->pinned_buffers == 0)) @@ -580,21 +571,23 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data) Assert(BufferIsValid(buffer)); /* Do we have to wait for an associated I/O first? */ - io_index = stream->buffer_io_indexes[oldest_buffer_index]; - Assert(io_index >= -1 && io_index < stream->max_ios); - if (io_index >= 0) + if (stream->ios_in_progress > 0 && + stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { + int16 io_index = stream->oldest_io_index; int distance; /* Sanity check that we still agree on the buffers. */ - Assert(stream->ios[io_index].buffers == &stream->buffers[oldest_buffer_index]); + Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]); - WaitReadBuffers(&stream->ios[io_index]); + WaitReadBuffers(&stream->ios[io_index].op); Assert(stream->ios_in_progress > 0); stream->ios_in_progress--; + if (++stream->oldest_io_index == stream->max_ios) + stream->oldest_io_index = 0; - if (stream->ios[io_index].flags & READ_BUFFERS_ISSUE_ADVICE) + if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) { /* Distance ramps up fast (behavior C). */ distance = stream->distance * 2; @@ -621,7 +614,6 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data) /* Advance the oldest buffer, but clobber it first for debugging. */ #ifdef CLOBBER_FREED_MEMORY stream->buffers[oldest_buffer_index] = InvalidBuffer; - stream->buffer_io_indexes[oldest_buffer_index] = -1; if (stream->per_buffer_data) wipe_mem(get_per_buffer_data(stream, oldest_buffer_index), stream->per_buffer_data_size); @@ -644,7 +636,10 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); stream->oldest_buffer_index = 0; stream->next_buffer_index = 0; + Assert(stream->next_io_index == stream->oldest_io_index); + Assert(stream->ios_in_progress == 0); stream->next_io_index = 0; + stream->oldest_io_index = 0; } /* Prepare for the next call. */ -- 2.39.2