Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward - Mailing list pgsql-hackers

From: Tom Lane
Subject: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward
Msg-id: 3515357.1760128017@sss.pgh.pa.us
In response to: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward (Dimitrios Apostolou <jimis@gmx.net>)
Responses: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward
List: pgsql-hackers
Dimitrios Apostolou <jimis@gmx.net> writes:
> Question that remains: where is pg_dump setting this ~35B length block?

I poked into that question, and found that the answer is some
exceedingly brain-dead buffering logic in compress_zstd.c.  It will
dump its buffer during every loop iteration within _ZstdWriteCommon,
no matter how much buffer space it has left; and each call to
cs->writeF() produces a new "data block" in the output file.  The
amount of data fed to _ZstdWriteCommon per call is whatever the
backend sends per "copy data" message, which is generally one table
row.  So if the table rows aren't too wide, or if they're highly
compressible, you get these tiny data blocks.

compress_lz4.c is equally broken, again buffering no bytes across
calls; although liblz4 seems to do some buffering internally.  I got
blocks of around 300 bytes on the test case I was using.  That's
still ridiculous.

compress_gzip.c is actually sanely implemented, and consistently
produces blocks of 4096 bytes, which traces to DEFAULT_IO_BUFFER_SIZE
in compress_io.h.

If you choose --compress=none, you get data blocks that correspond
exactly to table rows.  We could imagine doing some internal
buffering to amalgamate short rows into larger blocks, but I'm not
entirely convinced it's worth messing with that case.

The attached patch fixes the buffering logic in compress_zstd.c and
compress_lz4.c.  For zstd, most blocks are now 131591 bytes, which
seems to be determined by ZSTD_CStreamOutSize() not by our code.  For
lz4, I see a range of block sizes but they're almost all around 64K.
That's apparently emergent from the behavior of LZ4F_compressBound():
when told we want to supply it up to 4K at a time, it says it needs a
buffer around 64K.

I'm tempted to increase DEFAULT_IO_BUFFER_SIZE so that gzip also
produces blocks in the vicinity of 64K, but we'd have to decouple the
behavior of compress_lz4.c somehow, or it would want to produce
blocks around a megabyte which might be excessive.  (Or if it's not,
we'd still want all these compression methods to choose similar block
sizes, I'd think.)

Anyway, these fixes should remove the need for pg_restore to look at
quite so many places in the archive file.  There may still be a need
for altering the seek-versus-read behavior as you suggest, but I
think we need to re-measure that tradeoff after fixing the pg_dump
side.

			regards, tom lane

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e2f7c468293..47ee2e4bbac 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -60,13 +60,11 @@ typedef struct LZ4State
 	bool		compressing;
 
 	/*
-	 * Used by the Compressor API to mark if the compression headers have been
-	 * written after initialization.
+	 * I/O buffer area.
 	 */
-	bool		needs_header_flush;
-
-	size_t		buflen;
-	char	   *buffer;
+	char	   *buffer;			/* buffer for compressed data */
+	size_t		buflen;			/* allocated size of buffer */
+	size_t		bufdata;		/* amount of valid data currently in buffer */
 
 	/*
 	 * Used by the Stream API to store already uncompressed data that the
@@ -76,12 +74,6 @@ typedef struct LZ4State
 	size_t		overflowlen;
 	char	   *overflowbuf;
 
-	/*
-	 * Used by both APIs to keep track of the compressed data length stored in
-	 * the buffer.
-	 */
-	size_t		compressedlen;
-
 	/*
 	 * Used by both APIs to keep track of error codes.
 	 */
@@ -103,8 +95,17 @@ LZ4State_compression_init(LZ4State *state)
 {
 	size_t		status;
 
+	/*
+	 * Compute size needed for buffer, assuming we will present at most
+	 * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
+	 */
 	state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
 
+	/*
+	 * Then double it, to ensure we're not forced to flush every time.
+	 */
+	state->buflen *= 2;
+
 	/*
 	 * LZ4F_compressBegin requires a buffer that is greater or equal to
 	 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
@@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
 	}
 
 	state->buffer = pg_malloc(state->buflen);
+
+	/*
+	 * Insert LZ4 header into buffer.
+	 */
 	status = LZ4F_compressBegin(state->ctx,
 								state->buffer, state->buflen,
 								&state->prefs);
@@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
 		return false;
 	}
 
-	state->compressedlen = status;
+	state->bufdata = status;
 
 	return true;
 }
@@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 {
 	LZ4State   *state = (LZ4State *) cs->private_data;
 	size_t		remaining = dLen;
-	size_t		status;
-	size_t		chunk;
-
-	/* Write the header if not yet written. */
-	if (state->needs_header_flush)
-	{
-		cs->writeF(AH, state->buffer, state->compressedlen);
-		state->needs_header_flush = false;
-	}
 
 	while (remaining > 0)
 	{
+		size_t		chunk;
+		size_t		required;
+		size_t		status;
 
-		if (remaining > DEFAULT_IO_BUFFER_SIZE)
-			chunk = DEFAULT_IO_BUFFER_SIZE;
-		else
-			chunk = remaining;
+		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+		/* If not enough space, must flush buffer */
+		required = LZ4F_compressBound(chunk, &state->prefs);
+		if (required > state->buflen - state->bufdata)
+		{
+			cs->writeF(AH, state->buffer, state->bufdata);
+			state->bufdata = 0;
+		}
 
-		remaining -= chunk;
 		status = LZ4F_compressUpdate(state->ctx,
-									 state->buffer, state->buflen,
+									 state->buffer + state->bufdata,
+									 state->buflen - state->bufdata,
 									 data, chunk, NULL);
 		if (LZ4F_isError(status))
 			pg_fatal("could not compress data: %s",
 					 LZ4F_getErrorName(status));
 
-		cs->writeF(AH, state->buffer, status);
+		state->bufdata += status;
 
-		data = ((char *) data) + chunk;
+		data = ((const char *) data) + chunk;
+		remaining -= chunk;
 	}
 }
 
@@ -238,29 +244,32 @@ static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
 	LZ4State   *state = (LZ4State *) cs->private_data;
+	size_t		required;
 	size_t		status;
 
 	/* Nothing needs to be done */
 	if (!state)
 		return;
 
-	/*
-	 * Write the header if not yet written. The caller is not required to call
-	 * writeData if the relation does not contain any data. Thus it is
-	 * possible to reach here without having flushed the header. Do it before
-	 * ending the compression.
-	 */
-	if (state->needs_header_flush)
-		cs->writeF(AH, state->buffer, state->compressedlen);
+	/* We might need to flush the buffer to make room for LZ4F_compressEnd */
+	required = LZ4F_compressBound(0, &state->prefs);
+	if (required > state->buflen - state->bufdata)
+	{
+		cs->writeF(AH, state->buffer, state->bufdata);
+		state->bufdata = 0;
+	}
 
 	status = LZ4F_compressEnd(state->ctx,
-							  state->buffer, state->buflen,
+							  state->buffer + state->bufdata,
+							  state->buflen - state->bufdata,
 							  NULL);
 	if (LZ4F_isError(status))
 		pg_fatal("could not end compression: %s",
 				 LZ4F_getErrorName(status));
+	state->bufdata += status;
 
-	cs->writeF(AH, state->buffer, status);
+	/* Write the final bufferload */
+	cs->writeF(AH, state->buffer, state->bufdata);
 
 	status = LZ4F_freeCompressionContext(state->ctx);
 	if (LZ4F_isError(status))
@@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
 		pg_fatal("could not initialize LZ4 compression: %s",
 				 LZ4F_getErrorName(state->errcode));
 
-	/* Remember that the header has not been written. */
-	state->needs_header_flush = true;
 	cs->private_data = state;
 }
 
@@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 
 	state->compressing = compressing;
 
-	/* When compressing, write LZ4 header to the output stream. */
 	if (state->compressing)
 	{
-
 		if (!LZ4State_compression_init(state))
 			return false;
-
-		errno = 0;
-		if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
-		{
-			errno = (errno) ? errno : ENOSPC;
-			return false;
-		}
 	}
 	else
 	{
@@ -573,8 +571,7 @@ static void
 LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
-	size_t		status;
-	int			remaining = size;
+	size_t		remaining = size;
 
 	/* Lazy init */
 	if (!LZ4Stream_init(state, size, true))
@@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 
 	while (remaining > 0)
 	{
-		int			chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
+		size_t		chunk;
+		size_t		required;
+		size_t		status;
 
-		remaining -= chunk;
+		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+		/* If not enough space, must flush buffer */
+		required = LZ4F_compressBound(chunk, &state->prefs);
+		if (required > state->buflen - state->bufdata)
+		{
+			errno = 0;
+			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+			{
+				errno = (errno) ? errno : ENOSPC;
+				pg_fatal("error during writing: %m");
+			}
+			state->bufdata = 0;
+		}
 
-		status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
+		status = LZ4F_compressUpdate(state->ctx,
+									 state->buffer + state->bufdata,
+									 state->buflen - state->bufdata,
 									 ptr, chunk, NULL);
 		if (LZ4F_isError(status))
 			pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
-
-		errno = 0;
-		if (fwrite(state->buffer, 1, status, state->fp) != status)
-		{
-			errno = (errno) ? errno : ENOSPC;
-			pg_fatal("error during writing: %m");
-		}
+		state->bufdata += status;
 
 		ptr = ((const char *) ptr) + chunk;
+		remaining -= chunk;
 	}
 }
 
@@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 {
 	FILE	   *fp;
 	LZ4State   *state = (LZ4State *) CFH->private_data;
+	size_t		required;
 	size_t		status;
 	int			ret;
 
@@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH)
 	{
 		if (state->compressing)
 		{
-			status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
+			/* We might need to flush the buffer to make room */
+			required = LZ4F_compressBound(0, &state->prefs);
+			if (required > state->buflen - state->bufdata)
+			{
+				errno = 0;
+				if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+				{
+					errno = (errno) ? errno : ENOSPC;
+					pg_log_error("could not write to output file: %m");
+				}
+				state->bufdata = 0;
+			}
+
+			status = LZ4F_compressEnd(state->ctx,
+									  state->buffer + state->bufdata,
+									  state->buflen - state->bufdata,
+									  NULL);
 			if (LZ4F_isError(status))
 			{
 				pg_log_error("could not end compression: %s",
 							 LZ4F_getErrorName(status));
 			}
 			else
+				state->bufdata += status;
+
+			errno = 0;
+			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
 			{
-				errno = 0;
-				if (fwrite(state->buffer, 1, status, state->fp) != status)
-				{
-					errno = (errno) ? errno : ENOSPC;
-					pg_log_error("could not write to output file: %m");
-				}
+				errno = (errno) ? errno : ENOSPC;
+				pg_log_error("could not write to output file: %m");
 			}
 
 			status = LZ4F_freeCompressionContext(state->ctx);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index e24d45e1bbe..63b1ebf8583 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -98,24 +98,22 @@ _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
 	ZSTD_outBuffer *output = &zstdcs->output;
 
 	/* Loop while there's any input or until flushed */
-	while (input->pos != input->size || flush)
+	while (input->pos < input->size || flush)
 	{
 		size_t		res;
 
-		output->pos = 0;
 		res = ZSTD_compressStream2(zstdcs->cstream, output, input,
 								   flush ? ZSTD_e_end : ZSTD_e_continue);
 
 		if (ZSTD_isError(res))
 			pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
 
-		/*
-		 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
-		 * is the EOF marker in the custom format. This should never happen
-		 * but...
-		 */
-		if (output->pos > 0)
+		/* Dump output buffer if full, or if we're told to flush */
+		if (output->pos >= output->size || flush)
+		{
 			cs->writeF(AH, output->dst, output->pos);
+			output->pos = 0;
+		}
 
 		if (res == 0)
			break;				/* End of frame or all input consumed */
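
As a side note, the block-size numbers quoted above are easy to
reproduce outside pg_dump.  The following minimal standalone sketch
(not part of the patch; it assumes the liblz4 and libzstd development
headers are installed and links with -llz4 -lzstd) just prints the two
library-recommended sizes that end up determining the data-block
sizes:

/*
 * probe.c - check the buffer sizes discussed above.
 * Build with: cc probe.c -llz4 -lzstd
 */
#include <stdio.h>
#include <string.h>

#include <lz4frame.h>
#include <zstd.h>

int
main(void)
{
	LZ4F_preferences_t prefs;

	/* all-zeros preferences mean "library defaults", as pg_dump uses */
	memset(&prefs, 0, sizeof(prefs));

	/*
	 * Worst-case output needed for one 4K input chunk; with default
	 * preferences this lands near 64K, which is why the lz4 data
	 * blocks cluster there.
	 */
	printf("LZ4F_compressBound(4096) = %zu\n",
		   LZ4F_compressBound(4096, &prefs));

	/*
	 * zstd's recommended output-buffer size; this is the ~128K figure
	 * behind the 131591-byte zstd blocks.
	 */
	printf("ZSTD_CStreamOutSize() = %zu\n", ZSTD_CStreamOutSize());

	return 0;
}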