From 42ba0a89260d46230ac0df791fae18bfdca0092f Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 18 Mar 2020 16:35:27 +1300 Subject: [PATCH 5/5] Prefetch referenced blocks during recovery. Introduce a new GUC max_wal_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance, and initiate asynchronous reading of referenced blocks. The goal is to avoid I/O stalls and benefit from concurrent I/O. The number of concurrency asynchronous reads is capped by the existing maintenance_io_concurrency GUC. The feature is disabled by default. Reviewed-by: Tomas Vondra Reviewed-by: Alvaro Herrera Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 38 ++ doc/src/sgml/monitoring.sgml | 69 ++ doc/src/sgml/wal.sgml | 12 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 64 ++ src/backend/access/transam/xlogprefetcher.c | 663 ++++++++++++++++++++ src/backend/access/transam/xlogutils.c | 23 +- src/backend/catalog/system_views.sql | 11 + src/backend/replication/logical/logical.c | 2 +- src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 38 +- src/include/access/xlog.h | 4 + src/include/access/xlogprefetcher.h | 28 + src/include/access/xlogutils.h | 20 + src/include/catalog/pg_proc.dat | 8 + src/include/utils/guc.h | 2 + src/test/regress/expected/rules.out | 8 + 18 files changed, 992 insertions(+), 4 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetcher.c create mode 100644 src/include/access/xlogprefetcher.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 672bf6f1ee..8249ec0139 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3102,6 +3102,44 @@ include_dir 'conf.d' + + max_wal_prefetch_distance (integer) + + max_wal_prefetch_distance configuration parameter + + + + + The maximum distance to 
look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as . + If this value is specified without units, it is taken as bytes. + The default is -1, meaning that WAL prefetching is disabled. + + + + + + wal_prefetch_fpw (boolean) + + wal_prefetch_fpw configuration parameter + + + + + Whether to prefetch blocks with full page images during recovery. + Usually this doesn't help, since such blocks will not be read. However, + on file systems with a block size larger than + PostgreSQL's, prefetching can avoid a costly + read-before-write when a blocks are later written. + This setting has no effect unless + is set to a positive number. + The default is off. + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 987580d6df..df4291092b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -320,6 +320,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_wal_prefetcherpg_stat_wal_prefetcher + Only one row, showing statistics about blocks prefetched during recovery. + See for details. + + + pg_stat_subscriptionpg_stat_subscription At least one row per subscription, showing information about @@ -2192,6 +2199,68 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connected server. 
+ + <structname>pg_stat_wal_prefetcher</structname> View + + + + Column + Type + Description + + + + + + prefetch + bigint + Number of blocks prefetched because they were not in the buffer pool + + + skip_hit + bigint + Number of blocks not prefetched because they were already in the buffer pool + + + skip_new + bigint + Number of blocks not prefetched because they were new (usually relation extension) + + + skip_fpw + bigint + Number of blocks not prefetched because a full page image was included in the WAL and was set to off + + + skip_seq + bigint + Number of blocks not prefetched because of repeated or sequential access + + + distance + integer + How far ahead of recovery the WAL prefetcher is currently reading, in bytes + + + queue_depth + integer + How many prefetches have been initiated but are not yet known to have completed + + + +
+ + + The pg_stat_wal_prefetcher view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See + for more information. The counters in this view are reset whenever the + , + or + setting is changed and + the server configuration is reloaded. + + <structname>pg_stat_subscription</structname> View diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index bd9fae544c..9e956ad2a1 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -719,6 +719,18 @@ WAL call being logged to the server log. This option might be replaced by a more general mechanism in the future. + + + The parameter can be + used to improve I/O performance during recovery by instructing + PostgreSQL to initiate reads + of disk blocks that will soon be needed, in combination with the + parameter. The + prefetching mechanism is most likely to be effective on systems + with full_page_writes set to + off (where that is safe), and where the working + set is larger than RAM. By default, WAL prefetching is disabled. 
+ diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..20e044c7c8 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetcher.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index abb227ce66..85f36ef6f4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -34,6 +34,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xloginsert.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -105,6 +106,8 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int max_wal_prefetch_distance = -1; +bool wal_prefetch_fpw = false; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -806,6 +809,7 @@ static XLogSource readSource = XLOG_FROM_ANY; */ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; +static bool reset_wal_prefetcher = false; typedef struct XLogPageReadPrivate { @@ -6213,6 +6217,7 @@ CheckRequiredParameterValues(void) } } + /* * This must be called ONCE during postmaster or standalone-backend startup */ @@ -7069,6 +7074,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetcher *prefetcher = NULL; InRedo = true; @@ -7076,6 +7082,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* the first time through, see if we need to enable prefetching */ + ResetWalPrefetcher(); + /* * main redo apply loop */ @@ -7105,6 +7114,31 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* + 
* The first time through, or if any relevant settings or the + * WAL source changes, we'll restart the prefetching machinery + * as appropriate. This is simpler than trying to handle + * various complicated state changes. + */ + if (unlikely(reset_wal_prefetcher)) + { + /* If we had one already, destroy it. */ + if (prefetcher) + { + XLogPrefetcherFree(prefetcher); + prefetcher = NULL; + } + /* If we want one, create it. */ + if (max_wal_prefetch_distance > 0) + prefetcher = XLogPrefetcherAllocate(xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + reset_wal_prefetcher = false; + } + + /* Peform WAL prefetching, if enabled. */ + if (prefetcher) + XLogPrefetcherReadAhead(prefetcher, xlogreader->ReadRecPtr); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7292,6 +7326,8 @@ StartupXLOG(void) /* * end of main redo apply loop */ + if (prefetcher) + XLogPrefetcherFree(prefetcher); if (reachedRecoveryTarget) { @@ -10155,6 +10191,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra) } } +void +assign_max_wal_prefetch_distance(int new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + max_wal_prefetch_distance = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + +void +assign_wal_prefetch_fpw(bool new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + wal_prefetch_fpw = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + /* * Issue appropriate kind of fsync (if any) for an XLOG output file. @@ -11961,6 +12015,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and move on to the next state. */ currentSource = XLOG_FROM_STREAM; + ResetWalPrefetcher(); break; case XLOG_FROM_STREAM: @@ -12390,3 +12445,12 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Schedule a WAL prefetcher reset, on change of relevant settings. 
+ */ +void +ResetWalPrefetcher(void) +{ + reset_wal_prefetcher = true; +} diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c new file mode 100644 index 0000000000..715552b428 --- /dev/null +++ b/src/backend/access/transam/xlogprefetcher.c @@ -0,0 +1,663 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.c + * Prefetching support for PostgreSQL write-ahead log manager + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log message + * that appears at the end of crash recovery. + */ +#define XLOGPREFETCHER_MONITORING_SAMPLE_STEP 32768 + +/* + * Internal state used for book-keeping. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. 
*/ + XLogRecPtr *prefetch_queue; + int prefetch_queue_size; + int prefetch_head; + int prefetch_tail; + + /* Details of last prefetch to skip repeats and seq scans. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Counters used to compute avg_queue_depth and avg_distance. */ + double samples; + double queue_depth_sum; + double distance_sum; + XLogRecPtr next_sample_lsn; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory just for the benefit of monitoring + * functions. + */ +typedef struct XLogPrefetcherMonitoringStats +{ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Sequential/repeat blocks skipped. */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. 
*/ +} XLogPrefetcherMonitoringStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); + +/* + * On modern systems this is really just *counter++. On some older systems + * there might be more to it, due to inability to read and write 64 bit values + * atomically. The counters will only be written to by one process, and there + * is no ordering requirement, so there's no point in using higher overhead + * pg_atomic_fetch_add_u64(). 
+ */ +static inline void inc_counter(pg_atomic_uint64 *counter) +{ + pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); +} + +static XLogPrefetcherMonitoringStats *MonitoringStats; + +size_t +XLogPrefetcherShmemSize(void) +{ + return sizeof(XLogPrefetcherMonitoringStats); +} + +static void +XLogPrefetcherResetMonitoringStats(void) +{ + pg_atomic_init_u64(&MonitoringStats->prefetch, 0); + pg_atomic_init_u64(&MonitoringStats->skip_hit, 0); + pg_atomic_init_u64(&MonitoringStats->skip_new, 0); + pg_atomic_init_u64(&MonitoringStats->skip_fpw, 0); + pg_atomic_init_u64(&MonitoringStats->skip_seq, 0); + MonitoringStats->distance = -1; + MonitoringStats->queue_depth = 0; +} + +void +XLogPrefetcherShmemInit(void) +{ + bool found; + + MonitoringStats = (XLogPrefetcherMonitoringStats *) + ShmemInitStruct("XLogPrefetcherMonitoringStats", + sizeof(XLogPrefetcherMonitoringStats), + &found); + if (!found) + XLogPrefetcherResetMonitoringStats(); +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming) +{ + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + XLogPrefetcher *prefetcher = palloc0(sizeof(*prefetcher)); + + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. + * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* We're allowed to read as far as we can. 
*/
+		prefetcher->options.read_upto_policy = XLRO_LSN;
+		prefetcher->options.lsn = (XLogRecPtr) -1;
+	}
+	prefetcher->reader = XLogReaderAllocate(wal_segment_size,
+											NULL,
+											read_local_xlog_page,
+											&prefetcher->options);
+	prefetcher->filter_table = hash_create("PrefetchFilterTable", 1024,
+										   &hash_table_ctl,
+										   HASH_ELEM | HASH_BLOBS);
+	dlist_init(&prefetcher->filter_queue);
+
+	/*
+	 * The size of the queue is based on the maintenance_io_concurrency
+	 * setting.  In theory we might have a separate queue for each tablespace,
+	 * but it's not clear how that should work, so for now we'll just use the
+	 * general GUC to rate-limit all prefetching.
+	 */
+	prefetcher->prefetch_queue_size = maintenance_io_concurrency;
+	prefetcher->prefetch_queue = palloc0(sizeof(XLogRecPtr) * prefetcher->prefetch_queue_size);
+	prefetcher->prefetch_head = prefetcher->prefetch_tail = 0;
+
+	/* Prepare to read at the given LSN. */
+	ereport(LOG,
+			(errmsg("WAL prefetch started at %X/%X",
+					(uint32) (lsn >> 32), (uint32) lsn)));
+	XLogBeginRead(prefetcher->reader, lsn);
+
+	XLogPrefetcherResetMonitoringStats();
+
+	return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+	double		avg_distance = 0;
+	double		avg_queue_depth = 0;
+
+	/* Log final statistics. 
*/
+	if (prefetcher->samples > 0)
+	{
+		avg_distance = prefetcher->distance_sum / prefetcher->samples;
+		avg_queue_depth = prefetcher->queue_depth_sum / prefetcher->samples;
+	}
+	ereport(LOG,
+			(errmsg("WAL prefetch finished at %X/%X; "
+					"prefetch = " UINT64_FORMAT ", "
+					"skip_hit = " UINT64_FORMAT ", "
+					"skip_new = " UINT64_FORMAT ", "
+					"skip_fpw = " UINT64_FORMAT ", "
+					"skip_seq = " UINT64_FORMAT ", "
+					"avg_distance = %f, "
+					"avg_queue_depth = %f",
+					(uint32) (prefetcher->reader->EndRecPtr >> 32),
+					(uint32) (prefetcher->reader->EndRecPtr),
+					pg_atomic_read_u64(&MonitoringStats->prefetch),
+					pg_atomic_read_u64(&MonitoringStats->skip_hit),
+					pg_atomic_read_u64(&MonitoringStats->skip_new),
+					pg_atomic_read_u64(&MonitoringStats->skip_fpw),
+					pg_atomic_read_u64(&MonitoringStats->skip_seq),
+					avg_distance,
+					avg_queue_depth)));
+	XLogReaderFree(prefetcher->reader);
+	hash_destroy(prefetcher->filter_table);
+	pfree(prefetcher->prefetch_queue);
+	pfree(prefetcher);
+
+	XLogPrefetcherResetMonitoringStats();
+}
+
+/*
+ * Read ahead in the WAL, as far as we can within the limits set by the user.
+ * Begin fetching any referenced blocks that are not already in the buffer
+ * pool.
+ */
+void
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	/* If an error has occurred or we've hit the end of the WAL, do nothing. */
+	if (prefetcher->shutdown)
+		return;
+
+	/*
+	 * Have any in-flight prefetches definitely completed, judging by the LSN
+	 * that is currently being replayed?
+	 */
+	XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+	/*
+	 * Do we already have the maximum permitted number of I/Os running
+	 * (according to the information we have)?  If so, we have to wait for at
+	 * least one to complete, so give up early.
+	 */
+	if (XLogPrefetcherSaturated(prefetcher))
+		return;
+
+	/* Can we drop any filters yet, due to problem records being replayed? 
*/ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* Main prefetch loop. */ + for (;;) + { + XLogReaderState *reader = prefetcher->reader; + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. */ + if (error) + { + ereport(LOG, (errmsg("WAL prefetch error: %s", error))); + prefetcher->shutdown = true; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* How far ahead of replay are we now? */ + distance = prefetcher->reader->ReadRecPtr - replaying_lsn; + + /* Update distance shown in shm. */ + MonitoringStats->distance = distance; + + /* Sample the averages so we can log them at end of recovery. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + prefetcher->distance_sum += MonitoringStats->distance; + prefetcher->queue_depth_sum += MonitoringStats->queue_depth; + prefetcher->samples += 1.0; + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_MONITORING_SAMPLE_STEP; + } + + /* Are we too far ahead of replay? */ + if (distance >= max_wal_prefetch_distance) + break; + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* + * Scan the record for block references. We might already have been + * partway through processing this record when we hit maximum I/O + * concurrency, so start where we left off. 
+ */ + for (int i = prefetcher->next_block_id; i <= reader->max_block_id; ++i) + { + PrefetchBufferResult prefetch; + DecodedBkpBlock *block = &reader->blocks[i]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might thing we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !wal_prefetch_fpw) + { + inc_counter(&MonitoringStats->skip_fpw); + continue; + } + + /* + * If this block will initialize a new page then it's probably an + * extension. Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. + */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + reader->ReadRecPtr); + inc_counter(&MonitoringStats->skip_new); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, + block->blkno)) + { + inc_counter(&MonitoringStats->skip_new); + continue; + } + + /* Fast path for repeated references to the same relation. */ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat or sequential access, then skip it. We + * expect the kernel to detect sequential access on its own + * and do a better job than we could. + */ + if (block->blkno == prefetcher->last_blkno || + block->blkno == prefetcher->last_blkno + 1) + { + prefetcher->last_blkno = block->blkno; + inc_counter(&MonitoringStats->skip_seq); + continue; + } + + /* We can avoid calling smgropen(). 
*/ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno); + if (BufferIsValid(prefetch.buffer)) + { + /* + * It was already cached, so do nothing. Perhaps in future we + * could remember the buffer so that recovery doesn't have to + * look it up again. + */ + inc_counter(&MonitoringStats->skip_hit); + } + else if (prefetch.initiated_io) + { + /* + * I/O has possibly been initiated (though we don't know if it + * was already cached by the kernel, so we just have to assume + * that it has due to lack of better information). Record + * this as an I/O in progress until eventually we replay this + * LSN. + */ + inc_counter(&MonitoringStats->prefetch); + XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr); + /* + * If the queue is now full, we'll have to wait before + * processing any more blocks from this record. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = i + 1; + return; + } + } + else + { + /* + * Neither cached nor initiated. The underlying segment file + * doesn't exist. Presumably it will be unlinked by a later + * WAL record. When recovery reads this block, it will use the + * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to + * do that sort of thing while merely prefetching, so let's + * just ignore references to this relation until this record is + * replayed, and let recovery create the dummy file or complain + * if something is wrong. + */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + reader->ReadRecPtr); + inc_counter(&MonitoringStats->skip_new); + } + } + + /* Advance to the next record. */ + prefetcher->have_record = false; + } +} + +/* + * Expose statistics about WAL prefetching. 
+ */ +Datum +pg_stat_get_wal_prefetcher(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_WAL_PREFETCHER_COLS 7 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_WAL_PREFETCHER_COLS]; + bool nulls[PG_STAT_GET_WAL_PREFETCHER_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mod required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (MonitoringStats->distance < 0) + { + for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i) + nulls[i] = false; + values[0] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->prefetch)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_hit)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_new)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_fpw)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_seq)); + values[5] = Int32GetDatum(MonitoringStats->distance); + values[6] = Int32GetDatum(MonitoringStats->queue_depth); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + 
tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters. + */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. 
+ */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of the + * time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * I/O, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn; + prefetcher->prefetch_head %= prefetcher->prefetch_queue_size; + MonitoringStats->queue_depth++; + Assert(MonitoringStats->queue_depth <= prefetcher->prefetch_queue_size); +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when I/O really completes. + */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail++; + prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size; + MonitoringStats->queue_depth--; + Assert(MonitoringStats->queue_depth >= 0); + } +} + +/* + * Check if the maximum allowed number of I/Os is already in flight. 
+ */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b217ffa52f..fad2acb514 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -827,6 +828,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data; loc = targetPagePtr + reqLen; @@ -841,7 +843,23 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * notices recovery finishes, so we only have to maintain it for the * local process until recovery ends. 
*/ - if (!RecoveryInProgress()) + if (options) + { + switch (options->read_upto_policy) + { + case XLRO_WALRCV_WRITTEN: + read_upto = GetWalRcvWriteRecPtr(); + break; + case XLRO_LSN: + read_upto = options->lsn; + break; + default: + read_upto = 0; + elog(ERROR, "unknown read_upto_policy value"); + break; + } + } + else if (!RecoveryInProgress()) read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); @@ -879,6 +897,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, if (loc <= read_upto) break; + if (options && options->nowait) + break; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b8a3f46912..7b27ac4805 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -811,6 +811,17 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_wal_prefetcher AS + SELECT + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583..792d90ef4c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 4ceb40a856..4fc391a6e4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -572,7 +572,7 @@ 
PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) return PrefetchSharedBuffer(reln->rd_smgr, forkNum, blockNum); } #else - PrefetchBuffer result = { InvalidBuffer, false }; + PrefetchBufferResult result = { InvalidBuffer, false }; return result; #endif /* USE_PREFETCH */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..5ca98b8886 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -21,6 +21,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/xlogprefetcher.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); + size = add_size(size, XLogPrefetcherShmemSize()); size = add_size(size, CLOGShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); @@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetcherShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 68082315ac..a2a9f62160 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -197,6 +197,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source); static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source); static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source); static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); +static void assign_maintenance_io_concurrency(int newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra); static bool check_application_name(char 
**newval, void **extra, GucSource source); static void assign_application_name(const char *newval, void *extra); @@ -1241,6 +1242,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"wal_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. This option has no " + "effect unless max_wal_prefetch_distance is set to a positive number.") + }, + &wal_prefetch_fpw, + false, + NULL, assign_wal_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2627,6 +2640,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_wal_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable WAL prefetching."), + GUC_UNIT_BYTE + }, + &max_wal_prefetch_distance, + -1, -1, INT_MAX, + NULL, assign_max_wal_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -2900,7 +2924,8 @@ static struct config_int ConfigureNamesInt[] = 0, #endif 0, MAX_IO_CONCURRENCY, - check_maintenance_io_concurrency, NULL, NULL + check_maintenance_io_concurrency, assign_maintenance_io_concurrency, + NULL }, { @@ -11498,6 +11523,17 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source) return true; } +static void +assign_maintenance_io_concurrency(int newval, void *extra) +{ +#ifdef USE_PREFETCH + /* Reset the WAL prefetcher, because a setting it depends on changed. 
*/ + maintenance_io_concurrency = newval; + if (AmStartupProcess()) + ResetWalPrefetcher(); +#endif +} + static void assign_pgstat_temp_directory(const char *newval, void *extra) { diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 98b033fc20..82829d7854 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -111,6 +111,8 @@ extern int wal_keep_segments; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; +extern int max_wal_prefetch_distance; +extern bool wal_prefetch_fpw; extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; @@ -319,6 +321,8 @@ extern void SetWalWriterSleeping(bool sleeping); extern void XLogRequestWalReceiverReply(void); +extern void ResetWalPrefetcher(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h new file mode 100644 index 0000000000..585f5564a3 --- /dev/null +++ b/src/include/access/xlogprefetcher.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.h + * Declarations for the XLog prefetching facility + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetcher.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCHER_H +#define XLOGPREFETCHER_H + +#include "access/xlogdefs.h" + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + +extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming); +extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher); +extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, XLogRecPtr replaying_lsn); + +extern 
size_t XLogPrefetcherShmemSize(void); +extern void XLogPrefetcherShmemInit(void); + +#endif diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..1c8e67d74a 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,6 +47,26 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can be supplied as the private + * data for an xlog reader, causing read_local_xlog_page to modify its + * behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* How far to read. */ + enum { + XLRO_WALRCV_WRITTEN, + XLRO_LSN + } read_upto_policy; + + /* If read_upto_policy is XLRO_LSN, the LSN. */ + XLogRecPtr lsn; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7fb574f9dc..742741afa1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6082,6 +6082,14 @@ prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_wal_replay_paused' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_wal_prefetcher', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,int8,int8,int8,int8,int4,int4}', + proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth}', + prosrc => 'pg_stat_get_wal_prefetcher' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 
'pg_reload_conf' }, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index ce93ace76c..7d076a9743 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -438,5 +438,7 @@ extern void assign_search_path(const char *newval, void *extra); /* in access/transam/xlog.c */ extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern void assign_max_wal_prefetch_distance(int new_value, void *extra); +extern void assign_wal_prefetch_fpw(bool new_value, void *extra); #endif /* GUC_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c7304611c3..63bbb796fc 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2102,6 +2102,14 @@ pg_stat_user_tables| SELECT pg_stat_all_tables.relid, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname <> ALL (ARRAY['pg_catalog'::name, 'information_schema'::name])) AND (pg_stat_all_tables.schemaname !~ '^pg_toast'::text)); +pg_stat_wal_prefetcher| SELECT s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s(prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth); pg_stat_wal_receiver| SELECT s.pid, s.status, s.receive_start_lsn, -- 2.20.1