Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward |
Date | |
Msg-id | 3698234.1760149336@sss.pgh.pa.us |
In response to | Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward (Tom Lane <tgl@sss.pgh.pa.us>) |
List | pgsql-hackers |
I wrote:
> I'm tempted to increase DEFAULT_IO_BUFFER_SIZE so that gzip
> also produces blocks in the vicinity of 64K, but we'd have
> to decouple the behavior of compress_lz4.c somehow, or it
> would want to produce blocks around a megabyte which might
> be excessive. (Or if it's not, we'd still want all these
> compression methods to choose similar block sizes, I'd think.)

After a bit of further experimentation, here is a v2 patchset.

0001 is like my previous patch except that it also fixes Zstd_write
and Zstd_close so that the "stream API" code doesn't behave
differently from the older API. Also, now with draft commit message.

0002 adjusts things so that lz4 and gzip compression produce block
sizes around 128K, which is what compress_zstd.c already does after
0001. While I wouldn't necessarily follow zstd's lead if it were easy
to do differently, it isn't. We'd have to ignore ZSTD_CStreamOutSize()
in favor of making our own buffer size choice. That seems to carry
some risks of tickling bugs that upstream isn't testing for, and the
value of 128K is not so far off that I care to take any such risk.

This brings us to a place where all three compression modes should
yield roughly-comparable data block sizes, which is a good starting
point for further discussion of whether pg_restore needs
seek-versus-read adjustments.

			regards, tom lane

From 8f5ab1e5b18be54dc037a459dee5b9095166232b Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 10 Oct 2025 20:57:15 -0400
Subject: [PATCH v2 1/2] Fix poor buffering logic in pg_dump's lz4 and zstd
 compression code.

Both of these modules dumped each bit of output that they got from
the underlying compression library as a separate "data block" in the
emitted archive file. In the case of zstd this'd frequently result in
block sizes well under 100 bytes; lz4 is a little better but still
produces blocks around 300 bytes, at least in the test case I tried.
This bloats the archive file a little bit compared to larger block
sizes, but the real problem is that when pg_restore has to skip each
data block rather than seeking directly to some target data, tiny
block sizes are enormously inefficient.

Fix both modules so that they fill their allocated buffer reasonably
well before dumping a data block. In the case of lz4, also delete
some redundant logic that caused the lz4 frame header to be emitted
as a separate data block. (That saves little, but I see no reason to
expend extra code to get worse results.)

I fixed the "stream API" code too. In those cases, feeding small
amounts of data to fwrite() probably doesn't have any meaningful
performance consequences. But it seems like a bad idea to leave the
two sets of code doing the same thing in two different ways.

In passing, remove unnecessary "extra paranoia" check in
_ZstdWriteCommon. _CustomWriteFunc (the only possible referent of
cs->writeF) already protects itself against zero-length writes, and
it's really a modularity violation for _ZstdWriteCommon to know that
the custom format disallows empty data blocks.

Reported-by: Dimitrios Apostolou <jimis@gmx.net>
Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_lz4.c  | 167 +++++++++++++++++++-------------
 src/bin/pg_dump/compress_zstd.c |  37 +++----
 2 files changed, 117 insertions(+), 87 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e2f7c468293..47ee2e4bbac 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -60,13 +60,11 @@ typedef struct LZ4State
     bool        compressing;
 
     /*
-     * Used by the Compressor API to mark if the compression headers have been
-     * written after initialization.
+     * I/O buffer area.
      */
-    bool        needs_header_flush;
-
-    size_t      buflen;
-    char       *buffer;
+    char       *buffer;         /* buffer for compressed data */
+    size_t      buflen;         /* allocated size of buffer */
+    size_t      bufdata;        /* amount of valid data currently in buffer */
 
     /*
      * Used by the Stream API to store already uncompressed data that the
@@ -76,12 +74,6 @@ typedef struct LZ4State
     size_t      overflowlen;
     char       *overflowbuf;
 
-    /*
-     * Used by both APIs to keep track of the compressed data length stored in
-     * the buffer.
-     */
-    size_t      compressedlen;
-
     /*
      * Used by both APIs to keep track of error codes.
      */
@@ -103,8 +95,17 @@ LZ4State_compression_init(LZ4State *state)
 {
     size_t      status;
 
+    /*
+     * Compute size needed for buffer, assuming we will present at most
+     * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
+     */
     state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
 
+    /*
+     * Then double it, to ensure we're not forced to flush every time.
+     */
+    state->buflen *= 2;
+
     /*
      * LZ4F_compressBegin requires a buffer that is greater or equal to
      * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
@@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
     }
 
     state->buffer = pg_malloc(state->buflen);
+
+    /*
+     * Insert LZ4 header into buffer.
+     */
     status = LZ4F_compressBegin(state->ctx,
                                 state->buffer, state->buflen,
                                 &state->prefs);
@@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
         return false;
     }
 
-    state->compressedlen = status;
+    state->bufdata = status;
 
     return true;
 }
@@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 {
     LZ4State   *state = (LZ4State *) cs->private_data;
     size_t      remaining = dLen;
-    size_t      status;
-    size_t      chunk;
-
-    /* Write the header if not yet written. */
-    if (state->needs_header_flush)
-    {
-        cs->writeF(AH, state->buffer, state->compressedlen);
-        state->needs_header_flush = false;
-    }
 
     while (remaining > 0)
     {
+        size_t      chunk;
+        size_t      required;
+        size_t      status;
 
-        if (remaining > DEFAULT_IO_BUFFER_SIZE)
-            chunk = DEFAULT_IO_BUFFER_SIZE;
-        else
-            chunk = remaining;
+        /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+        chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+        /* If not enough space, must flush buffer */
+        required = LZ4F_compressBound(chunk, &state->prefs);
+        if (required > state->buflen - state->bufdata)
+        {
+            cs->writeF(AH, state->buffer, state->bufdata);
+            state->bufdata = 0;
+        }
 
-        remaining -= chunk;
         status = LZ4F_compressUpdate(state->ctx,
-                                     state->buffer, state->buflen,
+                                     state->buffer + state->bufdata,
+                                     state->buflen - state->bufdata,
                                      data, chunk, NULL);
 
         if (LZ4F_isError(status))
             pg_fatal("could not compress data: %s",
                      LZ4F_getErrorName(status));
 
-        cs->writeF(AH, state->buffer, status);
+        state->bufdata += status;
 
-        data = ((char *) data) + chunk;
+        data = ((const char *) data) + chunk;
+        remaining -= chunk;
     }
 }
@@ -238,29 +244,32 @@ static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
     LZ4State   *state = (LZ4State *) cs->private_data;
+    size_t      required;
     size_t      status;
 
     /* Nothing needs to be done */
     if (!state)
         return;
 
-    /*
-     * Write the header if not yet written. The caller is not required to call
-     * writeData if the relation does not contain any data. Thus it is
-     * possible to reach here without having flushed the header. Do it before
-     * ending the compression.
-     */
-    if (state->needs_header_flush)
-        cs->writeF(AH, state->buffer, state->compressedlen);
+    /* We might need to flush the buffer to make room for LZ4F_compressEnd */
+    required = LZ4F_compressBound(0, &state->prefs);
+    if (required > state->buflen - state->bufdata)
+    {
+        cs->writeF(AH, state->buffer, state->bufdata);
+        state->bufdata = 0;
+    }
 
     status = LZ4F_compressEnd(state->ctx,
-                              state->buffer, state->buflen,
+                              state->buffer + state->bufdata,
+                              state->buflen - state->bufdata,
                               NULL);
     if (LZ4F_isError(status))
         pg_fatal("could not end compression: %s",
                  LZ4F_getErrorName(status));
+    state->bufdata += status;
 
-    cs->writeF(AH, state->buffer, status);
+    /* Write the final bufferload */
+    cs->writeF(AH, state->buffer, state->bufdata);
 
     status = LZ4F_freeCompressionContext(state->ctx);
     if (LZ4F_isError(status))
@@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
         pg_fatal("could not initialize LZ4 compression: %s",
                  LZ4F_getErrorName(state->errcode));
 
-    /* Remember that the header has not been written. */
-    state->needs_header_flush = true;
-
     cs->private_data = state;
 }
@@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 
     state->compressing = compressing;
 
-    /* When compressing, write LZ4 header to the output stream. */
     if (state->compressing)
     {
-
         if (!LZ4State_compression_init(state))
             return false;
-
-        errno = 0;
-        if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
-        {
-            errno = (errno) ? errno : ENOSPC;
-            return false;
-        }
     }
     else
     {
@@ -573,8 +571,7 @@ static void
 LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
     LZ4State   *state = (LZ4State *) CFH->private_data;
-    size_t      status;
-    int         remaining = size;
+    size_t      remaining = size;
 
     /* Lazy init */
     if (!LZ4Stream_init(state, size, true))
@@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 
     while (remaining > 0)
     {
-        int         chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
+        size_t      chunk;
+        size_t      required;
+        size_t      status;
 
-        remaining -= chunk;
+        /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+        chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+        /* If not enough space, must flush buffer */
+        required = LZ4F_compressBound(chunk, &state->prefs);
+        if (required > state->buflen - state->bufdata)
+        {
+            errno = 0;
+            if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+            {
+                errno = (errno) ? errno : ENOSPC;
+                pg_fatal("error during writing: %m");
+            }
+            state->bufdata = 0;
+        }
 
-        status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
+        status = LZ4F_compressUpdate(state->ctx,
+                                     state->buffer + state->bufdata,
+                                     state->buflen - state->bufdata,
                                      ptr, chunk, NULL);
         if (LZ4F_isError(status))
             pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
-
-        errno = 0;
-        if (fwrite(state->buffer, 1, status, state->fp) != status)
-        {
-            errno = (errno) ? errno : ENOSPC;
-            pg_fatal("error during writing: %m");
-        }
+        state->bufdata += status;
 
         ptr = ((const char *) ptr) + chunk;
+        remaining -= chunk;
     }
 }
@@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 {
     FILE       *fp;
     LZ4State   *state = (LZ4State *) CFH->private_data;
+    size_t      required;
     size_t      status;
     int         ret;
 
@@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH)
     {
         if (state->compressing)
         {
-            status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
+            /* We might need to flush the buffer to make room */
+            required = LZ4F_compressBound(0, &state->prefs);
+            if (required > state->buflen - state->bufdata)
+            {
+                errno = 0;
+                if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+                {
+                    errno = (errno) ? errno : ENOSPC;
+                    pg_log_error("could not write to output file: %m");
+                }
+                state->bufdata = 0;
+            }
+
+            status = LZ4F_compressEnd(state->ctx,
+                                      state->buffer + state->bufdata,
+                                      state->buflen - state->bufdata,
+                                      NULL);
             if (LZ4F_isError(status))
             {
                 pg_log_error("could not end compression: %s",
                              LZ4F_getErrorName(status));
             }
             else
+                state->bufdata += status;
+
+            errno = 0;
+            if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
             {
-                errno = 0;
-                if (fwrite(state->buffer, 1, status, state->fp) != status)
-                {
-                    errno = (errno) ? errno : ENOSPC;
-                    pg_log_error("could not write to output file: %m");
-                }
+                errno = (errno) ? errno : ENOSPC;
+                pg_log_error("could not write to output file: %m");
             }
 
             status = LZ4F_freeCompressionContext(state->ctx);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index e24d45e1bbe..5fe2279faae 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -98,24 +98,22 @@ _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
     ZSTD_outBuffer *output = &zstdcs->output;
 
     /* Loop while there's any input or until flushed */
-    while (input->pos != input->size || flush)
+    while (input->pos < input->size || flush)
     {
         size_t      res;
 
-        output->pos = 0;
         res = ZSTD_compressStream2(zstdcs->cstream, output,
                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);
         if (ZSTD_isError(res))
             pg_fatal("could not compress data: %s",
                      ZSTD_getErrorName(res));
 
-        /*
-         * Extra paranoia: avoid zero-length chunks, since a zero length chunk
-         * is the EOF marker in the custom format. This should never happen
-         * but...
-         */
-        if (output->pos > 0)
+        /* Dump output buffer if full, or if we're told to flush */
+        if (output->pos >= output->size || flush)
+        {
             cs->writeF(AH, output->dst, output->pos);
+            output->pos = 0;
+        }
 
         if (res == 0)
             break;              /* End of frame or all input consumed */
@@ -367,26 +365,31 @@ Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     if (zstdcs->cstream == NULL)
     {
         zstdcs->output.size = ZSTD_CStreamOutSize();
-        zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+        zstdcs->output.dst = pg_malloc(zstdcs->output.size);
+        zstdcs->output.pos = 0;
         zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
         if (zstdcs->cstream == NULL)
             pg_fatal("could not initialize compression library");
     }
 
     /* Consume all input, to be flushed later */
-    while (input->pos != input->size)
+    while (input->pos < input->size)
     {
-        output->pos = 0;
         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
         if (ZSTD_isError(res))
             pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
 
-        errno = 0;
-        cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
-        if (cnt != output->pos)
+        /* Dump output buffer if full */
+        if (output->pos >= output->size)
         {
-            errno = (errno) ? errno : ENOSPC;
-            pg_fatal("could not write to file: %m");
+            errno = 0;
+            cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+            if (cnt != output->pos)
+            {
+                errno = (errno) ? errno : ENOSPC;
+                pg_fatal("could not write to file: %m");
+            }
+            output->pos = 0;
         }
     }
 }
@@ -448,7 +451,6 @@ Zstd_close(CompressFileHandle *CFH)
     /* Loop until the compression buffers are fully consumed */
     for (;;)
     {
-        output->pos = 0;
         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
         if (ZSTD_isError(res))
         {
@@ -466,6 +468,7 @@ Zstd_close(CompressFileHandle *CFH)
             success = false;
             break;
         }
+        output->pos = 0;
 
         if (res == 0)
             break;              /* End of frame */
-- 
2.43.7

From 865a0ccacf8ec065426b1e2823aba9d3cc2c1caf Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 10 Oct 2025 22:08:13 -0400
Subject: [PATCH v2 2/2] Try to align the block sizes of pg_dump's various
 compression modes.

(This is more of a straw man for discussion than a finished patch.)

After the previous patch, compress_zstd.c tends to produce data block
sizes around 128K, and we don't really have any control over that
unless we want to overrule ZSTD_CStreamOutSize(). Which seems like a
bad idea. But let's try to align the other compression modes to
produce block sizes roughly comparable to that, so that pg_restore's
skip-data performance isn't enormously different for different modes.

gzip compression can be brought in line simply by setting
DEFAULT_IO_BUFFER_SIZE = 128K, which this patch does. That increases
some unrelated buffer sizes, but none of them seem problematic for
modern platforms.

lz4's idea of appropriate block size is highly nonlinear: if we just
increase DEFAULT_IO_BUFFER_SIZE then the output blocks end up around
200K. I found that adjusting the slop factor in
LZ4State_compression_init was a not-too-ugly way of bringing that
number into line.

With compress = none you get data blocks the same sizes as the table
rows. We could avoid that by introducing an additional layer of
buffering, but it's not clear to me that that's a net win, so this
patch doesn't do so.

Comments in compress_io.h and 002_pg_dump.pl suggest that if we
increase DEFAULT_IO_BUFFER_SIZE then we need to increase the amount
of data fed through the tests in order to improve coverage. I've not
done that here either. In my view, the decompression side of
compress_lz4.c needs to be rewritten to be simpler, rather than
tested more.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_io.h  | 2 +-
 src/bin/pg_dump/compress_lz4.c | 9 +++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 25a7bf0904d..53cf8c9b03b 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -24,7 +24,7 @@
  * still exercise all the branches. This applies especially if the value is
  * increased, in which case the overflow buffer may not be needed.
  */
-#define DEFAULT_IO_BUFFER_SIZE  4096
+#define DEFAULT_IO_BUFFER_SIZE  (128 * 1024)
 
 extern char *supports_compression(const pg_compress_specification compression_spec);
 
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 47ee2e4bbac..c9ea895c137 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -102,9 +102,14 @@ LZ4State_compression_init(LZ4State *state)
     state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
 
     /*
-     * Then double it, to ensure we're not forced to flush every time.
+     * Add some slop to ensure we're not forced to flush every time.
+     *
+     * The present slop factor of 50% is chosen so that the typical output
+     * block size is about 128K when DEFAULT_IO_BUFFER_SIZE = 128K. We might
+     * need a different slop factor to maintain that equivalence if
+     * DEFAULT_IO_BUFFER_SIZE is changed dramatically.
      */
-    state->buflen *= 2;
+    state->buflen += state->buflen / 2;
 
     /*
      * LZ4F_compressBegin requires a buffer that is greater or equal to
-- 
2.43.7
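Both patches implement the same buffering rule: compressed output accumulates
in a single buffer, and a data block is written only when LZ4F_compressBound()
for the next chunk might not fit in the space that is left, with the buffer
sized as compressBound(DEFAULT_IO_BUFFER_SIZE) plus slop so that a flush is
not forced on every call. What follows is a minimal standalone sketch of that
rule against liblz4's frame API; it is not part of the patchset, and the 64K
CHUNK constant, the stdin/stdout plumbing, and the omission of most error
checks are illustrative simplifications.

/*
 * Minimal standalone sketch (not from the patchset) of the
 * accumulate-then-flush buffering rule, using liblz4's frame API.
 * CHUNK, the stdio plumbing, and the missing error checks on the LZ4F_*
 * return values are simplifications for illustration only.
 *
 * Assumed build command: cc buffer_sketch.c -llz4
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <lz4frame.h>

#define CHUNK (64 * 1024)       /* at most this many input bytes per call */

int
main(void)
{
    LZ4F_preferences_t prefs;
    LZ4F_cctx  *ctx;
    static char in[CHUNK];
    char       *buf;
    size_t      buflen;
    size_t      bufdata;
    size_t      n;

    memset(&prefs, 0, sizeof(prefs));
    if (LZ4F_isError(LZ4F_createCompressionContext(&ctx, LZ4F_VERSION)))
        return 1;

    /* size the buffer for one worst-case chunk, plus 50% slop */
    buflen = LZ4F_compressBound(CHUNK, &prefs);
    buflen += buflen / 2;
    buf = malloc(buflen);

    /* the frame header goes into the buffer, not straight to the output */
    bufdata = LZ4F_compressBegin(ctx, buf, buflen, &prefs);

    while ((n = fread(in, 1, CHUNK, stdin)) > 0)
    {
        /* flush only if the worst-case output for this chunk might not fit */
        if (LZ4F_compressBound(n, &prefs) > buflen - bufdata)
        {
            fwrite(buf, 1, bufdata, stdout);
            bufdata = 0;
        }
        bufdata += LZ4F_compressUpdate(ctx, buf + bufdata, buflen - bufdata,
                                       in, n, NULL);
    }

    /* same space check before ending the frame, then write what remains */
    if (LZ4F_compressBound(0, &prefs) > buflen - bufdata)
    {
        fwrite(buf, 1, bufdata, stdout);
        bufdata = 0;
    }
    bufdata += LZ4F_compressEnd(ctx, buf + bufdata, buflen - bufdata, NULL);
    fwrite(buf, 1, bufdata, stdout);

    free(buf);
    LZ4F_freeCompressionContext(ctx);
    return 0;
}

With the 50% slop chosen in 0002, the buffer holds roughly one and a half
times the worst-case output for a single chunk, so a flush happens only after
about a full chunk's worth of compressed data has accumulated; that is what
keeps the emitted block sizes near DEFAULT_IO_BUFFER_SIZE instead of near the
tiny per-call output sizes the old code wrote.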