From d9dc948108a99e8ee4052873f08b2ff10d9a0477 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH v2.11 19/27] aio: Implement smgr/md/fd write support TODO: - register_dirty_segment_aio() can error out in edge cases --- src/include/storage/fd.h | 1 + src/include/storage/md.h | 5 + src/include/storage/smgr.h | 5 + src/backend/storage/aio/aio_callback.c | 1 + src/backend/storage/file/fd.c | 28 ++++ src/backend/storage/smgr/md.c | 199 +++++++++++++++++++++++++ src/backend/storage/smgr/smgr.c | 29 ++++ 7 files changed, 268 insertions(+) diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index b77d8e5e30e..2cc7c5a4761 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -112,6 +112,7 @@ extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); +extern int FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info); extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 9d7131eff43..47ae6c36c94 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -21,6 +21,7 @@ #include "storage/sync.h" extern const PgAioHandleCallbacks aio_md_readv_cb; +extern const PgAioHandleCallbacks aio_md_writev_cb; /* md storage manager functionality */ extern void mdinit(void); @@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh, extern void 
mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 856ebcda350..f00b3763ac9 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 3cdd2a6b195..03e44b76075 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index f573aa8e7ca..8f143c10d36 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2348,6 +2348,34 @@ retry: return returnCode; } +int +FileStartWriteV(PgAioHandle *ioh, File file, + int 
iovcnt, off_t offset, + uint32 wait_event_info) +{ + int returnCode; + Vfd *vfdP; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d", + file, VfdCache[file].fileName, + (int64) offset, + iovcnt)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return returnCode; + + vfdP = &VfdCache[file]; + + /* FIXME: think about / reimplement temp_file_limit */ + + pgaio_io_prep_writev(ioh, vfdP->fd, iovcnt, offset); + + return 0; +} + int FileSync(File file, uint32 wait_event_info) { diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 2218f17e7a0..9d3dec2710c 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); +static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); +static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); const PgAioHandleCallbacks aio_md_readv_cb = { .complete_shared = md_readv_complete, .report = md_readv_report, }; +const PgAioHandleCallbacks aio_md_writev_cb = { + .complete_shared = md_writev_complete, + .report = md_writev_report, +}; + static inline int _mdfd_open_flags(void) @@ -1115,6 +1122,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } } +/* + * mdstartwritev() -- Asynchronous version of mdwritev().
+ */ +void +mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + off_t seekpos; + MdfdVec *v; + BlockNumber nblocks_this_segment; + struct iovec *iov; + int iovcnt; + int ret; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + + if (nblocks_this_segment != nblocks) + elog(ERROR, "write crossing segment boundary"); + + iovcnt = pgaio_io_get_iovec(ioh, &iov); + + Assert(nblocks <= iovcnt); + + iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment); + + Assert(iovcnt <= nblocks_this_segment); + + if (!(io_direct_flags & IO_DIRECT_DATA)) + pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED); + + pgaio_io_set_target_smgr(ioh, + reln, + forknum, + blocknum, + nblocks, + skipFsync); + pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0); + + ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + if (ret != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not start writing blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)))); +} + /* * mdwriteback() -- Tell the kernel to write pages back to storage. @@ -1503,6 +1568,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) } } +/* + * Like register_dirty_segment(), except for use by AIO. In the completion + * callback we don't have access to the MdfdVec (the completion callback might + * be executed in a different backend than the issuing backend), therefore we + * have to implement this slightly differently. 
+ */ +static void +register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno) +{ + FileTag tag; + + INIT_MD_FILETAG(tag, locator, forknum, segno); + + /* + * Can't block here waiting for checkpointer to accept our sync request, + * as checkpointer might be waiting for this AIO to finish if offloaded to + * a worker. + */ + if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ )) + { + char path[MAXPGPATH]; + + ereport(DEBUG1, + (errmsg_internal("could not forward fsync request because request queue is full"))); + + /* reuse mdsyncfiletag() to avoid duplicating code */ + if (mdsyncfiletag(&tag, path)) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + path))); + } +} + /* * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint */ @@ -2027,3 +2126,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel) td->smgr.nblocks * (size_t) BLCKSZ)); } } + +/* + * AIO completion callback for mdstartwritev(). + */ +static PgAioResult +md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) +{ + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + PgAioResult result = prior_result; + + if (prior_result.result < 0) + { + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + /* For "hard" errors, track the error number in error_data */ + result.error_data = -prior_result.result; + result.result = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + /* + * As explained above smgrstartwritev(), the smgr API operates on the + * level of blocks, rather than bytes. Convert. 
+ */ + result.result /= BLCKSZ; + + Assert(result.result <= td->smgr.nblocks); + + if (result.result == 0) + { + /* consider 0 blocks written a failure */ + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + result.error_data = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + if (result.status != PGAIO_RS_ERROR && + result.result < td->smgr.nblocks) + { + /* partial writes should be retried at upper level */ + result.status = PGAIO_RS_PARTIAL; + result.id = PGAIO_HCB_MD_WRITEV; + } + + if (!td->smgr.skip_fsync) + register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum, + td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE)); + + return result; +} + +/* + * AIO error reporting callback for mdstartwritev(). + */ +static void +md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel) +{ + RelPathStr path; + + path = relpathbackend(td->smgr.rlocator, + td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER, + td->smgr.forkNum); + + if (result.error_data != 0) + { + errno = result.error_data; /* for errcode_for_file_access() */ + + ereport(elevel, + errcode_for_file_access(), + errmsg("could not write blocks %u..%u in file \"%s\": %m", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str) + ); + } + else + { + /* + * NB: This will typically only be output in debug messages, while + * retrying a partial IO.
+ */ + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str, + result.result * (size_t) BLCKSZ, + td->smgr.nblocks * (size_t) BLCKSZ + ) + ); + } +} diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 545888dcdfc..aec2c0c565a 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -115,6 +115,11 @@ typedef struct f_smgr BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); + void (*smgr_startwritev) (PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); @@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = { .smgr_readv = mdreadv, .smgr_startreadv = mdstartreadv, .smgr_writev = mdwritev, + .smgr_startwritev = mdstartwritev, .smgr_writeback = mdwriteback, .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, @@ -787,6 +793,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, RESUME_INTERRUPTS(); } +/* + * smgrstartwritev() -- asynchronous version of smgrwritev() + * + * This starts an asynchronous writev IO using the IO handle `ioh`. Other than + * `ioh` all parameters are the same as smgrwritev(). + * + * Completion callbacks above smgr will be passed the result as the number of + * successfully written blocks if the write [partially] succeeds. This + * maintains the abstraction that smgr operates on the level of blocks, rather + * than bytes. 
+ */ +void +smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + HOLD_INTERRUPTS(); + smgrsw[reln->smgr_which].smgr_startwritev(ioh, + reln, forknum, blocknum, buffers, + nblocks, skipFsync); + RESUME_INTERRUPTS(); +} + /* * smgrwriteback() -- Trigger kernel writeback for the supplied range of * blocks. -- 2.48.1.76.g4e746b1a31.dirty