From 15e29110301ad77291eb0f0322229077adad69e2 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Tue, 20 Feb 2024 05:54:20 +0000
Subject: [PATCH v25 4/4] Demonstrate reading unflushed WAL directly from WAL buffers

---
 src/backend/access/transam/xlogreader.c       |   3 +-
 .../read_wal_from_buffers--1.0.sql            |  23 ++
 .../read_wal_from_buffers.c                   | 275 +++++++++++++++++-
 .../read_wal_from_buffers/t/001_basic.pl      |  37 +++
 4 files changed, 336 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ae9904e7e4..4658a86997 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1035,7 +1035,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+	if (state->seg.ws_segno != 0 &&
+		targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
index 82fa097d10..72d05522fc 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers--1.0.sql
@@ -12,3 +12,26 @@ CREATE FUNCTION read_wal_from_buffers(IN lsn pg_lsn, IN bytes_to_read int,
     bytes_read OUT int)
 AS 'MODULE_PATHNAME', 'read_wal_from_buffers'
 LANGUAGE C STRICT;
+
+--
+-- get_wal_records_info_from_buffers()
+--
+-- SQL function to get info of WAL records available in WAL buffers.
+--
+CREATE FUNCTION get_wal_records_info_from_buffers(IN start_lsn pg_lsn,
+    IN end_lsn pg_lsn,
+    OUT start_lsn pg_lsn,
+    OUT end_lsn pg_lsn,
+    OUT prev_lsn pg_lsn,
+    OUT xid xid,
+    OUT resource_manager text,
+    OUT record_type text,
+    OUT record_length int4,
+    OUT main_data_length int4,
+    OUT fpi_length int4,
+    OUT description text,
+    OUT block_ref text
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'get_wal_records_info_from_buffers'
+LANGUAGE C STRICT PARALLEL SAFE;
diff --git a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
index 9df5c07b4b..ed33a14127 100644
--- a/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
+++ b/src/test/modules/read_wal_from_buffers/read_wal_from_buffers.c
@@ -14,11 +14,27 @@
 #include "postgres.h"
 
 #include "access/xlog.h"
-#include "fmgr.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
 #include "utils/pg_lsn.h"
 
 PG_MODULE_MAGIC;
 
+static int	read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+								  int reqLen, XLogRecPtr targetRecPtr,
+								  char *cur_page);
+
+static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
+							 bool *nulls, uint32 ncols);
+static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
+							  XLogRecPtr start_lsn,
+							  XLogRecPtr end_lsn);
+
 /*
  * SQL function to read WAL from WAL buffers. Returns number of bytes read.
  */
@@ -52,3 +68,260 @@ read_wal_from_buffers(PG_FUNCTION_ARGS)
 
 	PG_RETURN_INT32(read);
 }
