diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 8ec60ded76..74043ff331 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -19,6 +19,7 @@ OBJS = \
 	basebackup.o \
 	basebackup_copy.o \
 	basebackup_gzip.o \
+	basebackup_lz4.o \
 	basebackup_progress.o \
 	basebackup_server.o \
 	basebackup_sink.o \
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ec50fbab12..8ba1095e4b 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -54,7 +54,8 @@ typedef enum
 typedef enum
 {
 	BACKUP_COMPRESSION_NONE,
-	BACKUP_COMPRESSION_GZIP
+	BACKUP_COMPRESSION_GZIP,
+	BACKUP_COMPRESSION_LZ4
 } basebackup_compression_type;
 
 typedef struct
@@ -293,6 +294,8 @@ perform_base_backup(basebackup_options *opt)
 	/* Set up server-side compression, if client requested it */
 	if (opt->compression == BACKUP_COMPRESSION_GZIP)
 		sink = bbsink_gzip_new(sink, opt->compression_level);
+	else if (opt->compression == BACKUP_COMPRESSION_LZ4)
+		sink = bbsink_lz4_new(sink);
 
 	/* Set up progress reporting. */
 	sink = progress_sink = bbsink_progress_new(sink, opt->progress);
@@ -926,6 +929,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 				opt->compression = BACKUP_COMPRESSION_GZIP;
 				opt->compression_level = optval[4] - '0';
 			}
