From e7b00b7ad5db7ca79010eea3abcd626bffc8e25d Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH v2.12 21/28] bufmgr: Implement AIO write support As of this commit there are no users of these AIO facilities, that'll come in later commits. Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/aio.h | 2 + src/include/storage/bufmgr.h | 2 + src/backend/storage/aio/aio_callback.c | 2 + src/backend/storage/buffer/bufmgr.c | 178 +++++++++++++++++++++++++ 4 files changed, 184 insertions(+) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 09d6d9fe1bc..d461485e83f 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -197,8 +197,10 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_MD_WRITEV, PGAIO_HCB_SHARED_BUFFER_READV, + PGAIO_HCB_SHARED_BUFFER_WRITEV, PGAIO_HCB_LOCAL_BUFFER_READV, + PGAIO_HCB_LOCAL_BUFFER_WRITEV, } PgAioHandleCallbackID; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 784df8b00cb..ba9bf247ddb 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -172,7 +172,9 @@ extern PGDLLIMPORT int backend_flush_after; extern PGDLLIMPORT int bgwriter_flush_after; extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_shared_buffer_writev_cb; extern const PgAioHandleCallbacks aio_local_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_local_buffer_writev_cb; /* in buf_init.c */ extern PGDLLIMPORT char *BufferBlocks; diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 12b0f1ebfa6..2a7c6f4f8ad 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -44,8 +44,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), 
+ CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_WRITEV, aio_shared_buffer_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_WRITEV, aio_local_buffer_writev_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7f9b58003b3..d037aa76489 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5508,7 +5508,15 @@ LockBuffer(Buffer buffer, int mode) else if (mode == BUFFER_LOCK_SHARE) LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED); else if (mode == BUFFER_LOCK_EXCLUSIVE) + { + /* + * FIXME: Wait for AIO writes, otherwise there would be a risk of + * deadlock. This isn't entirely trivial to do in a race-free way, IO + * could be started between us checking whether there is IO and + * enqueueing ourselves for the lock. + */ LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE); + } else elog(ERROR, "unrecognized buffer lock mode: %d", mode); } @@ -5523,6 +5531,19 @@ ConditionalLockBuffer(Buffer buffer) { BufferDesc *buf; + /* + * FIXME: Wait for AIO writes. Some code does not deal well with + * ConditionalLockBuffer() continuously failing, e.g. + * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible by + * just making this function always fail and running the regression + * tests). While that code could be fixed, it'd be hard to find all + * problematic places. + * + * It would be OK to wait for the IO as waiting for IO completion does not + * need to wait for any locks that could lead to an undetected deadlock or + * such. + */ + Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) return true; /* act as though we got it */ @@ -5595,6 +5616,11 @@ LockBufferForCleanup(Buffer buffer) { uint32 buf_state; + /* + * FIXME: LockBuffer()'s handling of in-progress writes (once + * implemented) should suffice to deal with deadlock risk. 
+ */ + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); buf_state = LockBufHdr(bufHdr); @@ -5742,6 +5768,14 @@ ConditionalLockBufferForCleanup(Buffer buffer) Assert(BufferIsValid(buffer)); + /* + * FIXME: Should wait for IO for the same reason as in + * ConditionalLockBuffer(). Needs to happen before the + * ConditionalLockBuffer() call below, as we'd never reach the + * ConditionalLockBuffer() call due to the buffer pin held for the duration + * of the IO. + */ + if (BufferIsLocal(buffer)) { refcount = LocalRefCount[-buffer - 1]; @@ -6842,12 +6876,129 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data, target_data->smgr.forkNum).str)); } +/* + * Helper for AIO writev completion callbacks, supporting both shared and temp + * buffers. Gets called once for each buffer in a multi-page write. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, + bool failed, bool is_temp) +{ + BufferDesc *buf_hdr = is_temp ? + GetLocalBufferDescriptor(-buffer - 1) + : GetBufferDescriptor(buffer - 1); + PgAioResult result = {.status = PGAIO_RS_OK}; + bool clear_dirty; + uint32 set_flag_bits; + +#ifdef USE_ASSERT_CHECKING + { + uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state); + + Assert(buf_state & BM_VALID); + Assert(buf_state & BM_TAG_VALID); + /* temp buffers don't use BM_IO_IN_PROGRESS */ + if (!is_temp) + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(buf_state & BM_DIRTY); + } +#endif + + clear_dirty = failed ? false : true; + set_flag_bits = failed ? BM_IO_ERROR : 0; + + if (is_temp) + TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, false); + else + TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, false); + + /* + * The initiator of IO is not managing the lock (i.e. we called + * LWLockDisown()), we are. 
+ */ + if (!is_temp) + LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr), + LW_SHARED); + + /* FIXME: tracepoint */ + + return result; +} + +/* + * Perform completion handling of a single AIO write. This write may cover + * multiple blocks / buffers. + * + * Shared between shared and local buffers, to reduce code duplication. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data, bool is_temp) +{ + PgAioResult result = prior_result; + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + uint64 *io_data; + uint8 handle_data_len; + + if (is_temp) + { + Assert(td->smgr.is_temp); + Assert(pgaio_io_get_owner(ioh) == MyProcNumber); + } + else + Assert(!td->smgr.is_temp); + + /* + * Iterate over all the buffers affected by this IO and call the appropriate + * per-buffer completion function for each buffer. + */ + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++) + { + Buffer buf = io_data[buf_off]; + PgAioResult buf_result; + bool failed; + + Assert(BufferIsValid(buf)); + + /* + * If the entire IO failed at a lower level, each buffer needs to be + * marked as failed. In case of a partial write, some buffers may be + * ok. + */ + failed = + prior_result.status == PGAIO_RS_ERROR + || prior_result.result <= buf_off; + + buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed, + is_temp); + + /* + * If there wasn't any prior error and the IO for this page failed in + * some form, set the whole IO's result to the page's result. 
+ */ + if (result.status != PGAIO_RS_ERROR && buf_result.status != PGAIO_RS_OK) + { + result = buf_result; + pgaio_result_report(result, td, LOG); + } + } + + return result; +} + static void shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { buffer_stage_common(ioh, false, false); } +static void +shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + buffer_stage_common(ioh, true, false); +} + static PgAioResult shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) @@ -6855,6 +7006,13 @@ shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, return buffer_readv_complete(ioh, prior_result, cb_data, false); } +static PgAioResult +shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + return buffer_writev_complete(ioh, prior_result, cb_data, false); +} + static void local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { @@ -6868,6 +7026,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, return buffer_readv_complete(ioh, prior_result, cb_data, true); } +static void +local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + /* + * Currently this is unreachable as the only write support is for + * checkpointer / bgwriter, which don't deal with local buffers. 
+ */ + elog(ERROR, "should be unreachable"); +} + + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .stage = shared_buffer_readv_stage, @@ -6875,6 +7044,11 @@ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .report = buffer_readv_report, }; +const PgAioHandleCallbacks aio_shared_buffer_writev_cb = { + .stage = shared_buffer_writev_stage, + .complete_shared = shared_buffer_writev_complete, +}; + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .stage = local_buffer_readv_stage, @@ -6888,3 +7062,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .complete_local = local_buffer_readv_complete, .report = buffer_readv_report, }; + +const PgAioHandleCallbacks aio_local_buffer_writev_cb = { + .stage = local_buffer_writev_stage, +}; -- 2.48.1.76.g4e746b1a31.dirty