From 8b6cc6797b4e4ada0e76b1c516d72271e8865497 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 29 Jul 2020 09:24:37 -0400 Subject: [PATCH v2 11/11] POC: Embarrassingly bad server-side compression patch. --- src/backend/Makefile | 2 +- src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 2 + src/backend/replication/basebackup_gzip.c | 166 ++++++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 8 +- src/include/replication/basebackup_sink.h | 1 + 6 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 src/backend/replication/basebackup_gzip.c diff --git a/src/backend/Makefile b/src/backend/Makefile index 9706a95848..8ff63bc77e 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -48,7 +48,7 @@ OBJS = \ LIBS := $(filter-out -lpgport -lpgcommon, $(LIBS)) $(LDAP_LIBS_BE) $(ICU_LIBS) # The backend doesn't need everything that's in LIBS, however -LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) +LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) ifeq ($(with_systemd),yes) LIBS += -lsystemd diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 6b3c77f2c0..20399c6349 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -18,6 +18,7 @@ OBJS = \ backup_manifest.o \ basebackup.o \ basebackup_archiver.o \ + basebackup_gzip.o \ basebackup_libpq.o \ basebackup_progress.o \ basebackup_sink.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 30e242f99e..0c473b02e7 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -248,6 +248,8 @@ perform_base_backup(basebackup_options *opt) if (opt->maxrate > 0) sink = bbsink_throttle_new(sink, opt->maxrate); + sink = bbsink_gzip_new(sink, 1); + /* Set up progress reporting. */ sink = progress_sink = bbsink_progress_new(sink, opt->progress); diff --git a/src/backend/replication/basebackup_gzip.c b/src/backend/replication/basebackup_gzip.c new file mode 100644 index 0000000000..05bd497f38 --- /dev/null +++ b/src/backend/replication/basebackup_gzip.c @@ -0,0 +1,166 @@ +/*------------------------------------------------------------------------- + * + * basebackup_gzip.c + * Basebackup sink implementing gzip compression. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_gzip.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "replication/basebackup_sink.h" + +typedef struct bbsink_gzip +{ + /* Common information for all types of sink. */ + bbsink base; + + /* Compression level. */ + int compresslevel; + + /* Compressed data stream. */ + z_stream zstream; + + /* Compression buffer. */ + const char *buffer; +} bbsink_gzip; + +#define COMPRESS_BUFFER_SIZE 65536 + +static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name); +static void bbsink_gzip_archive_contents(bbsink *sink, + const char *data, size_t len); +static void bbsink_gzip_end_archive(bbsink *sink); +static void *gzip_palloc(void *opaque, unsigned items, unsigned size); +static void gzip_pfree(void *opaque, void *address); + +const bbsink_ops bbsink_gzip_ops = { + .begin_backup = bbsink_forward_begin_backup, + .begin_archive = bbsink_gzip_begin_archive, + .archive_contents = bbsink_gzip_archive_contents, + .end_archive = bbsink_gzip_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_forward_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_forward_end_backup +}; + +/* + * Create a new basebackup sink that performs throttling and forwards data + * to a successor sink. + */ +bbsink * +bbsink_gzip_new(bbsink *next, int compresslevel) +{ + bbsink_gzip *sink; + + Assert(next != NULL); + + sink = palloc0(sizeof(bbsink_gzip)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops; + sink->base.bbs_next = next; + sink->compresslevel = compresslevel; + + return &sink->base; +} + +static void +bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_gzip *mysink = (bbsink_gzip *) sink; + z_stream *zs = &mysink->zstream; + + memset(zs, 0, sizeof(z_stream)); + zs->zalloc = gzip_palloc; + zs->zfree = gzip_pfree; + mysink->buffer = palloc(COMPRESS_BUFFER_SIZE); + zs->next_out = (uint8 *) mysink->buffer; + zs->avail_out = COMPRESS_BUFFER_SIZE; + + deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 31, 8, + Z_DEFAULT_STRATEGY); + + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, archive_name); +} + +static void +bbsink_gzip_archive_contents(bbsink *sink, const char *data, size_t len) +{ + bbsink_gzip *mysink = (bbsink_gzip *) sink; + z_stream *zs = &mysink->zstream; + + zs->next_in = (uint8 *) data; + zs->avail_in = len; + + while (zs->avail_in > 0) + { + int res; + unsigned b4; + + b4 = zs->avail_out; + + res = deflate(zs, Z_NO_FLUSH); + if (res != Z_OK) + elog(ERROR, "well that sucks"); + + if (zs->avail_out <= COMPRESS_BUFFER_SIZE / 2) + { + bbsink_archive_contents(sink->bbs_next, mysink->buffer, + COMPRESS_BUFFER_SIZE - zs->avail_out); + zs->next_out = (uint8 *) mysink->buffer; + zs->avail_out = COMPRESS_BUFFER_SIZE; + } + } +} + +static void +bbsink_gzip_end_archive(bbsink *sink) +{ + bbsink_gzip *mysink = (bbsink_gzip *) sink; + z_stream *zs = &mysink->zstream; + int res; + + Assert(zs->avail_in == 0); + + elog(LOG, "bbsink_gzip_end_archive: reached"); + + do + { + res = deflate(zs, Z_FINISH); + if (res != Z_STREAM_END && res != Z_OK) + elog(ERROR, "this would also suck"); + elog(LOG, "bbsink_gzip_end_archive: res = %d", (int) res); + + if (zs->avail_out <= COMPRESS_BUFFER_SIZE) + { + elog(LOG, "finish-putting %u bytes", + COMPRESS_BUFFER_SIZE - zs->avail_out); + bbsink_archive_contents(sink->bbs_next, mysink->buffer, + COMPRESS_BUFFER_SIZE - zs->avail_out); + zs->next_out = (uint8 *) mysink->buffer; + zs->avail_out = COMPRESS_BUFFER_SIZE; + } + } while (res != Z_STREAM_END); + + Assert(sink->bbs_next != NULL); + bbsink_end_archive(sink->bbs_next); +} + +static void * +gzip_palloc(void *opaque, unsigned items, unsigned size) +{ + return palloc(items * size); +} + +static void +gzip_pfree(void *opaque, void *address) +{ + pfree(address); +} diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8f64a0bdf9..bd16bf65f1 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -978,6 +978,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) bool basetablespace; bbstreamer *streamer; char filename[MAXPGPATH]; + bool need_tar_parser = false; memset(&state, 0, sizeof(state)); state.tablespacenum = rownum; @@ -997,6 +998,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) streamer = bbstreamer_extractor_new(current_path, get_tablespace_mapping, progress_update_filename); + need_tar_parser = true; } else { @@ -1022,7 +1024,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) #endif streamer = bbstreamer_plain_writer_new(filename, archive_file); - streamer = bbstreamer_tar_archiver_new(streamer); + //streamer = bbstreamer_tar_archiver_new(streamer); progress_filename = filename; } @@ -1038,9 +1040,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) bbstreamer_recovery_injector_new(state.streamer, is_recovery_guc_supported, recoveryconfcontents); + need_tar_parser = true; } - state.streamer = bbstreamer_tar_parser_new(state.streamer); + if (need_tar_parser) + state.streamer = bbstreamer_tar_parser_new(state.streamer); ReceiveCopyData(conn, ReceiveTarCopyChunk, &state); diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index bf2d71fafa..24be3eb3fa 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -177,6 +177,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, extern bbsink *bbsink_libpq_new(void); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); +extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); /* Extra interface functions for progress reporting. */ extern void basebackup_progress_wait_checkpoint(void); -- 2.24.3 (Apple Git-128)