From 2f2370859b959bf48424966db30e3ba25a1ac0f0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 26 Mar 2025 14:41:48 -0400 Subject: [PATCH v2.13 08/28] squash-later: bufmgr: Implement AIO read support --- src/backend/storage/buffer/bufmgr.c | 200 ++++++++++++++++++++++------ 1 file changed, 156 insertions(+), 44 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 3cf8b0f98d2..0b96e256625 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -6327,20 +6327,75 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp) } } +/* + * Helper to encode errors for buffer_readv_complete() + * + * Errors are encoded as follows: + * - bit 0 indicates whether page was zeroed (1) or not (0) + * - next 8 bits indicate the first offset is the offset of the first page + * that failed verification in a larger IO + * - next 8 bits indicate the number of corruptions + */ +static inline void +buffer_readv_encode_error(PgAioResult *result, bool is_temp, + bool was_zeroed, uint8 first_invalid_off, + uint8 count_invalid) +{ + + uint8 shift = 0; + + StaticAssertStmt(PG_IOV_MAX <= 1 << 8, + "PG_IOV_MAX is bigger than reserved space for error data"); + + result->error_data = 0; + + result->error_data |= was_zeroed << shift; + shift += 1; + + result->error_data |= first_invalid_off << shift; + shift += 8; + + result->error_data |= count_invalid << shift; + shift += 8; + + result->id = is_temp ? PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV; + result->status = was_zeroed ? PGAIO_RS_WARNING : PGAIO_RS_ERROR; +} + +/* + * Decode readv errors as encoded by buffer_readv_encode_error(). + */ +static inline void +buffer_readv_decode_error(PgAioResult result, + bool *was_zeroed, uint8 *first_invalid_off, uint8 *count_invalid) +{ + uint32 rem_error = result.error_data; + + *was_zeroed = rem_error & 1; + rem_error >>= 1; + + *first_invalid_off = rem_error & 0xff; + rem_error >>= 8; + + *count_invalid = rem_error & 0xff; + rem_error >>= 8; +} + /* * Helper for AIO readv completion callbacks, supporting both shared and temp * buffers. Gets called once for each buffer in a multi-page read. */ -static pg_attribute_always_inline PgAioResult -buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, - bool failed, bool is_temp) +static pg_attribute_always_inline void +buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer, + uint8 flags, bool failed, bool is_temp, + bool *failed_verification) { BufferDesc *buf_hdr = is_temp ? GetLocalBufferDescriptor(-buffer - 1) : GetBufferDescriptor(buffer - 1); BufferTag tag = buf_hdr->tag; char *bufdata = BufferGetBlock(buffer); - PgAioResult result = {.status = PGAIO_RS_OK}; uint32 set_flag_bits; /* check that the buffer is in the expected state for a read */ @@ -6357,37 +6412,45 @@ buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, } #endif + *failed_verification = false; + /* check for garbage data */ if (!failed && !PageIsVerifiedExtended((Page) bufdata, tag.blockNum, PIV_LOG_WARNING | PIV_REPORT_STAT)) { - RelFileLocator rlocator = BufTagGetRelFileLocator(&tag); + PgAioResult result_one; - if ((flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + *failed_verification = true; + + if (flags & READ_BUFFERS_ZERO_ON_ERROR) { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - tag.blockNum, - relpathbackend(rlocator, - is_temp ? MyProcNumber : INVALID_PROC_NUMBER, - tag.forkNum).str))); memset(bufdata, 0, BLCKSZ); } else { /* mark buffer as having failed */ failed = true; - - /* encode error for buffer_readv_report */ - result.status = PGAIO_RS_ERROR; - if (is_temp) - result.id = PGAIO_HCB_LOCAL_BUFFER_READV; - else - result.id = PGAIO_HCB_SHARED_BUFFER_READV; - result.error_data = buf_off; } + + /* + * Immediately log a message about the invalid page, but only to the + * server log. The reason to do so immediately is that this may be + * executed in a different backend than the one that originated the + * request. The reason to do so immediately is that the originator + * might not process the query result immediately (because it is busy + * doing another part of query processing) or at all (e.g. if it was + * cancelled or errored out due to another IO also failing). The + * issuer of the IO will emit an ERROR or WARNING when processing the + * IO's results + * + * To avoid duplicating the code to emit these log messages, we reuse + * buffer_readv_report(). + */ + buffer_readv_encode_error(&result_one, is_temp, + flags & READ_BUFFERS_ZERO_ON_ERROR, + buf_off, 1); + pgaio_result_report(result_one, td, LOG_SERVER_ONLY); } /* Terminate I/O and set BM_VALID. */ @@ -6412,8 +6475,6 @@ buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, tag.relNumber, is_temp ? MyProcNumber : INVALID_PROC_NUMBER, false); - - return result; } /* @@ -6428,6 +6489,8 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, { PgAioResult result = prior_result; PgAioTargetData *td = pgaio_io_get_target_data(ioh); + uint8 first_invalid_off = 0; + uint8 invalid_count = 0; uint64 *io_data; uint8 handle_data_len; @@ -6447,35 +6510,46 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++) { Buffer buf = io_data[buf_off]; - PgAioResult buf_result; bool failed; + bool failed_verification = false; Assert(BufferIsValid(buf)); /* * If the entire I/O failed on a lower-level, each buffer needs to be - * marked as failed. In case of a partial read, some buffers may be - * ok. + * marked as failed. In case of a partial read, the first few buffers + * may be ok. */ failed = prior_result.status == PGAIO_RS_ERROR || prior_result.result <= buf_off; - buf_result = buffer_readv_complete_one(buf_off, buf, cb_data, failed, - is_temp); + buffer_readv_complete_one(td, buf_off, buf, cb_data, failed, is_temp, + &failed_verification); /* - * If there wasn't any prior error and page verification failed in - * some form, set the whole IO's result to the page's result. + * Track information about the number of errors across all pages, as + * there can be multiple pages failing verification as part of one IO. */ - if (result.status != PGAIO_RS_ERROR - && buf_result.status != PGAIO_RS_OK) + if (failed_verification) { - result = buf_result; - pgaio_result_report(result, td, LOG); + if (invalid_count++ == 0) + first_invalid_off = buf_off; } } + /* + * If the smgr read succeeded [partially] and page verification failed for + * some of the pages, adjust the IO's result state appropriately. + */ + if (prior_result.status != PGAIO_RS_ERROR && invalid_count > 0) + { + bool was_zeroed = cb_data & READ_BUFFERS_ZERO_ON_ERROR; + + buffer_readv_encode_error(&result, is_temp, was_zeroed, + first_invalid_off, invalid_count); + } + return result; } @@ -6483,28 +6557,66 @@ buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, * AIO error reporting callback for aio_shared_buffer_readv_cb and * aio_local_buffer_readv_cb. * - * Errors are encoded as follows: - * - the only error is page verification failing - * - result.error_data is the offset of the first page that failed - * verification in a larger IO + * The error is encoded / decoded in buffer_readv_encode_error() / + * buffer_readv_decode_error(). */ static void buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel) { + BlockNumber blocknum = target_data->smgr.blockNum; + int nblocks = target_data->smgr.nblocks; ProcNumber errProc; + bool was_zeroed; + uint8 first_invalid_off; + uint8 invalid_count; + RelPathStr rpath; + + buffer_readv_decode_error(result, &was_zeroed, &first_invalid_off, + &invalid_count); if (target_data->smgr.is_temp) errProc = MyProcNumber; else errProc = INVALID_PROC_NUMBER; - ereport(elevel, - errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - target_data->smgr.blockNum + result.error_data, - relpathbackend(target_data->smgr.rlocator, errProc, - target_data->smgr.forkNum).str)); + rpath = relpathbackend(target_data->smgr.rlocator, errProc, + target_data->smgr.forkNum); + + if (was_zeroed) + { + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + invalid_count == 1 ? + errmsg("invalid page in block %u of relation %s; zeroing out page", + blocknum + first_invalid_off, rpath.str) : + errmsg("zeroing out %u invalid pages among blocks %u..%u of relation %s", + invalid_count, + blocknum, blocknum + nblocks - 1, rpath.str), + invalid_count > 1 ? + errdetail("Block %u held first invalid page.", + blocknum + first_invalid_off) : 0, + invalid_count > 1 ? + errhint("See server log for the other %u invalid blocks.", + invalid_count - 1) : 0); + } + else + { + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + invalid_count == 1 ? + errmsg("invalid page in block %u of relation %s", + blocknum + first_invalid_off, rpath.str) : + errmsg("%u invalid pages among blocks %u..%u of relation %s", + invalid_count, + blocknum, blocknum + nblocks - 1, rpath.str), + invalid_count > 1 ? + errdetail("Block %u held first invalid page.", + blocknum + first_invalid_off) : 0, + invalid_count > 1 ? + errhint("See server log for the other %u invalid blocks.", + invalid_count - 1) : 0); + } } static void -- 2.48.1.76.g4e746b1a31.dirty