From 9f679c914cc81907ba7ae9bbbf6e08fe81c7ec2d Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sat, 9 Mar 2024 16:54:56 +1300 Subject: [PATCH v3 2/3] Use vectored I/O for bulk writes. Zero-extend using smgrzeroextend(). Extend using smgrextendv(). Write using smgrwritev(). Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com --- src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++-------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c index 4a10ece4c39..2a566a80f60 100644 --- a/src/backend/storage/smgr/bulk_write.c +++ b/src/backend/storage/smgr/bulk_write.c @@ -8,7 +8,7 @@ * the regular buffer manager and the bulk loading interface! * * We bypass the buffer manager to avoid the locking overhead, and call - * smgrextend() directly. A downside is that the pages will need to be + * smgrextendv() directly. A downside is that the pages will need to be * re-read into shared buffers on first use after the build finishes. That's * usually a good tradeoff for large relations, and for small relations, the * overhead isn't very significant compared to creating the relation in the @@ -45,8 +45,6 @@ #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID -static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */ - typedef struct PendingWrite { BulkWriteBuffer buf; @@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate) for (int i = 0; i < npending; i++) { - BlockNumber blkno = pending_writes[i].blkno; - Page page = pending_writes[i].buf->data; - + Page page; + const void *pages[16]; + BlockNumber blkno; + int nblocks; + int max_nblocks; + + /* Prepare to write the first block. */ + blkno = pending_writes[i].blkno; + page = pending_writes[i].buf->data; PageSetChecksumInplace(page, blkno); + pages[0] = page; + nblocks = 1; - if (blkno >= bulkstate->pages_written) + /* Zero-extend any missing space before the first block. */ + if (blkno > bulkstate->pages_written) + { + int nzeroblocks; + + nzeroblocks = blkno - bulkstate->pages_written; + smgrzeroextend(bulkstate->smgr, bulkstate->forknum, + bulkstate->pages_written, nzeroblocks, true); + bulkstate->pages_written += nzeroblocks; + } + + if (blkno < bulkstate->pages_written) { /* - * If we have to write pages nonsequentially, fill in the space - * with zeroes until we come back and overwrite. This is not - * logically necessary on standard Unix filesystems (unwritten - * space will read as zeroes anyway), but it should help to avoid - * fragmentation. The dummy pages aren't WAL-logged though. + * We're overwriting. Clamp at the existing size, because we + * can't mix writing and extending in a single operation. */ - while (blkno > bulkstate->pages_written) - { - /* don't set checksum for all-zero page */ - smgrextend(bulkstate->smgr, bulkstate->forknum, - bulkstate->pages_written++, - &zero_buffer, - true); - } - - smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true); - bulkstate->pages_written = pending_writes[i].blkno + 1; + max_nblocks = Min(lengthof(pages), + bulkstate->pages_written - blkno); + } + else + { + /* We're extending. */ + Assert(blkno == bulkstate->pages_written); + max_nblocks = lengthof(pages); + } + + /* Find as many consecutive blocks as we can. */ + while (i + 1 < npending && + pending_writes[i + 1].blkno == blkno + nblocks && + nblocks < max_nblocks) + { + page = pending_writes[++i].buf->data; + PageSetChecksumInplace(page, pending_writes[i].blkno); + pages[nblocks++] = page; + } + + /* Extend or overwrite. */ + if (blkno == bulkstate->pages_written) + { + smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true); + bulkstate->pages_written += nblocks; } else - smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true); - pfree(page); + { + Assert(blkno + nblocks <= bulkstate->pages_written); + smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true); + } + + for (int j = 0; j < nblocks; ++j) + pfree(pending_writes[i - j].buf->data); } bulkstate->npending = 0; -- 2.39.2