+
+/*
+ * XLogReaderRoutine->page_read callback for reading WAL from WAL buffers.
+ *
+ * Waits until the WAL insert position has reached targetPagePtr + reqLen,
+ * then copies at most one XLOG_BLCKSZ page starting at targetPagePtr into
+ * cur_page.  Returns the number of bytes copied, or -1 if not enough WAL is
+ * available.  Raises an ERROR if WAL buffers supply fewer bytes than asked.
+ */
+static int
+read_from_wal_buffers(XLogReaderState *state, XLogRecPtr targetPagePtr,
+					  int reqLen, XLogRecPtr targetRecPtr,
+					  char *cur_page)
+{
+	XLogRecPtr	read_upto,
+				loc;
+	TimeLineID	tli = GetWALInsertionTimeLine();
+	Size		count;
+	Size		read = 0;
+
+	loc = targetPagePtr + reqLen;
+
+	/* Loop waiting for xlog to be available if necessary */
+	while (1)
+	{
+		read_upto = GetXLogInsertRecPtr();
+
+		if (loc <= read_upto)
+			break;
+
+		WaitXLogInsertionsToFinish(loc);
+
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(1000L);
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
+	{
+		/*
+		 * more than one block available; read only that block, have caller
+		 * come back if they need more.
+		 */
+		count = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > read_upto)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* enough bytes available to satisfy the request */
+		count = read_upto - targetPagePtr;
+	}
+
+	/* read WAL from WAL buffers */
+	read = WALReadFromBuffers(cur_page, targetPagePtr, count, tli);
+
+	if (read != count)
+		ereport(ERROR,
+				errmsg("could not read fully from WAL buffers; expected %zu, read %zu",
+					   count, read));
+
+	return count;
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * This function and its helpers below are similar to pg_walinspect's
+ * pg_get_wal_records_info() except that it will get info of WAL records
+ * available in WAL buffers.
+ */
+PG_FUNCTION_INFO_V1(get_wal_records_info_from_buffers);
+Datum
+get_wal_records_info_from_buffers(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	end_lsn = PG_GETARG_LSN(1);
+
+	/*
+	 * Validate start and end LSNs coming from the function inputs.
+	 *
+	 * Reading WAL below the first page of the first segment isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (start_lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+	if (start_lsn > end_lsn)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("WAL start LSN must be less than end LSN")));
+
+	GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Read next WAL record; raises an ERROR if no record can be read.
+ */
+static XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		if (errormsg)
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X: %s",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg));
+		else
+			ereport(ERROR,
+					errmsg("could not read WAL at %X/%X",
+						   LSN_FORMAT_ARGS(xlogreader->EndRecPtr)));
+	}
+
+	return record;
+}
+
+/*
+ * Output values that make up a row describing caller's WAL record.
+ */
+static void
+GetWALRecordInfo(XLogReaderState *record, Datum *values,
+				 bool *nulls, uint32 ncols)
+{
+	const char *record_type;
+	RmgrData	desc;
+	uint32		fpi_len = 0;
+	StringInfoData rec_desc;
+	StringInfoData rec_blk_ref;
+	int			i = 0;
+
+	desc = GetRmgr(XLogRecGetRmid(record));
+	record_type = desc.rm_identify(XLogRecGetInfo(record));
+
+	if (record_type == NULL)
+		record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
+
+	initStringInfo(&rec_desc);
+	desc.rm_desc(&rec_desc, record);
+
+	if (XLogRecHasAnyBlockRefs(record))
+	{
+		initStringInfo(&rec_blk_ref);
+		XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
+	}
+
+	values[i++] = LSNGetDatum(record->ReadRecPtr);
+	values[i++] = LSNGetDatum(record->EndRecPtr);
+	values[i++] = LSNGetDatum(XLogRecGetPrev(record));
+	values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
+	values[i++] = CStringGetTextDatum(desc.rm_name);
+	values[i++] = CStringGetTextDatum(record_type);
+	values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
+	values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
+	values[i++] = UInt32GetDatum(fpi_len);
+
+	if (rec_desc.len > 0)
+		values[i++] = CStringGetTextDatum(rec_desc.data);
+	else
+		nulls[i++] = true;
+
+	if (XLogRecHasAnyBlockRefs(record))
+		values[i++] = CStringGetTextDatum(rec_blk_ref.data);
+	else
+		nulls[i++] = true;
+
+	Assert(i == ncols);
+}
+
+/*
+ * Get info of all WAL records between start LSN and end LSN.
+ *
+ * One row per record is appended to the function's materialized result set.
+ * Reading stops at the first record that ends after end_lsn.
+ */
+static void
+GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
+				  XLogRecPtr end_lsn)
+{
+#define GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS 11
+	XLogReaderState *xlogreader;
+	XLogRecPtr	first_valid_record;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext old_cxt;
+	MemoryContext tmp_cxt;
+
+	Assert(start_lsn <= end_lsn);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_from_wal_buffers,
+											   .segment_open = NULL,
+											   .segment_close = NULL),
+									NULL);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, start_lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+	{
+		ereport(LOG,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(start_lsn))));
+
+		XLogReaderFree(xlogreader);
+		return;
+	}
+
+	tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									"GetWALRecordsInfo temporary cxt",
+									ALLOCSET_DEFAULT_SIZES);
+
+	while (ReadNextXLogRecord(xlogreader) &&
+		   xlogreader->EndRecPtr <= end_lsn)
+	{
+		Datum		values[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+		bool		nulls[GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS] = {0};
+
+		/* Use the tmp context so we can clean up after each tuple is done */
+		old_cxt = MemoryContextSwitchTo(tmp_cxt);
+
+		GetWALRecordInfo(xlogreader, values, nulls,
+						 GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
+
+		/* clean up and switch back */
+		MemoryContextSwitchTo(old_cxt);
+		MemoryContextReset(tmp_cxt);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextDelete(tmp_cxt);
+	XLogReaderFree(xlogreader);
+
+#undef GET_WAL_RECORDS_INFO_FROM_BUFFERS_COLS
+}
diff --git a/src/test/modules/read_wal_from_buffers/t/001_basic.pl b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
index 2360ff1171..15ef550c8c 100644
--- a/src/test/modules/read_wal_from_buffers/t/001_basic.pl
+++ b/src/test/modules/read_wal_from_buffers/t/001_basic.pl
@@ -71,4 +71,41 @@ for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 }
 ok($result, 'waited until WAL is successfully read from WAL buffers');
 
+$result = 0;
+
+# Wait until we get info of WAL records available in WAL buffers.
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$node->safe_psql('postgres', "DROP TABLE IF EXISTS foo, bar;");
+	$node->safe_psql('postgres',
+		"CREATE TABLE foo AS SELECT * FROM generate_series(1, 2);");
+	my $start_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	my $tbl_oid = $node->safe_psql('postgres',
+		"SELECT oid FROM pg_class WHERE relname = 'foo';");
+	$node->safe_psql('postgres',
+		"INSERT INTO foo SELECT * FROM generate_series(1, 10);");
+	my $end_lsn =
+	  $node->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn();");
+	$node->safe_psql('postgres',
+		"CREATE TABLE bar AS SELECT * FROM generate_series(1, 2);");
+
+	my $res = $node->safe_psql(
+		'postgres',
+		"SELECT count(*) FROM get_wal_records_info_from_buffers('$start_lsn', '$end_lsn')
+				WHERE block_ref LIKE concat('%', '$tbl_oid', '%') AND
+					resource_manager = 'Heap' AND
+					record_type = 'INSERT';");
+
+	if ($res eq 10)
+	{
+		$result = 1;
+		last;
+	}
+
+	usleep(100_000);
+}
+ok($result,
+	'waited until we get info of WAL records available in WAL buffers.');
+
 done_testing();
-- 
2.34.1