+			else if (strcmp(optval, "lz4") == 0)
+				opt->compression = BACKUP_COMPRESSION_LZ4;
 			else
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/replication/basebackup_lz4.c b/src/backend/replication/basebackup_lz4.c
new file mode 100644
index 0000000000..593fda7e47
--- /dev/null
+++ b/src/backend/replication/basebackup_lz4.c
@@ -0,0 +1,307 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_lz4.c
+ *	  Basebackup sink implementing lz4 compression.
+ *
+ * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/basebackup_lz4.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/* NOTE(review): an #include stood here whose header name was lost; restore it after "postgres.h". */
+#include "postgres.h"
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
+
+#include "replication/basebackup_sink.h"
+
+#ifdef HAVE_LIBLZ4
+
+/*
+ * Read the input buffer in CHUNK_SIZE length in each iteration and pass it to
+ * the lz4 compression. Defined as 8k, since the input buffer is multiple of
+ * BLCKSZ i.e. multiple of 8k.
+ */
+#define CHUNK_SIZE 8192
+
+typedef struct bbsink_lz4
+{
+	/* Common information for all types of sink. */
+	bbsink		base;
+
+	/* lz4 frame compression context; created per archive, freed at its end. */
+	LZ4F_compressionContext_t ctx;
+	/* Frame preferences used for the header and for output-bound queries. */
+	LZ4F_preferences_t prefs;
+
+	/* Number of bytes staged in output buffer. */
+	size_t		bytes_written;
+} bbsink_lz4;
+
+static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_lz4_archive_contents(bbsink *sink, size_t len);
+static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_lz4_end_archive(bbsink *sink);
+
+const bbsink_ops bbsink_lz4_ops = {
+	.begin_backup = bbsink_forward_begin_backup,
+	.begin_archive = bbsink_lz4_begin_archive,
+	.archive_contents = bbsink_lz4_archive_contents,
+	.end_archive = bbsink_lz4_end_archive,
+	.begin_manifest = bbsink_forward_begin_manifest,
+	.manifest_contents = bbsink_lz4_manifest_contents,
+	.end_manifest = bbsink_forward_end_manifest,
+	.end_backup = bbsink_forward_end_backup
+};
+#endif
+
+/*
+ * Create a new basebackup sink that performs lz4 compression.
+ *
+ * 'next' is the sink that receives the compressed archive data. Raises an
+ * ERROR if this build was compiled without lz4 support.
+ */
+bbsink *
+bbsink_lz4_new(bbsink *next)
+{
+#ifndef HAVE_LIBLZ4
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("lz4 compression is not supported by this build")));
+	return NULL;				/* keep compiler quiet */
+#else
+	bbsink_lz4 *sink;
+
+	Assert(next != NULL);
+
+	sink = palloc0(sizeof(bbsink_lz4));
+	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
+	sink->base.bbs_next = next;
+
+	/*
+	 * We need our own buffer, because we're going to pass different data
+	 * to the next sink than what gets passed to us.
+	 *
+	 * We could try making the input buffer bigger than the output buffer,
+	 * because we expect that compression is going to shrink the input data.
+	 * However, as the compression ratio could be quite high (>10x) and to take
+	 * full advantage of this we would need a huge input buffer. Instead
+	 * it seems better to assume the input buffer may be filled multiple times
+	 * before we succeed in filling the output buffer, and keep the input
+	 * buffer relatively small. For now we just make it the same size as the
+	 * output buffer.
+	 */
+	sink->base.bbs_buffer_length = next->bbs_buffer_length;
+	sink->base.bbs_buffer = palloc(sink->base.bbs_buffer_length);
+
+	return &sink->base;
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+
+/*
+ * Prepare to compress the next archive.
+ *
+ * Creates the lz4 frame compression context, stages the frame header at
+ * the start of the successor sink's buffer, and begins the archive in the
+ * successor sink under the name "<archive_name>.lz4".
+ */
+static void
+bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	char	   *lz4_archive_name;
+	LZ4F_errorCode_t ctxError;
+	LZ4F_preferences_t *prefs = &mysink->prefs;
+	size_t		headerSize;
+
+	/*
+	 * Initialize the frame preferences. Zero means "default" for every
+	 * field, so clear the whole struct rather than naming each member; that
+	 * keeps this code independent of which members a given liblz4 version
+	 * declares. Only the non-default settings are assigned explicitly.
+	 */
+	memset(prefs, 0, sizeof(LZ4F_preferences_t));
+	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+
+	/*
+	 * LZ4F_compressUpdate() returns the number of bytes written into output
+	 * buffer. We need to keep track of how many bytes have been cumulatively
+	 * written into the output buffer(bytes_written). But,
+	 * LZ4F_compressUpdate() returns 0 in case the data is buffered and not
+	 * written to output buffer, set autoFlush to 1 to force the writing to the
+	 * output buffer.
+	 */
+	prefs->autoFlush = 1;
+
+	ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
+	if (LZ4F_isError(ctxError))
+		elog(ERROR, "could not create lz4 compression context: %s",
+			 LZ4F_getErrorName(ctxError));
+
+	/*
+	 * First of all write the frame header to the destination buffer. Pass
+	 * the buffer's real capacity so that lz4 reports an error, instead of
+	 * overrunning the buffer, if the header does not fit.
+	 */
+	headerSize = LZ4F_compressBegin(mysink->ctx,
+									mysink->base.bbs_next->bbs_buffer,
+									mysink->base.bbs_next->bbs_buffer_length,
+									prefs);
+
+	if (LZ4F_isError(headerSize))
+		elog(ERROR, "could not write lz4 header: %s",
+			 LZ4F_getErrorName(headerSize));
+
+	/*
+	 * We need to write the compressed data after the header in the output
+	 * buffer. So, make sure to update the notion of bytes written to output
+	 * buffer. The header was written at offset 0, so this is an assignment
+	 * rather than an increment.
+	 */
+	mysink->bytes_written = headerSize;
+
+	/* Add ".lz4" to the archive name. */
+	lz4_archive_name = psprintf("%s.lz4", archive_name);
+	Assert(sink->bbs_next != NULL);
+	bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
+	pfree(lz4_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the output buffer fills up, invoke the archive_contents()
+ * method for the next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer. In
+ * that case, the compressed representation of the current input data won't
+ * actually be sent to the next bbsink until a later call to this function,
+ * or perhaps even not until bbsink_lz4_end_archive() is invoked.
+ */
+static void
+bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	bbsink	   *next = sink->bbs_next;
+	uint8	   *next_in = (uint8 *) mysink->base.bbs_buffer;
+
+	while (avail_in > 0)
+	{
+		size_t		chunk_len = Min(avail_in, (size_t) CHUNK_SIZE);
+		size_t		out_bound;
+		size_t		avail_out;
+		size_t		compressedSize;
+
+		/*
+		 * Compressed output can be larger than the input, so size the space
+		 * check on lz4's worst-case bound for this chunk, not on the chunk
+		 * length. If the bound does not fit in what is left of the output
+		 * buffer, first archive the already written contents (if any).
+		 */
+		out_bound = LZ4F_compressBound(chunk_len, &mysink->prefs);
+		avail_out = next->bbs_buffer_length - mysink->bytes_written;
+		if (avail_out < out_bound && mysink->bytes_written > 0)
+		{
+			bbsink_archive_contents(next, mysink->bytes_written);
+			mysink->bytes_written = 0;
+			avail_out = next->bbs_buffer_length;
+		}
+
+		/*
+		 * Compress the chunk straight into the unused portion of the output
+		 * buffer. lz4 is told exactly how much room there is, so an output
+		 * buffer that is too small even when empty yields an error here
+		 * rather than a buffer overrun.
+		 */
+		compressedSize = LZ4F_compressUpdate(mysink->ctx,
+											 (uint8 *) next->bbs_buffer + mysink->bytes_written,
+											 avail_out,
+											 next_in, chunk_len,
+											 NULL);
+
+		if (LZ4F_isError(compressedSize))
+			elog(ERROR, "could not compress data: %s",
+				 LZ4F_getErrorName(compressedSize));
+
+		/*
+		 * Update our notion of how many bytes we've written into output
+		 * buffer.
+		 */
+		mysink->bytes_written += compressedSize;
+
+		/* Advance the input start since we already read some data. */
+		next_in += chunk_len;
+		avail_in -= chunk_len;
+	}
+}
+
+/*
+ * There might be some data inside lz4's internal buffers; we need to get
+ * that flushed out and also finalize the lz4 frame and then get that forwarded
+ * to the successor sink as archive content.
+ *
+ * Then we can end processing for this archive.
+ */
+static void
+bbsink_lz4_end_archive(bbsink *sink)
+{
+	bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+	bbsink	   *next = sink->bbs_next;
+	size_t		footer_bound;
+	size_t		compressedSize;
+
+	/*
+	 * LZ4F_compressEnd() needs room for any pending data plus the frame
+	 * footer; LZ4F_compressBound(0, ...) is the documented worst case. If
+	 * the unused portion of the output buffer is smaller than that, send
+	 * the staged bytes on first so that finishing the frame cannot fail
+	 * merely because the buffer happened to be nearly full.
+	 */
+	footer_bound = LZ4F_compressBound(0, &mysink->prefs);
+	if (next->bbs_buffer_length - mysink->bytes_written < footer_bound &&
+		mysink->bytes_written > 0)
+	{
+		bbsink_archive_contents(next, mysink->bytes_written);
+		mysink->bytes_written = 0;
+	}
+
+	compressedSize = LZ4F_compressEnd(mysink->ctx,
+									  (uint8 *) next->bbs_buffer + mysink->bytes_written,
+									  next->bbs_buffer_length - mysink->bytes_written,
+									  NULL);
+
+	if (LZ4F_isError(compressedSize))
+		elog(ERROR, "could not end lz4 compression: %s",
+			 LZ4F_getErrorName(compressedSize));
+
+	/* Update our notion of how many bytes we've written. */
+	mysink->bytes_written += compressedSize;
+
+	/* Send whatever accumulated output bytes we have. */
+	bbsink_archive_contents(next, mysink->bytes_written);
+	mysink->bytes_written = 0;
+
+	/* Release the compression context; a new one is made per archive. */
+	LZ4F_freeCompressionContext(mysink->ctx);
+	mysink->ctx = NULL;
+
+	bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
+{
+	memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
+	bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+#endif
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index 9236642d93..0f1c9d19c6 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -258,6 +258,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
 extern bbsink *bbsink_copystream_new(bool send_to_client);
 extern bbsink *bbsink_copytblspc_new(void);
 extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel);
+extern bbsink *bbsink_lz4_new(bbsink *next);
 extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_server_new(bbsink *next, char *pathname);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);