From: Tom Lane
Subject: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward
Msg-id: 3515357.1760128017@sss.pgh.pa.us
In response to: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward (Dimitrios Apostolou <jimis@gmx.net>)
List: pgsql-hackers

Dimitrios Apostolou <jimis@gmx.net> writes:
> Question that remains: where is pg_dump setting this ~35B length block? 

I poked into that question, and found that the answer is some
exceedingly brain-dead buffering logic in compress_zstd.c.
It will dump its buffer during every loop iteration within
_ZstdWriteCommon, no matter how much buffer space it has left;
and each call to cs->writeF() produces a new "data block"
in the output file.  The amount of data fed to _ZstdWriteCommon
per call is whatever the backend sends per "copy data" message,
which is generally one table row.  So if the table rows aren't
too wide, or if they're highly compressible, you get these
tiny data blocks.
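
For reference, the shape of the old loop (a simplified paraphrase of
the pre-patch _ZstdWriteCommon, reconstructed from the diff below,
not the verbatim source):

    /* old behavior: reset and dump the output buffer on every iteration */
    while (input->pos != input->size || flush)
    {
        output->pos = 0;
        res = ZSTD_compressStream2(zstdcs->cstream, output,
                                   input, flush ? ZSTD_e_end : ZSTD_e_continue);
        if (ZSTD_isError(res))
            pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
        if (output->pos > 0)
            cs->writeF(AH, output->dst, output->pos);   /* one data block */
        if (res == 0)
            break;
    }

Since the function is typically entered once per row, every dribble
of compressed output becomes its own data block.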

compress_lz4.c is equally broken, again buffering no bytes across
calls, although liblz4 seems to do some buffering internally.
I got blocks of around 300 bytes on the test case I was using.
That's still ridiculous.

compress_gzip.c is actually sanely implemented, and consistently
produces blocks of 4096 bytes, which traces to DEFAULT_IO_BUFFER_SIZE
in compress_io.h.
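
The idiom there is the usual zlib accumulate-then-flush loop; roughly
this (a sketch of the pattern, not the verbatim compress_gzip.c code):

    /* feed input, but write out only full DEFAULT_IO_BUFFER_SIZE chunks */
    zs->next_in = (Bytef *) data;
    zs->avail_in = dLen;
    while (zs->avail_in > 0)
    {
        if (deflate(zs, Z_NO_FLUSH) != Z_OK)
            pg_fatal("could not compress data");
        if (zs->avail_out == 0)     /* flush only when the buffer fills */
        {
            cs->writeF(AH, outbuf, DEFAULT_IO_BUFFER_SIZE);
            zs->next_out = (Bytef *) outbuf;
            zs->avail_out = DEFAULT_IO_BUFFER_SIZE;
        }
    }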

If you choose --compress=none, you get data blocks that correspond
exactly to table rows.  We could imagine doing some internal
buffering to amalgamate short rows into larger blocks, but I'm
not entirely convinced it's worth messing with that case.

The attached patch fixes the buffering logic in compress_zstd.c
and compress_lz4.c.  For zstd, most blocks are now 131591 bytes,
which seems to be determined by ZSTD_CStreamOutSize() not by
our code.  For lz4, I see a range of block sizes but they're
almost all around 64K.  That's apparently emergent from the
behavior of LZ4F_compressBound(): when told we want to supply
it up to 4K at a time, it says it needs a buffer around 64K.
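
Those numbers are easy to sanity-check against the libraries directly;
a minimal standalone program (my illustration, not part of the patch;
exact values will vary with library version):

    #include <stdio.h>
    #include <zstd.h>
    #include <lz4frame.h>

    int
    main(void)
    {
        /* recommended zstd streaming output buffer size */
        printf("ZSTD_CStreamOutSize() = %zu\n", ZSTD_CStreamOutSize());
        /* worst-case output for a 4K submission, default preferences */
        printf("LZ4F_compressBound(4096) = %zu\n",
               LZ4F_compressBound(4096, NULL));
        return 0;
    }

(Build with something like "cc check.c -lzstd -llz4".)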

I'm tempted to increase DEFAULT_IO_BUFFER_SIZE so that gzip
also produces blocks in the vicinity of 64K, but we'd have
to decouple the behavior of compress_lz4.c somehow, or it
would want to produce blocks around a megabyte which might
be excessive.  (Or if it's not, we'd still want all these
compression methods to choose similar block sizes, I'd think.)

Anyway, these fixes should remove the need for pg_restore to look
at quite so many places in the archive file.  There may still be
a need for altering the seek-versus-read behavior as you suggest,
but I think we need to re-measure that tradeoff after fixing the
pg_dump side.

            regards, tom lane

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e2f7c468293..47ee2e4bbac 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -60,13 +60,11 @@ typedef struct LZ4State
     bool        compressing;

     /*
-     * Used by the Compressor API to mark if the compression headers have been
-     * written after initialization.
+     * I/O buffer area.
      */
-    bool        needs_header_flush;
-
-    size_t        buflen;
-    char       *buffer;
+    char       *buffer;            /* buffer for compressed data */
+    size_t        buflen;            /* allocated size of buffer */
+    size_t        bufdata;        /* amount of valid data currently in buffer */

     /*
      * Used by the Stream API to store already uncompressed data that the
@@ -76,12 +74,6 @@ typedef struct LZ4State
     size_t        overflowlen;
     char       *overflowbuf;

-    /*
-     * Used by both APIs to keep track of the compressed data length stored in
-     * the buffer.
-     */
-    size_t        compressedlen;
-
     /*
      * Used by both APIs to keep track of error codes.
      */
@@ -103,8 +95,17 @@ LZ4State_compression_init(LZ4State *state)
 {
     size_t        status;

+    /*
+     * Compute size needed for buffer, assuming we will present at most
+     * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
+     */
     state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);

+    /*
+     * Then double it, to ensure we're not forced to flush every time.
+     */
+    state->buflen *= 2;
+
     /*
      * LZ4F_compressBegin requires a buffer that is greater or equal to
      * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
@@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
     }

     state->buffer = pg_malloc(state->buflen);
+
+    /*
+     * Insert LZ4 header into buffer.
+     */
     status = LZ4F_compressBegin(state->ctx,
                                 state->buffer, state->buflen,
                                 &state->prefs);
@@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
         return false;
     }

-    state->compressedlen = status;
+    state->bufdata = status;

     return true;
 }
@@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 {
     LZ4State   *state = (LZ4State *) cs->private_data;
     size_t        remaining = dLen;
-    size_t        status;
-    size_t        chunk;
-
-    /* Write the header if not yet written. */
-    if (state->needs_header_flush)
-    {
-        cs->writeF(AH, state->buffer, state->compressedlen);
-        state->needs_header_flush = false;
-    }

     while (remaining > 0)
     {
+        size_t        chunk;
+        size_t        required;
+        size_t        status;

-        if (remaining > DEFAULT_IO_BUFFER_SIZE)
-            chunk = DEFAULT_IO_BUFFER_SIZE;
-        else
-            chunk = remaining;
+        /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+        chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+        /* If not enough space, must flush buffer */
+        required = LZ4F_compressBound(chunk, &state->prefs);
+        if (required > state->buflen - state->bufdata)
+        {
+            cs->writeF(AH, state->buffer, state->bufdata);
+            state->bufdata = 0;
+        }

-        remaining -= chunk;
         status = LZ4F_compressUpdate(state->ctx,
-                                     state->buffer, state->buflen,
+                                     state->buffer + state->bufdata,
+                                     state->buflen - state->bufdata,
                                      data, chunk, NULL);

         if (LZ4F_isError(status))
             pg_fatal("could not compress data: %s",
                      LZ4F_getErrorName(status));

-        cs->writeF(AH, state->buffer, status);
+        state->bufdata += status;

-        data = ((char *) data) + chunk;
+        data = ((const char *) data) + chunk;
+        remaining -= chunk;
     }
 }

@@ -238,29 +244,32 @@ static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
     LZ4State   *state = (LZ4State *) cs->private_data;
+    size_t        required;
     size_t        status;

     /* Nothing needs to be done */
     if (!state)
         return;

-    /*
-     * Write the header if not yet written. The caller is not required to call
-     * writeData if the relation does not contain any data. Thus it is
-     * possible to reach here without having flushed the header. Do it before
-     * ending the compression.
-     */
-    if (state->needs_header_flush)
-        cs->writeF(AH, state->buffer, state->compressedlen);
+    /* We might need to flush the buffer to make room for LZ4F_compressEnd */
+    required = LZ4F_compressBound(0, &state->prefs);
+    if (required > state->buflen - state->bufdata)
+    {
+        cs->writeF(AH, state->buffer, state->bufdata);
+        state->bufdata = 0;
+    }

     status = LZ4F_compressEnd(state->ctx,
-                              state->buffer, state->buflen,
+                              state->buffer + state->bufdata,
+                              state->buflen - state->bufdata,
                               NULL);
     if (LZ4F_isError(status))
         pg_fatal("could not end compression: %s",
                  LZ4F_getErrorName(status));
+    state->bufdata += status;

-    cs->writeF(AH, state->buffer, status);
+    /* Write the final bufferload */
+    cs->writeF(AH, state->buffer, state->bufdata);

     status = LZ4F_freeCompressionContext(state->ctx);
     if (LZ4F_isError(status))
@@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
         pg_fatal("could not initialize LZ4 compression: %s",
                  LZ4F_getErrorName(state->errcode));

-    /* Remember that the header has not been written. */
-    state->needs_header_flush = true;
     cs->private_data = state;
 }

@@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)

     state->compressing = compressing;

-    /* When compressing, write LZ4 header to the output stream. */
     if (state->compressing)
     {
-
         if (!LZ4State_compression_init(state))
             return false;
-
-        errno = 0;
-        if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
-        {
-            errno = (errno) ? errno : ENOSPC;
-            return false;
-        }
     }
     else
     {
@@ -573,8 +571,7 @@ static void
 LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
     LZ4State   *state = (LZ4State *) CFH->private_data;
-    size_t        status;
-    int            remaining = size;
+    size_t        remaining = size;

     /* Lazy init */
     if (!LZ4Stream_init(state, size, true))
@@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)

     while (remaining > 0)
     {
-        int            chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
+        size_t        chunk;
+        size_t        required;
+        size_t        status;

-        remaining -= chunk;
+        /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+        chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+        /* If not enough space, must flush buffer */
+        required = LZ4F_compressBound(chunk, &state->prefs);
+        if (required > state->buflen - state->bufdata)
+        {
+            errno = 0;
+            if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+            {
+                errno = (errno) ? errno : ENOSPC;
+                pg_fatal("error during writing: %m");
+            }
+            state->bufdata = 0;
+        }

-        status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
+        status = LZ4F_compressUpdate(state->ctx,
+                                     state->buffer + state->bufdata,
+                                     state->buflen - state->bufdata,
                                      ptr, chunk, NULL);
         if (LZ4F_isError(status))
             pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
-
-        errno = 0;
-        if (fwrite(state->buffer, 1, status, state->fp) != status)
-        {
-            errno = (errno) ? errno : ENOSPC;
-            pg_fatal("error during writing: %m");
-        }
+        state->bufdata += status;

         ptr = ((const char *) ptr) + chunk;
+        remaining -= chunk;
     }
 }

@@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 {
     FILE       *fp;
     LZ4State   *state = (LZ4State *) CFH->private_data;
+    size_t        required;
     size_t        status;
     int            ret;

@@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH)
     {
         if (state->compressing)
         {
-            status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
+            /* We might need to flush the buffer to make room */
+            required = LZ4F_compressBound(0, &state->prefs);
+            if (required > state->buflen - state->bufdata)
+            {
+                errno = 0;
+                if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+                {
+                    errno = (errno) ? errno : ENOSPC;
+                    pg_log_error("could not write to output file: %m");
+                }
+                state->bufdata = 0;
+            }
+
+            status = LZ4F_compressEnd(state->ctx,
+                                      state->buffer + state->bufdata,
+                                      state->buflen - state->bufdata,
+                                      NULL);
             if (LZ4F_isError(status))
             {
                 pg_log_error("could not end compression: %s",
                              LZ4F_getErrorName(status));
             }
             else
+                state->bufdata += status;
+
+            errno = 0;
+            if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
             {
-                errno = 0;
-                if (fwrite(state->buffer, 1, status, state->fp) != status)
-                {
-                    errno = (errno) ? errno : ENOSPC;
-                    pg_log_error("could not write to output file: %m");
-                }
+                errno = (errno) ? errno : ENOSPC;
+                pg_log_error("could not write to output file: %m");
             }

             status = LZ4F_freeCompressionContext(state->ctx);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index e24d45e1bbe..63b1ebf8583 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -98,24 +98,22 @@ _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
     ZSTD_outBuffer *output = &zstdcs->output;

     /* Loop while there's any input or until flushed */
-    while (input->pos != input->size || flush)
+    while (input->pos < input->size || flush)
     {
         size_t        res;

-        output->pos = 0;
         res = ZSTD_compressStream2(zstdcs->cstream, output,
                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);

         if (ZSTD_isError(res))
             pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));

-        /*
-         * Extra paranoia: avoid zero-length chunks, since a zero length chunk
-         * is the EOF marker in the custom format. This should never happen
-         * but...
-         */
-        if (output->pos > 0)
+        /* Dump output buffer if full, or if we're told to flush */
+        if (output->pos >= output->size || flush)
+        {
             cs->writeF(AH, output->dst, output->pos);
+            output->pos = 0;
+        }

         if (res == 0)
             break;                /* End of frame or all input consumed */
