From 78a9008b50d5398cce23cac695da884c41d0d720 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Tue, 27 Feb 2024 00:01:42 +1300 Subject: [PATCH v6 4/6] Provide API for streaming relation data. Introduce an abstraction where relation data can be accessed as a stream of buffers, with an implementation that is more efficient than the equivalent sequence of ReadBuffer() calls. Client code supplies a callback that can say which block number is wanted next, and then consumes individual buffers one at a time from the stream. This division allows read_stream.c to build up large calls to StartReadBuffers() up to io_combine_limit, and issue fadvise() advice ahead of time in a systematic way when random access is detected. This API is based on an idea from Andres Freund to pave the way for asynchronous I/O in future work as required to support direct I/O. The goal is to have an abstraction that insulates client code from future changes to the I/O subsystem. An extended API may be necessary in future for more complicated cases (for example recovery, whose LsnReadQueue device in xlogprefetcher.c is a distant cousin of this code that should eventually be replaced by it), but this basic API is sufficient for many common usage patterns involving predictable access to a single relation fork. Author: Thomas Munro Author: Heikki Linnakangas (contributions) Suggested-by: Andres Freund Reviewed-by: Heikki Linnakangas Reviewed-by: Melanie Plageman Reviewed-by: Nazir Bilal Yavuz Reviewed-by: Andres Freund Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com --- src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/read_stream.c | 733 ++++++++++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 21 +- src/backend/storage/meson.build | 1 + src/include/storage/read_stream.h | 62 +++ src/tools/pgindent/typedefs.list | 2 + 8 files changed, 829 insertions(+), 11 deletions(-) create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/read_stream.c create mode 100644 src/include/storage/read_stream.h diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca20..eec03f6f2b4 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 00000000000..2f29a9ec4d1 --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + read_stream.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 00000000000..10e1aa3b20b --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +backend_sources += files( + 'read_stream.c', +) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c new file mode 100644 index 00000000000..4e293e0df65 --- /dev/null +++ b/src/backend/storage/aio/read_stream.c @@ -0,0 +1,733 @@ +/*------------------------------------------------------------------------- + * + * read_stream.c + * Mechanism for accessing buffered relation data with look-ahead + * + * Code that needs to access relation data typically pins blocks one at a + * time, often in a predictable order that might be sequential or data-driven. + * Calling the simple ReadBuffer() function for each block is inefficient, + * because blocks that are not yet in the buffer pool require I/O operations + * that are small and might stall waiting for storage. This mechanism looks + * into the future and calls StartReadBuffers() and WaitReadBuffers() to read + * neighboring blocks together and ahead of time, with an adaptive look-ahead + * distance. + * + * A user-provided callback generates a stream of block numbers that is used + * to form reads of up to io_combine_limit, by attempting to merge them with a + * pending read. When that isn't possible, the existing pending read is sent + * to StartReadBuffers() so that a new one can begin to form. + * + * The algorithm for controlling the look-ahead distance tries to classify the + * stream into three ideal behaviors: + * + * A) No I/O is necessary, because the requested blocks are fully cached + * already. There is no benefit to looking ahead more than one block, so + * distance is 1. This is the default initial assumption. + * + * B) I/O is necessary, but fadvise is undesirable because the access is + * sequential, or impossible because direct I/O is enabled or the system + * doesn't support advice. There is no benefit in looking ahead more than + * io_combine_limit, because in this case only goal is larger read system + * calls. Looking further ahead would pin many buffers and perform + * speculative work looking ahead for no benefit. + * + * C) I/O is necesssary, it appears random, and this system supports fadvise. + * We'll look further ahead in order to reach the configured level of I/O + * concurrency. + * + * The distance increases rapidly and decays slowly, so that it moves towards + * those levels as different I/O patterns are discovered. For example, a + * sequential scan of fully cached data doesn't bother looking ahead, but a + * sequential scan that hits a region of uncached blocks will start issuing + * increasingly wide read calls until it plateaus at io_combine_limit. + * + * The main data structure is a circular queue of buffers of size + * max_pinned_buffers plus some extra space for technical reasons, ready to be + * returned by read_stream_next_buffer(). Each buffer also has an optional + * variable sized object that is passed from the callback to the consumer of + * buffers. + * + * Parallel to the queue of buffers, there is a circular queue of in-progress + * I/Os that have been started with StartReadBuffers(), and for which + * WaitReadBuffers() must be called before returning the buffer. + * + * For example, if the callback return block numbers 10, 42, 43, 60 in + * successive calls, then these data structures might appear as follows: + * + * buffers buf/data ios + * + * +----+ +-----+ +--------+ + * | | | | +----+ 42..44 | <- oldest_io_index + * +----+ +-----+ | +--------+ + * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 | + * +----+ +-----+ | | +--------+ + * | 42 | | ? |<-+ | | | <- next_io_index + * +----+ +-----+ | +--------+ + * | 43 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 44 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 60 | | ? |<---+ + * +----+ +-----+ + * next_buffer_index -> | | | | + * +----+ +-----+ + * + * In the example, 5 buffers are pinned, and the next buffer to be streamed to + * the client is block 10. Block 10 was a hit and has no associated I/O, but + * the range 42..44 requires an I/O wait before its buffers are returned, as + * does block 60. + * + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/read_stream.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_tablespace.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/smgr.h" +#include "storage/read_stream.h" +#include "utils/memdebug.h" +#include "utils/rel.h" +#include "utils/spccache.h" + +typedef struct InProgressIO +{ + int16 buffer_index; + ReadBuffersOperation op; +} InProgressIO; + +/* + * State for managing a stream of reads. + */ +struct ReadStream +{ + int16 max_ios; + int16 ios_in_progress; + int16 queue_size; + int16 max_pinned_buffers; + int16 pinned_buffers; + int16 distance; + bool advice_enabled; + + /* + * Sometimes we need to be able to 'unget' a block number to resolve a + * flow control problem when I/Os are split. + */ + BlockNumber unget_blocknum; + bool have_unget_blocknum; + + /* + * The callback that will tell us which block numbers to read, and an + * opaque pointer that will be pass to it for its own purposes. + */ + ReadStreamBlockNumberCB callback; + void *callback_private_data; + + /* Next expected block, for detecting sequential access. */ + BlockNumber seq_blocknum; + + /* The read operation we are currently preparing. */ + BlockNumber pending_read_blocknum; + int16 pending_read_nblocks; + + /* Space for buffers and optional per-buffer private data. */ + size_t per_buffer_data_size; + void *per_buffer_data; + + /* Read operations that have been started but not waited for yet. */ + InProgressIO *ios; + int16 oldest_io_index; + int16 next_io_index; + + /* Circular queue of buffers. */ + int16 oldest_buffer_index; /* Next pinned buffer to return */ + int16 next_buffer_index; /* Index of next buffer to pin */ + Buffer buffers[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * Return a pointer to the per-buffer data by index. + */ +static inline void * +get_per_buffer_data(ReadStream *stream, int16 buffer_index) +{ + return (char *) stream->per_buffer_data + + stream->per_buffer_data_size * buffer_index; +} + +/* + * Ask the callback which block it would like us to read next, with a small + * buffer in front to allow streaming_unget_block() to work. + */ +static inline BlockNumber +read_stream_get_block(ReadStream *stream, void *per_buffer_data) +{ + if (!stream->have_unget_blocknum) + return stream->callback(stream, + stream->callback_private_data, + per_buffer_data); + + /* + * You can only unget one block, and next_buffer_index can't change across + * a get, unget, get sequence, so the callback's per_buffer_data, if any, + * is still present in the correct slot. We just have to return the + * previous block number. + */ + stream->have_unget_blocknum = false; + return stream->unget_blocknum; +} + +/* + * In order to deal with short reads in StartReadBuffers(), we sometimes need + * to defer handling of a block until later. + */ +static inline void +read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) +{ + Assert(!stream->have_unget_blocknum); + stream->have_unget_blocknum = true; + stream->unget_blocknum = blocknum; +} + +static void +read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +{ + bool need_wait; + int nblocks; + int flags; + int16 io_index; + int16 overflow; + int16 buffer_index; + + /* This should only be called with a pending read. */ + Assert(stream->pending_read_nblocks > 0); + Assert(stream->pending_read_nblocks <= io_combine_limit); + + /* We had better not exceed the pin limit by starting this read. */ + Assert(stream->pinned_buffers + stream->pending_read_nblocks <= + stream->max_pinned_buffers); + + /* We had better not be overwriting an existing pinned buffer. */ + if (stream->pinned_buffers > 0) + Assert(stream->next_buffer_index != stream->oldest_buffer_index); + else + Assert(stream->next_buffer_index == stream->oldest_buffer_index); + + /* + * If advice hasn't been suppressed, this system supports it, and this + * isn't a strictly sequential pattern, then we'll issue advice. + */ + if (!suppress_advice && + stream->advice_enabled && + stream->pending_read_blocknum != stream->seq_blocknum) + flags = READ_BUFFERS_ISSUE_ADVICE; + else + flags = 0; + + /* We say how many blocks we want to read, but may be smaller on return. */ + buffer_index = stream->next_buffer_index; + io_index = stream->next_io_index; + nblocks = stream->pending_read_nblocks; + need_wait = StartReadBuffers(&stream->ios[io_index].op, + &stream->buffers[buffer_index], + stream->pending_read_blocknum, + &nblocks, + flags); + stream->pinned_buffers += nblocks; + + /* Remember whether we need to wait before returning this buffer. */ + if (!need_wait) + { + /* Look-ahead distance decays, no I/O necessary (behavior A). */ + if (stream->distance > 1) + stream->distance--; + } + else + { + /* + * Remember to call WaitReadBuffers() before returning head buffer. + * Look-ahead distance will be adjusted after waiting. + */ + stream->ios[io_index].buffer_index = buffer_index; + if (++stream->next_io_index == stream->max_ios) + stream->next_io_index = 0; + Assert(stream->ios_in_progress < stream->max_ios); + stream->ios_in_progress++; + stream->seq_blocknum = stream->pending_read_blocknum + nblocks; + } + + /* + * We gave a contiguous range of buffer space to StartReadBuffers(), but + * we want it to wrap around at queue_size. Slide overflowing buffers to + * the front of the array. + */ + overflow = (buffer_index + nblocks) - stream->queue_size; + if (overflow > 0) + memmove(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + + /* Compute location of start of next read, without using % operator. */ + buffer_index += nblocks; + if (buffer_index >= stream->queue_size) + buffer_index -= stream->queue_size; + Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + stream->next_buffer_index = buffer_index; + + /* Adjust the pending read to cover the remaining portion, if any. */ + stream->pending_read_blocknum += nblocks; + stream->pending_read_nblocks -= nblocks; +} + +static void +read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +{ + while (stream->ios_in_progress < stream->max_ios && + stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + { + BlockNumber blocknum; + int16 buffer_index; + void *per_buffer_data; + + if (stream->pending_read_nblocks == io_combine_limit) + { + read_stream_start_pending_read(stream, suppress_advice); + suppress_advice = false; + continue; + } + + /* + * See which block the callback wants next in the stream. We need to + * compute the index of the Nth block of the pending read including + * wrap-around, but we don't want to use the expensive % operator. + */ + buffer_index = stream->next_buffer_index + stream->pending_read_nblocks; + if (buffer_index >= stream->queue_size) + buffer_index -= stream->queue_size; + Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + per_buffer_data = get_per_buffer_data(stream, buffer_index); + blocknum = read_stream_get_block(stream, per_buffer_data); + if (blocknum == InvalidBlockNumber) + { + stream->distance = 0; + break; + } + + /* Can we merge it with the pending read? */ + if (stream->pending_read_nblocks > 0 && + stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum) + { + stream->pending_read_nblocks++; + continue; + } + + /* We have to start the pending read before we can build another. */ + if (stream->pending_read_nblocks > 0) + { + read_stream_start_pending_read(stream, suppress_advice); + suppress_advice = false; + if (stream->ios_in_progress == stream->max_ios) + { + /* And we've hit the limit. Rewind, and stop here. */ + read_stream_unget_block(stream, blocknum); + return; + } + } + + /* This is the start of a new pending read. */ + stream->pending_read_blocknum = blocknum; + stream->pending_read_nblocks = 1; + } + + /* + * Normally we don't start the pending read just because we've hit a + * limit, preferring to give it another chance to grow to a larger size + * once more buffers have been consumed. However, in cases where that + * can't possibly happen, we might as well start the read immediately. + */ + if (stream->pending_read_nblocks > 0 && + (stream->distance == stream->pending_read_nblocks || + stream->distance == 0) && + stream->ios_in_progress < stream->max_ios) + read_stream_start_pending_read(stream, suppress_advice); +} + +/* + * Create a new streaming read object that can be used to perform the + * equivalent of a series of ReadBuffer() calls for one fork of one relation. + * Internally, it generates larger vectored reads where possible by looking + * ahead. The callback should return block numbers or InvalidBlockNumber to + * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also + * write extra data for each block into the space provided to it. It will + * also receive callback_private_data for its own purposes. + */ +ReadStream * +read_stream_begin_relation(int flags, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + ReadStreamBlockNumberCB callback, + void *callback_private_data, + size_t per_buffer_data_size) +{ + ReadStream *stream; + size_t size; + int16 queue_size; + int16 max_ios; + uint32 max_pinned_buffers; + Oid tablespace_id; + + /* Make sure our bmr's smgr and persistent are populated. */ + if (bmr.smgr == NULL) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + /* + * Decide how many I/Os we will allow to run at the same time. That + * currently means advice to the kernel to tell it that we will soon read. + * This number also affects how far we look ahead for opportunities to + * start more I/Os. + */ + tablespace_id = bmr.smgr->smgr_rlocator.locator.spcOid; + if (!OidIsValid(MyDatabaseId) || + (bmr.rel && IsCatalogRelation(bmr.rel)) || + IsCatalogRelationOid(bmr.smgr->smgr_rlocator.locator.relNumber)) + { + /* + * Avoid circularity while trying to look up tablespace settings or + * before spccache.c is ready. + */ + max_ios = effective_io_concurrency; + } + else if (flags & READ_STREAM_MAINTENANCE) + max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id); + else + max_ios = get_tablespace_io_concurrency(tablespace_id); + max_ios = Min(max_ios, PG_INT16_MAX); + + /* + * Choose the maximum number of buffers we're prepared to pin. We try to + * pin fewer if we can, though. We clamp it to at least io_combine_limit + * so that we can have a chance to build up a full io_combine_limit sized + * read, even when max_ios is zero. Be careful not to allow int16 to + * overflow (even though that's not possible with the current GUC range + * limits), allowing also for the spare entry and the overflow space. + */ + max_pinned_buffers = Max(max_ios * 4, io_combine_limit); + max_pinned_buffers = Min(max_pinned_buffers, + PG_INT16_MAX - io_combine_limit - 1); + + /* Don't allow this backend to pin more than its share of buffers. */ + if (SmgrIsTemp(bmr.smgr)) + LimitAdditionalLocalPins(&max_pinned_buffers); + else + LimitAdditionalPins(&max_pinned_buffers); + Assert(max_pinned_buffers > 0); + + /* + * We need one extra entry for buffers and per-buffer data, because users + * of per-buffer data have access to the object until the next call to + * read_stream_next_buffer(), so we need a gap between the head and tail + * of the queue so that we don't clobber it. + */ + queue_size = max_pinned_buffers + 1; + + /* + * Allocate the object, the buffers, the ios and per_data_data space in + * one big chunk. Though we have queue_size buffers, we want to be able + * to assume that all the buffers for a single read are contiguous (i.e. + * don't wrap around halfway through), so we allow temporary overflows of + * up to the maximum possible read size by allocating an extra + * io_combine_limit - 1 elements. + */ + size = offsetof(ReadStream, buffers); + size += sizeof(Buffer) * (queue_size + io_combine_limit - 1); + size += sizeof(InProgressIO) * Max(1, max_ios); + size += per_buffer_data_size * queue_size; + size += MAXIMUM_ALIGNOF * 2; + stream = (ReadStream *) palloc(size); + memset(stream, 0, offsetof(ReadStream, buffers)); + stream->ios = (InProgressIO *) + MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]); + if (per_buffer_data_size > 0) + stream->per_buffer_data = (void *) + MAXALIGN(&stream->ios[Max(1, max_ios)]); + +#ifdef USE_PREFETCH + + /* + * This system supports prefetching advice. We can use it as long as + * direct I/O isn't enabled, the caller hasn't promised sequential access + * (overriding our detection heuristics), and max_ios hasn't been set to + * zero. + */ + if ((io_direct_flags & IO_DIRECT_DATA) == 0 && + (flags & READ_STREAM_SEQUENTIAL) == 0 && + max_ios > 0) + stream->advice_enabled = true; +#endif + + /* + * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled + * above. If we had real asynchronous I/O we might need a slightly + * different definition. + */ + if (max_ios == 0) + max_ios = 1; + + stream->max_ios = max_ios; + stream->per_buffer_data_size = per_buffer_data_size; + stream->max_pinned_buffers = max_pinned_buffers; + stream->queue_size = queue_size; + + if (!bmr.smgr) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + stream->callback = callback; + stream->callback_private_data = callback_private_data; + + /* + * Skip the initial ramp-up phase if the caller says we're going to be + * reading the whole relation. This way we start out assuming we'll be + * doing full io_combine_limit sized reads (behavior B). + */ + if (flags & READ_STREAM_FULL) + stream->distance = Min(max_pinned_buffers, io_combine_limit); + else + stream->distance = 1; + + /* + * Since we always currently always access the same relation, we can + * initialize parts of the ReadBuffersOperation objects and leave them + * that way, to avoid wasting CPU cycles writing to them for each read. + */ + for (int i = 0; i < max_ios; ++i) + { + stream->ios[i].op.bmr = bmr; + stream->ios[i].op.forknum = forknum; + stream->ios[i].op.strategy = strategy; + } + + return stream; +} + +/* + * Pull one pinned buffer out of a stream created with + * read_stream_begin_buffered(). Each call returns successive blocks in the + * order specified by the callback. If per_buffer_data_size was set to a + * non-zero size, *per_buffer_data receives a pointer to the extra per-buffer + * data that the callback had a chance to populate, which remains valid until + * the next call to read_stream_next_buffer(). When the stream runs out of + * data, InvalidBuffer is returned. The caller may decide to end the stream + * early at any time by calling read_stream_end_buffered(). + */ +Buffer +read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) +{ + Buffer buffer; + int16 oldest_buffer_index; + + /* + * A fast path for all-cached scans (behavior A). This is the same as the + * usual algorithm, but it is specialized for no I/O and no per-buffer + * data, so we can skip the queue management code, stay in the same buffer + * slot and use singular StartReadBuffer(). + */ + if (likely(per_buffer_data == NULL && + stream->ios_in_progress == 0 && + stream->pinned_buffers == 1 && + stream->distance == 1)) + { + BlockNumber next_blocknum; + + /* + * We have a pinned buffer that we need to serve up, but we also want + * to probe the next one before we return, just in case we need to + * start an I/O. We can re-use the same buffer slot, and an arbitrary + * I/O slot since they're all free. + */ + oldest_buffer_index = stream->oldest_buffer_index; + Assert((oldest_buffer_index + 1) % stream->queue_size == + stream->next_buffer_index); + buffer = stream->buffers[oldest_buffer_index]; + Assert(buffer != InvalidBuffer); + Assert(stream->pending_read_nblocks <= 1); + if (unlikely(stream->pending_read_nblocks == 1)) + { + next_blocknum = stream->pending_read_blocknum; + stream->pending_read_nblocks = 0; + } + else + next_blocknum = read_stream_get_block(stream, NULL); + if (unlikely(next_blocknum == InvalidBlockNumber)) + { + /* End of stream. */ + stream->distance = 0; + stream->next_buffer_index = oldest_buffer_index; + /* Pin transferred to caller. */ + stream->pinned_buffers = 0; + return buffer; + } + /* Call the special single block version, which is marginally faster. */ + if (unlikely(StartReadBuffer(&stream->ios[0].op, + &stream->buffers[oldest_buffer_index], + next_blocknum, + stream->advice_enabled ? + READ_BUFFERS_ISSUE_ADVICE : 0))) + { + /* I/O needed. We'll take the general path next time. */ + stream->oldest_io_index = 0; + stream->next_io_index = stream->max_ios > 1 ? 1 : 0; + stream->ios_in_progress = 1; + stream->ios[0].buffer_index = oldest_buffer_index; + stream->seq_blocknum = next_blocknum + 1; + /* Increase look ahead distance (move towards behavior B/C). */ + stream->distance = Min(2, stream->max_pinned_buffers); + } + /* Pin transferred to caller, got another one, no net change. */ + Assert(stream->pinned_buffers == 1); + return buffer; + } + + if (stream->pinned_buffers == 0) + { + Assert(stream->oldest_buffer_index == stream->next_buffer_index); + + /* End of stream reached? */ + if (stream->distance == 0) + return InvalidBuffer; + + /* + * The usual order of operations is that we look ahead at the bottom + * of this function after potentially finishing an I/O and making + * space for more, but if we're just starting up we'll need to crank + * the handle to get started. + */ + read_stream_look_ahead(stream, true); + + /* End of stream reached? */ + if (stream->pinned_buffers == 0) + { + Assert(stream->distance == 0); + return InvalidBuffer; + } + } + + /* Grab the oldest pinned buffer and associated per-buffer data. */ + Assert(stream->pinned_buffers > 0); + oldest_buffer_index = stream->oldest_buffer_index; + Assert(oldest_buffer_index >= 0 && + oldest_buffer_index < stream->queue_size); + buffer = stream->buffers[oldest_buffer_index]; + if (per_buffer_data) + *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index); + + Assert(BufferIsValid(buffer)); + + /* Do we have to wait for an associated I/O first? */ + if (stream->ios_in_progress > 0 && + stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) + { + int16 io_index = stream->oldest_io_index; + int16 distance; + + /* Sanity check that we still agree on the buffers. */ + Assert(stream->ios[io_index].op.buffers == + &stream->buffers[oldest_buffer_index]); + + WaitReadBuffers(&stream->ios[io_index].op); + + Assert(stream->ios_in_progress > 0); + stream->ios_in_progress--; + if (++stream->oldest_io_index == stream->max_ios) + stream->oldest_io_index = 0; + + if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) + { + /* Distance ramps up fast (behavior C). */ + distance = stream->distance * 2; + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + } + else + { + /* No advice; move towards io_combine_limit (behavior B). */ + if (stream->distance > io_combine_limit) + { + stream->distance--; + } + else + { + distance = stream->distance * 2; + distance = Min(distance, io_combine_limit); + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + } + } + } + +#ifdef CLOBBER_FREED_MEMORY + /* Clobber old buffer and per-buffer data for debugging purposes. */ + stream->buffers[oldest_buffer_index] = InvalidBuffer; + + /* + * The caller will get access to the per-buffer data, until the next call. + * We wipe the one before, which is never occupied because queue_size + * allowed one extra element. This will hopefully trip up client code + * that is holding a dangling pointer to it. + */ + if (stream->per_buffer_data) + wipe_mem(get_per_buffer_data(stream, + oldest_buffer_index == 0 ? + stream->queue_size - 1 : + oldest_buffer_index - 1), + stream->per_buffer_data_size); +#endif + + /* Pin transferred to caller. */ + Assert(stream->pinned_buffers > 0); + stream->pinned_buffers--; + + /* Advance oldest buffer, with wrap-around. */ + stream->oldest_buffer_index++; + if (stream->oldest_buffer_index == stream->queue_size) + stream->oldest_buffer_index = 0; + + /* Prepare for the next call. */ + read_stream_look_ahead(stream, false); + + return buffer; +} + +/* + * Release stream resources. + */ +void +read_stream_end(ReadStream *stream) +{ + Buffer buffer; + + /* Stop looking ahead. */ + stream->distance = 0; + + /* Unpin anything that wasn't consumed. */ + while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + Assert(stream->pinned_buffers == 0); + Assert(stream->ios_in_progress == 0); + + /* Release memory. */ + pfree(stream); +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 8df5f3b43da..577bcf6e5dd 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1210,13 +1210,14 @@ StartReadBuffer(ReadBuffersOperation *operation, * Begin reading a range of blocks beginning at blockNum and extending for * *nblocks. On return, up to *nblocks pinned buffers holding those blocks * are written into the buffers array, and *nblocks is updated to contain the - * actual number, which may be fewer than requested. + * actual number, which may be fewer than requested. Caller sets some of the + * members of operation; see struct definition. * - * If false is returned, no I/O is necessary and WaitReadBuffers() is not - * necessary. If true is returned, one I/O has been started, and - * WaitReadBuffers() must be called with the same operation object before the - * buffers are accessed. Along with the operation object, the caller-supplied - * array of buffers must remain valid until WaitReadBuffers() is called. + * If false is returned, no I/O is necessary. If true is returned, one I/O + * has been started, and WaitReadBuffers() must be called with the same + * operation object before the buffers are accessed. Along with the operation + * object, the caller-supplied array of buffers must remain valid until + * WaitReadBuffers() is called. * * Currently the I/O is only started with optional operating system advice, * and the real I/O happens in WaitReadBuffers(). In future work, true I/O @@ -2452,7 +2453,7 @@ MarkBufferDirty(Buffer buffer) uint32 old_buf_state; if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) { @@ -4824,7 +4825,7 @@ void ReleaseBuffer(Buffer buffer) { if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) UnpinLocalBuffer(buffer); @@ -4891,7 +4892,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) Page page = BufferGetPage(buffer); if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) { @@ -5963,7 +5964,7 @@ ResOwnerReleaseBufferPin(Datum res) /* Like ReleaseBuffer, but don't call ResourceOwnerForgetBuffer */ if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) UnpinLocalBufferNoOwner(buffer); diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 40345bdca27..739d13293fb 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h new file mode 100644 index 00000000000..9e5fa2acf15 --- /dev/null +++ b/src/include/storage/read_stream.h @@ -0,0 +1,62 @@ +/*------------------------------------------------------------------------- + * + * read_stream.h + * Mechanism for accessing buffered relation data with look-ahead + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/read_stream.h + * + *------------------------------------------------------------------------- + */ +#ifndef READ_STREAM_H +#define READ_STREAM_H + +#include "storage/bufmgr.h" + +/* Default tuning, reasonable for many users. */ +#define READ_STREAM_DEFAULT 0x00 + +/* + * I/O streams that are performing maintenance work on behalf of potentially + * many users, and thus should be governed by maintenance_io_concurrency + * instead of effective_io_concurrency. For example, VACUUM or CREATE INDEX. + */ +#define READ_STREAM_MAINTENANCE 0x01 + +/* + * We usually avoid issuing prefetch advice automatically when sequential + * access is detected, but this flag explicitly disables it, for cases that + * might not be correctly detected. Explicit advice is known to perform worse + * than letting the kernel (at least Linux) detect sequential access. + */ +#define READ_STREAM_SEQUENTIAL 0x02 + +/* + * We usually ramp up from smaller reads to larger ones, to support users who + * don't know if it's worth reading lots of buffers yet. This flag disables + * that, declaring ahead of time that we'll be reading all available buffers. + */ +#define READ_STREAM_FULL 0x04 + +struct ReadStream; +typedef struct ReadStream ReadStream; + +/* Callback that returns the next block number to read. */ +typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream, + void *callback_private_data, + void *per_buffer_data); + +extern ReadStream *read_stream_begin_relation(int flags, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + ReadStreamBlockNumberCB callback, + void *callback_private_data, + size_t per_buffer_data_size); +extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_private); +extern void read_stream_end(ReadStream *stream); + +#endif /* READ_STREAM_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 97edd1388e9..82fa6c12970 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1214,6 +1214,7 @@ InjectionPointCacheEntry InjectionPointEntry InjectionPointSharedState InlineCodeBlock +InProgressIO InsertStmt Instrumentation Int128AggState @@ -2292,6 +2293,7 @@ ReadExtraTocPtrType ReadFunc ReadLocalXLogPageNoWaitPrivate ReadReplicationSlotCmd +ReadStream ReassignOwnedStmt RecheckForeignScan_function RecordCacheArrayEntry -- 2.40.1