From 7edb4f420982be174478666defd0dadab31362ae Mon Sep 17 00:00:00 2001 From: Jeevan Ladhe Date: Wed, 16 Feb 2022 22:22:27 +0530 Subject: [PATCH 3/4] ZSTD: add client-side compression support. ZSTD compression can now be performed on the client using pg_basebackup -Ft --compress client-zstd[:LEVEL]. Example: pg_basebackup -D /tmp/zstd_client -Ft -Xnone --compress=client-zstd --- src/bin/pg_basebackup/Makefile | 1 + src/bin/pg_basebackup/bbstreamer.h | 2 + src/bin/pg_basebackup/bbstreamer_zstd.c | 202 ++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 28 ++- src/bin/pg_verifybackup/t/010_client_untar.pl | 8 + src/tools/msvc/Mkvcbuild.pm | 1 + 6 files changed, 240 insertions(+), 2 deletions(-) create mode 100644 src/bin/pg_basebackup/bbstreamer_zstd.c mode change 100644 => 100755 src/bin/pg_verifybackup/t/010_client_untar.pl diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 1d0db4f9d0..0035ebcef5 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -44,6 +44,7 @@ BBOBJS = \ bbstreamer_gzip.o \ bbstreamer_inject.o \ bbstreamer_lz4.o \ + bbstreamer_zstd.o \ bbstreamer_tar.o all: pg_basebackup pg_receivewal pg_recvlogical diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h index c2de77bacc..bfc624a863 100644 --- a/src/bin/pg_basebackup/bbstreamer.h +++ b/src/bin/pg_basebackup/bbstreamer.h @@ -209,6 +209,8 @@ extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel); extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); +extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next, + int compresslevel); extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c 
b/src/bin/pg_basebackup/bbstreamer_zstd.c
new file mode 100644
index 0000000000..0b20267cf4
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -0,0 +1,246 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_zstd.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/bbstreamer_zstd.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+
+#ifdef HAVE_LIBZSTD
+
+/*
+ * State for a streamer that zstd-compresses the data passing through it.
+ *
+ * Compressed bytes accumulate in base.bbs_buffer; zstd_outBuf describes that
+ * buffer to libzstd and records how much of it is currently filled.
+ */
+typedef struct bbstreamer_zstd_frame
+{
+	bbstreamer	base;
+
+	ZSTD_CCtx  *cctx;
+	ZSTD_outBuffer zstd_outBuf;
+} bbstreamer_zstd_frame;
+
+static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
+											   bbstreamer_member *member,
+											   const char *data, int len,
+											   bbstreamer_archive_context context);
+static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
+	.content = bbstreamer_zstd_compressor_content,
+	.finalize = bbstreamer_zstd_compressor_finalize,
+	.free = bbstreamer_zstd_compressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs zstd compression of tar
+ * blocks.
+ *
+ * 'next' receives the compressed bytes and must not be NULL; 'compresslevel'
+ * is passed to libzstd as ZSTD_c_compressionLevel.
+ *
+ * A failure to set up libzstd is reported with pg_log_error() and ends the
+ * program, as does calling this function in a build without zstd support.
+ */
+bbstreamer *
+bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel)
+{
+#ifdef HAVE_LIBZSTD
+	bbstreamer_zstd_frame *streamer;
+	size_t		ret;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_zstd_frame));
+
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_zstd_compressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+
+	/* Size the output buffer as libzstd recommends for compressed output. */
+	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_CStreamOutSize());
+
+	streamer->cctx = ZSTD_createCCtx();
+	if (!streamer->cctx)
+	{
+		/* Every later libzstd call needs a valid context, so give up now. */
+		pg_log_error("could not create zstd compression context");
+		exit(1);
+	}
+
+	/* Initialize stream compression preferences */
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+								 compresslevel);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set zstd compression level to %d: %s",
+					 compresslevel, ZSTD_getErrorName(ret));
+		exit(1);
+	}
+
+	/* Initialize the ZSTD output buffer. */
+	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+	streamer->zstd_outBuf.pos = 0;
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support zstd compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZSTD
+/*
+ * Forward whatever compressed data is sitting in the output buffer to the
+ * next streamer, then mark the buffer as empty again.  Nothing is forwarded
+ * when the buffer holds no data.
+ */
+static void
+bbstreamer_zstd_flush_output(bbstreamer_zstd_frame *mystreamer,
+							 bbstreamer_member *member,
+							 bbstreamer_archive_context context)
+{
+	if (mystreamer->zstd_outBuf.pos > 0)
+		bbstreamer_content(mystreamer->base.bbs_next, member,
+						   mystreamer->zstd_outBuf.dst,
+						   mystreamer->zstd_outBuf.pos,
+						   context);
+
+	/* Reset the ZSTD output buffer. */
+	mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+	mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+	mystreamer->zstd_outBuf.pos = 0;
+}
+
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ *
+ * A compression error is fatal; no retry is attempted, since nothing
+ * guarantees that the input position would ever advance afterwards.
+ */
+static void
+bbstreamer_zstd_compressor_content(bbstreamer *streamer,
+								   bbstreamer_member *member,
+								   const char *data, int len,
+								   bbstreamer_archive_context context)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	ZSTD_inBuffer inBuf = {data, len, 0};
+
+	while (inBuf.pos < inBuf.size)
+	{
+		size_t		yet_to_flush;
+		size_t		required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos);
+
+		/*
+		 * If the output buffer is not left with enough space, send the
+		 * compressed bytes to the next streamer, and empty the buffer.
+		 */
+		if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <=
+			required_outBuf_bound)
+			bbstreamer_zstd_flush_output(mystreamer, member, context);
+
+		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
+											&inBuf, ZSTD_e_continue);
+
+		if (ZSTD_isError(yet_to_flush))
+		{
+			pg_log_error("could not compress data: %s",
+						 ZSTD_getErrorName(yet_to_flush));
+			exit(1);
+		}
+	}
+}
+
+/*
+ * End-of-stream processing.
+ *
+ * Ask libzstd to end the frame, repeating until it reports that nothing is
+ * left to flush, then hand the remaining output to the next streamer and
+ * finalize it.
+ */
+static void
+bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	size_t		yet_to_flush;
+
+	do
+	{
+		ZSTD_inBuffer in = {NULL, 0, 0};
+		size_t		required_outBuf_bound = ZSTD_compressBound(0);
+
+		/*
+		 * If the output buffer is not left with enough space, send the
+		 * compressed bytes to the next streamer, and empty the buffer.
+		 */
+		if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <=
+			required_outBuf_bound)
+			bbstreamer_zstd_flush_output(mystreamer, NULL, BBSTREAMER_UNKNOWN);
+
+		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
+											&mystreamer->zstd_outBuf,
+											&in, ZSTD_e_end);
+
+		/*
+		 * On error the return value is an error code rather than a byte
+		 * count, so it must not reach the loop condition below.
+		 */
+		if (ZSTD_isError(yet_to_flush))
+		{
+			pg_log_error("could not compress data: %s",
+						 ZSTD_getErrorName(yet_to_flush));
+			exit(1);
+		}
+	} while (yet_to_flush > 0);
+
+	/* Make sure to pass any remaining bytes to the next streamer. */
+	bbstreamer_zstd_flush_output(mystreamer, NULL, BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ *
+ * Releases the next streamer, the zstd compression context, the output
+ * buffer, and finally the streamer itself.
+ */
+static void
+bbstreamer_zstd_compressor_free(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+
+	bbstreamer_free(streamer->bbs_next);
+	ZSTD_freeCCtx(mystreamer->cctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 7202a5eae7..7ba752c1c9 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1023,6 +1023,16 @@ parse_compress_options(char *src, WalCompressionMethod *methodres, *methodres = COMPRESSION_LZ4; *locationres = COMPRESS_LOCATION_SERVER; } + else if (pg_strcasecmp(firstpart, "zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_UNSPECIFIED; + } + else if (pg_strcasecmp(firstpart, "client-zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_CLIENT; + } else if (pg_strcasecmp(firstpart, "server-zstd") == 0) { *methodres = COMPRESSION_ZSTD; @@ -1146,7 +1156,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, bool inject_manifest; bool is_tar, is_tar_gz, - is_tar_lz4; + is_tar_lz4, + is_tar_zstd; bool must_parse_archive; int
archive_name_len = strlen(archive_name); @@ -1169,6 +1180,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation, is_tar_lz4 = (archive_name_len > 8 && strcmp(archive_name + archive_name_len - 4, ".lz4") == 0); + /* Is this a ZSTD archive? */ + is_tar_zstd = (archive_name_len > 8 && + strcmp(archive_name + archive_name_len - 4, ".zst") == 0); + /* * We have to parse the archive if (1) we're suppose to extract it, or if * (2) we need to inject backup_manifest or recovery configuration into it. @@ -1178,7 +1193,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, (spclocation == NULL && writerecoveryconf)); /* At present, we only know how to parse tar archives. */ - if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4) + if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4 + && !is_tar_zstd) { pg_log_error("unable to parse archive: %s", archive_name); pg_log_info("only tar archives can be parsed"); @@ -1250,6 +1266,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation, streamer = bbstreamer_lz4_compressor_new(streamer, compresslevel); } + else if (compressmethod == COMPRESSION_ZSTD) + { + strlcat(archive_filename, ".zst", sizeof(archive_filename)); + streamer = bbstreamer_plain_writer_new(archive_filename, + archive_file); + streamer = bbstreamer_zstd_compressor_new(streamer, + compresslevel); + } else { Assert(false); /* not reachable */ diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl old mode 100644 new mode 100755 index 3616529390..c2a6161be6 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -42,6 +42,14 @@ my @test_configuration = ( 'decompress_flags' => [ '-d' ], 'output_file' => 'base.tar', 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'client-zstd:5'], + 'backup_archive' => 'base.tar.zst', + 
'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index bab81bd459..901e755d01 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -380,6 +380,7 @@ sub mkvcbuild $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c'); + $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_zstd.c'); $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c'); $pgbasebackup->AddLibrary('ws2_32.lib'); -- 2.25.1