From d7daa87489b1cc2583d643db3396872f902a2d15 Mon Sep 17 00:00:00 2001 From: Julien Tachoires Date: Tue, 25 Jun 2024 05:34:43 -0700 Subject: [PATCH 4/7] Compress ReorderBuffer spill files using PGLZ --- doc/src/sgml/config.sgml | 2 +- .../logical/reorderbuffer_compression.c | 58 +++++++++++++++++++ src/backend/utils/misc/guc_tables.c | 5 ++ .../replication/reorderbuffer_compression.h | 2 + 4 files changed, 66 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 955f4f4a8b..2697ebc435 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2017,7 +2017,7 @@ include_dir 'conf.d' set to on or parallel, then the transaction are not fully decoded on the publisher, then, this parameter has not effect if there is no data to spill on disk. - The supported methods are lz4 (if + The supported methods are pglz, lz4 (if PostgreSQL was compiled with ) and off>. The default value is off. diff --git a/src/backend/replication/logical/reorderbuffer_compression.c b/src/backend/replication/logical/reorderbuffer_compression.c index 77f5c76929..a05393cc61 100644 --- a/src/backend/replication/logical/reorderbuffer_compression.c +++ b/src/backend/replication/logical/reorderbuffer_compression.c @@ -13,6 +13,8 @@ */ #include "postgres.h" +#include "common/pg_lzcompress.h" + #ifdef USE_LZ4 #include #endif @@ -313,6 +315,7 @@ ReorderBufferNewCompressorState(MemoryContext context, int compression_method) return lz4_NewCompressorState(context); break; case REORDER_BUFFER_NO_COMPRESSION: + case REORDER_BUFFER_PGLZ_COMPRESSION: default: return NULL; break; @@ -333,6 +336,7 @@ ReorderBufferFreeCompressorState(MemoryContext context, int compression_method, return lz4_FreeCompressorState(context, compressor_state); break; case REORDER_BUFFER_NO_COMPRESSION: + case REORDER_BUFFER_PGLZ_COMPRESSION: default: break; } @@ -421,6 +425,40 @@ ReorderBufferCompress(ReorderBuffer *rb, ReorderBufferDiskHeader **header, pfree(dst); + break; + } + /* PGLZ compression */ + case REORDER_BUFFER_PGLZ_COMPRESSION: + { + int32 dst_size = 0; + char *dst = NULL; + char *src = (char *) rb->outbuf + sizeof(ReorderBufferDiskHeader); + int32 src_size = data_size - sizeof(ReorderBufferDiskHeader); + int32 max_size = PGLZ_MAX_OUTPUT(src_size); + + dst = (char *) palloc0(max_size); + dst_size = pglz_compress(src, src_size, dst, PGLZ_strategy_always); + + if (dst_size < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("PGLZ compression failed"))); + + ReorderBufferReserve(rb, (Size) (dst_size + sizeof(ReorderBufferDiskHeader))); + + hdr = (ReorderBufferDiskHeader *) rb->outbuf; + hdr->comp_strat = REORDER_BUFFER_STRAT_PGLZ; + hdr->size = (Size) dst_size + sizeof(ReorderBufferDiskHeader); + hdr->raw_size = (Size) src_size; + + *header = hdr; + + /* Copy back compressed data into the ReorderBuffer */ + memcpy((char *) rb->outbuf + sizeof(ReorderBufferDiskHeader), dst, + dst_size); + + pfree(dst); + break; } } @@ -495,6 +533,26 @@ ReorderBufferDecompress(ReorderBuffer *rb, char *data, */ break; } + /* PGLZ decompression */ + case REORDER_BUFFER_STRAT_PGLZ: + { + char *buf; + int32 src_size = (int32) header->size - sizeof(ReorderBufferDiskHeader); + int32 buf_size = (int32) header->raw_size; + int32 decBytes; + + /* Decompress data directly into the ReorderBuffer */ + buf = (char *) rb->outbuf; + buf += sizeof(ReorderBufferDiskHeader); + + decBytes = pglz_decompress(data, src_size, buf, buf_size, false); + + if (decBytes < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed PGLZ data is corrupted"))); + break; + } default: /* Other compression methods not yet supported */ break; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 27ce376fd4..0209a3a517 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -489,10 +489,15 @@ static const struct config_enum_entry logical_decoding_spill_compression_options #ifdef USE_LZ4 {"lz4", REORDER_BUFFER_LZ4_COMPRESSION, false}, #endif + {"pglz", REORDER_BUFFER_PGLZ_COMPRESSION, false}, {"off", REORDER_BUFFER_NO_COMPRESSION, false}, + {"on", REORDER_BUFFER_PGLZ_COMPRESSION, false}, {"false", REORDER_BUFFER_NO_COMPRESSION, true}, + {"true", REORDER_BUFFER_PGLZ_COMPRESSION, true}, {"no", REORDER_BUFFER_NO_COMPRESSION, true}, + {"yes", REORDER_BUFFER_PGLZ_COMPRESSION, true}, {"0", REORDER_BUFFER_NO_COMPRESSION, true}, + {"1", REORDER_BUFFER_PGLZ_COMPRESSION, true}, {NULL, 0, false} }; diff --git a/src/include/replication/reorderbuffer_compression.h b/src/include/replication/reorderbuffer_compression.h index d59e9543a8..ea77ed1358 100644 --- a/src/include/replication/reorderbuffer_compression.h +++ b/src/include/replication/reorderbuffer_compression.h @@ -27,6 +27,7 @@ typedef enum ReorderBufferCompressionMethod { REORDER_BUFFER_NO_COMPRESSION, REORDER_BUFFER_LZ4_COMPRESSION, + REORDER_BUFFER_PGLZ_COMPRESSION, } ReorderBufferCompressionMethod; /* @@ -37,6 +38,7 @@ typedef enum ReorderBufferCompressionStrategy REORDER_BUFFER_STRAT_UNCOMPRESSED, REORDER_BUFFER_STRAT_LZ4_STREAMING, REORDER_BUFFER_STRAT_LZ4_REGULAR, + REORDER_BUFFER_STRAT_PGLZ, } ReorderBufferCompressionStrategy; /* Disk serialization support datastructures */ -- 2.43.0