From 857f48859aa8ebbe6daa5b80e2f51bfb96e3979c Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos Date: Tue, 29 Jun 2021 14:27:51 +0000 Subject: Teach pg_receivewal to use lz4 compression --- src/bin/pg_basebackup/pg_basebackup.c | 7 +- src/bin/pg_basebackup/pg_receivewal.c | 68 ++++++- src/bin/pg_basebackup/t/020_pg_receivewal.pl | 38 +++- src/bin/pg_basebackup/walmethods.c | 175 +++++++++++++++++-- src/bin/pg_basebackup/walmethods.h | 12 +- 5 files changed, 278 insertions(+), 22 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 16d8929b23..6b8734d8ba 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -553,10 +553,13 @@ LogStreamerMain(logstreamer_param *param) stream.replication_slot = replication_slot; if (format == 'p') - stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, + stream.walmethod = CreateWalDirectoryMethod(param->xlog, + COMPRESSION_NONE, 0, stream.do_sync); else - stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, + stream.walmethod = CreateWalTarMethod(param->xlog, + COMPRESSION_NONE /* argument is ignored */, + compresslevel, stream.do_sync); if (!ReceiveXlogStream(param->bgconn, &stream)) diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 0d15012c29..6759e3e747 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -43,6 +43,7 @@ static bool do_drop_slot = false; static bool do_sync = true; static bool synchronous = false; static char *replication_slot = NULL; +static WalCompressionProgram compression_program = COMPRESSION_NONE; static XLogRecPtr endpos = InvalidXLogRecPtr; @@ -90,7 +91,8 @@ usage(void) printf(_(" --synchronous flush write-ahead log immediately after writing\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); - printf(_(" -Z, --compress=0-9 compress logs with given compression level\n")); + printf(_(" -I, --compress-program use this program for compression\n")); + printf(_(" -Z, --compress=0-9 compress logs with given compression level (available only with compress-program=zlib)\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -429,7 +431,9 @@ StreamLog(void) stream.synchronous = synchronous; stream.do_sync = do_sync; stream.mark_done = false; - stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel, + stream.walmethod = CreateWalDirectoryMethod(basedir, + compression_program, + compresslevel, stream.do_sync); stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; @@ -482,6 +486,7 @@ main(int argc, char **argv) {"status-interval", required_argument, NULL, 's'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, + {"compress-program", required_argument, NULL, 'I'}, {"compress", required_argument, NULL, 'Z'}, /* action */ {"create-slot", no_argument, NULL, 1}, @@ -573,6 +578,21 @@ main(int argc, char **argv) case 'v': verbose++; break; + case 'I': + if (strcmp(optarg, "zlib") == 0) + { + compression_program = COMPRESSION_ZLIB; + } + else if (strcmp(optarg, "lz4") == 0) + { + compression_program = COMPRESSION_LZ4; + } + else + { + pg_log_error("invalid compress-program \"%s\"", optarg); + exit(1); + } + break; case 'Z': compresslevel = atoi(optarg); if (compresslevel < 0 || compresslevel > 9) @@ -657,14 +677,56 @@ main(int argc, char **argv) exit(1); } + if (compression_program != COMPRESSION_NONE) + { +#ifndef HAVE_LIBZ + if (compression_program == COMPRESSION_ZLIB) + { + pg_log_error("this build does not support compression via zlib"); + exit(1); + } +#endif +#ifndef HAVE_LIBLZ4 + if (compression_program == COMPRESSION_LZ4) + { + pg_log_error("this build does not support compression via lz4"); + exit(1); + } +#endif + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { - pg_log_error("this build does not support compression"); + pg_log_error("this build does not support compression via zlib"); exit(1); } #endif + if (compresslevel != 0) + { + if (compression_program == COMPRESSION_NONE) + { + compression_program = COMPRESSION_ZLIB; + } + if (compression_program != COMPRESSION_ZLIB) + { + pg_log_error("cannot use --compress when " + "--compress_program is not zlib"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + else if (compression_program == COMPRESSION_ZLIB) + { + pg_log_error("cannot use --compress_program zlib when " + "--compression is 0"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + /* * Check existence of destination folder. */ diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl index a547c97ef1..0e27cf030c 100644 --- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl +++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl @@ -5,7 +5,7 @@ use strict; use warnings; use TestLib; use PostgresNode; -use Test::More tests => 19; +use Test::More tests => 22; program_help_ok('pg_receivewal'); program_version_ok('pg_receivewal'); @@ -33,6 +33,13 @@ $primary->command_fails( $primary->command_fails( [ 'pg_receivewal', '-D', $stream_dir, '--synchronous', '--no-sync' ], 'failure if --synchronous specified with --no-sync'); +$primary->command_fails( + [ + 'pg_receivewal', '-D', $stream_dir, '--compress_program', 'lz4', + '--compress', '0' + ], + 'failure if --compress_program=lz4 specified with --compress'); + # Slot creation and drop my $slot_name = 'test'; @@ -66,6 +73,35 @@ $primary->command_ok( ], 'streaming some WAL with --synchronous'); +# Check lz4 compression if available +SKIP: +{ + skip "postgres was not build with LZ4 support", 2 + if (!check_pg_config("#define HAVE_LIBLZ4 1")); + + # Generate some WAL. + $primary->psql('postgres', 'SELECT pg_switch_wal();'); + $nextlsn = + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); + chomp($nextlsn); + $primary->psql('postgres', + 'INSERT INTO test_table VALUES (generate_series(100,200));'); + $primary->psql('postgres', 'SELECT pg_switch_wal();'); + + # Stream up to the given position + $primary->command_ok( + [ + 'pg_receivewal', '-D', $stream_dir, '--verbose', + '--endpos', $nextlsn, '--compress-program=lz4' + ], + 'streaming some WAL with --compress-program=lz4'); + + # Verify that the stored file is compressed and readable + my @lz4_wals = glob "$stream_dir/*.lz4"; + is(scalar(@lz4_wals), 1, 'one lz4 compressed WAL was created'); + system_or_bail('lz4', '-t', $lz4_wals[0]); +} + # Permissions on WAL files should be default SKIP: { diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index a15bbb20e7..18d5bf3e59 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -17,6 +17,10 @@ #include #include #include + +#ifdef HAVE_LIBLZ4 +#include +#endif #ifdef HAVE_LIBZ #include #endif @@ -30,6 +34,9 @@ /* Size of zlib buffer for .tar.gz */ #define ZLIB_OUT_SIZE 4096 +/* Size of lz4 input chunk for .lz4 */ +#define LZ4_IN_SIZE 4096 + /*------------------------------------------------------------------------- * WalDirectoryMethod - write wal to a directory looking like pg_wal *------------------------------------------------------------------------- @@ -40,9 +47,10 @@ */ typedef struct DirectoryMethodData { - char *basedir; - int compression; - bool sync; + char *basedir; + WalCompressionProgram compression_program; + int compression; + bool sync; } DirectoryMethodData; static DirectoryMethodData *dir_data = NULL; @@ -59,6 +67,11 @@ typedef struct DirectoryMethodFile #ifdef HAVE_LIBZ gzFile gzfp; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx; + size_t outbufCapacity; + void *outbuf; +#endif } DirectoryMethodFile; static const char * @@ -77,10 +90,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #ifdef HAVE_LIBZ gzFile gzfp = NULL; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx = NULL; + size_t outbufCapacity; + void *outbuf = NULL; +#endif snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", dir_data->basedir, pathname, - dir_data->compression > 0 ? ".gz" : "", + dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" : + dir_data->compression_program == COMPRESSION_LZ4 ? ".lz4": "", temp_suffix ? temp_suffix : ""); /* @@ -94,7 +113,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ return NULL; #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_program == COMPRESSION_ZLIB) { gzfp = gzdopen(fd, "wb"); if (gzfp == NULL) @@ -111,9 +130,48 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } } #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_program == COMPRESSION_LZ4) + { + size_t ctx_out; + size_t header_size; + + ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); + outbufCapacity = LZ4F_compressBound(LZ4_IN_SIZE, NULL /* default preferences */); + if (LZ4F_isError(ctx_out)) + { + close(fd); + return NULL; + } + + outbuf = pg_malloc0(outbufCapacity); + + /* add the header */ + header_size = LZ4F_compressBegin(ctx, outbuf, outbufCapacity, NULL); + if (LZ4F_isError(header_size)) + { + close(fd); + return NULL; + } + + errno = 0; + if (write(fd, outbuf, header_size) != header_size) + { + int save_errno = errno; + + close(fd); + + /* + * If write didn't set errno, assume problem is no disk space. + */ + errno = save_errno ? save_errno : ENOSPC; + return NULL; + } + } +#endif /* Do pre-padding on non-compressed files */ - if (pad_to_size && dir_data->compression == 0) + if (pad_to_size && dir_data->compression_program == COMPRESSION_NONE) { PGAlignedXLogBlock zerobuf; int bytes; @@ -158,7 +216,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ fsync_parent_path(tmppath) != 0) { #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_program == COMPRESSION_ZLIB) gzclose(gzfp); else #endif @@ -169,9 +227,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_program == COMPRESSION_ZLIB) f->gzfp = gzfp; #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_program == COMPRESSION_LZ4) + { + f->ctx = ctx; + f->outbuf = outbuf; + f->outbufCapacity = outbufCapacity; + } +#endif + f->fd = fd; f->currpos = 0; f->pathname = pg_strdup(pathname); @@ -191,9 +258,46 @@ dir_write(Walfile f, const void *buf, size_t count) Assert(f != NULL); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_program == COMPRESSION_ZLIB) r = (ssize_t) gzwrite(df->gzfp, buf, count); else +#endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_program == COMPRESSION_LZ4) + { + size_t chunk; + size_t remaining; + const void *inbuf = buf; + + remaining = count; + while (remaining > 0) + { + size_t compressed; + + if (remaining > LZ4_IN_SIZE) + chunk = LZ4_IN_SIZE; + else + chunk = remaining; + + remaining -= chunk; + compressed = LZ4F_compressUpdate(df->ctx, + df->outbuf, df->outbufCapacity, + inbuf, chunk, + NULL); + + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->outbuf, compressed) != compressed) + return -1; + + inbuf = ((char *)inbuf) + chunk; + } + + /* XXX: This is what our caller expects, but it is not nice at all */ + r = (ssize_t)count; + } + else #endif r = write(df->fd, buf, count); if (r > 0) @@ -221,9 +325,30 @@ dir_close(Walfile f, WalCloseMethod method) Assert(f != NULL); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_program == COMPRESSION_ZLIB) r = gzclose(df->gzfp); else +#endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_program == COMPRESSION_LZ4) + { + /* Flush any internal buffers */ + size_t compressed = LZ4F_compressEnd(df->ctx, + df->outbuf, df->outbufCapacity, + NULL); + if (LZ4F_isError(compressed)) + { + return -1; + } + + if (write(df->fd, df->outbuf, compressed) != compressed) + { + return -1; + } + + r = close(df->fd); + } + else #endif r = close(df->fd); @@ -238,11 +363,13 @@ dir_close(Walfile f, WalCloseMethod method) */ snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", dir_data->basedir, df->pathname, - dir_data->compression > 0 ? ".gz" : "", + dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" : + dir_data->compression_program == COMPRESSION_LZ4 ? ".lz4": "", df->temp_suffix); snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s", dir_data->basedir, df->pathname, - dir_data->compression > 0 ? ".gz" : ""); + dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" : + dir_data->compression_program == COMPRESSION_LZ4 ? ".lz4": ""); r = durable_rename(tmppath, tmppath2); } else if (method == CLOSE_UNLINK) @@ -250,7 +377,8 @@ dir_close(Walfile f, WalCloseMethod method) /* Unlink the file once it's closed */ snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", dir_data->basedir, df->pathname, - dir_data->compression > 0 ? ".gz" : "", + dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" : + dir_data->compression_program == COMPRESSION_LZ4 ? ".lz4": "", df->temp_suffix ? df->temp_suffix : ""); r = unlink(tmppath); } @@ -270,6 +398,12 @@ dir_close(Walfile f, WalCloseMethod method) } } +#ifdef HAVE_LIBLZ4 + pg_free(df->outbuf); + /* supports free on NULL */ + LZ4F_freeCompressionContext(df->ctx); +#endif + pg_free(df->pathname); pg_free(df->fullpath); if (df->temp_suffix) @@ -346,7 +480,9 @@ dir_finish(void) WalWriteMethod * -CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) +CreateWalDirectoryMethod(const char *basedir, + WalCompressionProgram compression_program, + int compression, bool sync) { WalWriteMethod *method; @@ -362,6 +498,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) method->getlasterror = dir_getlasterror; dir_data = pg_malloc0(sizeof(DirectoryMethodData)); + dir_data->compression_program = compression_program; dir_data->compression = compression; dir_data->basedir = pg_strdup(basedir); dir_data->sync = sync; @@ -983,8 +1120,16 @@ tar_finish(void) return true; } +/* + * The argument compression_program is currently ignored. It is in place for + * symmetry with CreateWalDirectoryMethod which uses it for distinguishing + * between the different compression methods. CreateWalTarMethod and its family + * of functions handle only zlib compression. + */ WalWriteMethod * -CreateWalTarMethod(const char *tarbase, int compression, bool sync) +CreateWalTarMethod(const char *tarbase, + WalCompressionProgram compression_program, + int compression, bool sync) { WalWriteMethod *method; const char *suffix = (compression != 0) ? ".tar.gz" : ".tar"; diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index fc4bb52cb7..f7d8582aad 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -19,6 +19,13 @@ typedef enum CLOSE_NO_RENAME } WalCloseMethod; +typedef enum +{ + COMPRESSION_LZ4, + COMPRESSION_ZLIB, + COMPRESSION_NONE +} WalCompressionProgram; + /* * A WalWriteMethod structure represents the different methods used * to write the streaming WAL as it's received. @@ -86,8 +93,11 @@ struct WalWriteMethod * not all those required for pg_receivewal) */ WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, + WalCompressionProgram compression_program, int compression, bool sync); -WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync); +WalWriteMethod *CreateWalTarMethod(const char *tarbase, + WalCompressionProgram compression_program, + int compression, bool sync); /* Cleanup routines for previously-created methods */ void FreeWalDirectoryMethod(void); -- 2.25.1