From bb5ab738ba2e4e12dc0739de3da51fe244f6e483 Mon Sep 17 00:00:00 2001 From: Filip Janus Date: Thu, 31 Jul 2025 14:02:16 +0200 Subject: [PATCH v20251001 01/25] Add transparent compression for temporary files This commit implements transparent compression for temporary files in PostgreSQL, specifically designed for hash join operations that spill to disk. Features: - Support for LZ4 and PGLZ compression algorithms - GUC parameter 'temp_file_compression' to control compression - Transparent compression/decompression in BufFile layer - Shared compression buffer to minimize memory allocation - Hash join integration using BufFileCreateCompressTemp() The compression is applied automatically when temp_file_compression is enabled, with no changes required to calling code. Only hash joins use compression currently, with seeking limited to rewinding to start. Configuration options: - temp_file_compression = 'no' (default) - temp_file_compression = 'pglz' - temp_file_compression = 'lz4' (requires --with-lz4) Fix GUC tables structure for compression support --- src/Makefile.global.in | 1 + src/backend/access/gist/gistbuildbuffers.c | 2 +- src/backend/backup/backup_manifest.c | 2 +- src/backend/executor/nodeHashjoin.c | 2 +- src/backend/storage/file/buffile.c | 317 +++++++++++++++++- src/backend/utils/misc/guc_parameters.dat | 7 + src/backend/utils/misc/guc_tables.c | 13 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/backend/utils/sort/logtape.c | 2 +- src/backend/utils/sort/tuplestore.c | 2 +- src/include/storage/buffile.h | 12 +- 11 files changed, 338 insertions(+), 23 deletions(-) diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 0aa389bc710..3a8b277a9ae 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -201,6 +201,7 @@ with_liburing = @with_liburing@ with_libxml = @with_libxml@ with_libxslt = @with_libxslt@ with_llvm = @with_llvm@ +with_lz4 = @with_lz4@ with_system_tzdata = @with_system_tzdata@ with_uuid = @with_uuid@ with_zlib 
= @with_zlib@ diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c index 0707254d18e..9cc371f47fe 100644 --- a/src/backend/access/gist/gistbuildbuffers.c +++ b/src/backend/access/gist/gistbuildbuffers.c @@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel) * Create a temporary file to hold buffer pages that are swapped out of * memory. */ - gfbb->pfile = BufFileCreateTemp(false); + gfbb->pfile = BufFileCreateTemp(false, false); gfbb->nFileBlocks = 0; /* Initialize free page management. */ diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c index d05252f383c..35d088db0f3 100644 --- a/src/backend/backup/backup_manifest.c +++ b/src/backend/backup/backup_manifest.c @@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest, manifest->buffile = NULL; else { - manifest->buffile = BufFileCreateTemp(false); + manifest->buffile = BufFileCreateTemp(false, false); manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256); if (pg_cryptohash_init(manifest->manifest_ctx) < 0) elog(ERROR, "failed to initialize checksum of backup manifest: %s", diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5661ad76830..384265ca74a 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, { MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt); - file = BufFileCreateTemp(false); + file = BufFileCreateCompressTemp(false); *fileptr = file; MemoryContextSwitchTo(oldctx); diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 366d70d38a1..3cb3b4fcbb7 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -53,6 +53,17 @@ #include "storage/bufmgr.h" #include "storage/fd.h" #include "utils/resowner.h" +#include "utils/memutils.h" 
+#include "common/pg_lzcompress.h" + +#ifdef USE_LZ4 +#include <lz4.h> +#endif + +/* Compression types */ +#define TEMP_NONE_COMPRESSION 0 +#define TEMP_PGLZ_COMPRESSION 1 +#define TEMP_LZ4_COMPRESSION 2 /* * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE. @@ -62,6 +73,8 @@ #define MAX_PHYSICAL_FILESIZE 0x40000000 #define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ) +int temp_file_compression = TEMP_NONE_COMPRESSION; + /* * This data structure represents a buffered file that consists of one or * more physical files (each accessed through a virtual file descriptor @@ -101,6 +114,10 @@ struct BufFile * wasting per-file alignment padding when some users create many files. */ PGAlignedBlock buffer; + + bool compress_tempfile; /* transparent compression mode */ + bool compress; /* whether compression is in use for this file */ + char *cBuffer; /* compression buffer */ }; static BufFile *makeBufFileCommon(int nfiles); @@ -127,6 +144,9 @@ makeBufFileCommon(int nfiles) file->curOffset = 0; file->pos = 0; file->nbytes = 0; + file->compress_tempfile = false; + file->compress = false; + file->cBuffer = NULL; return file; } @@ -188,9 +208,16 @@ extendBufFile(BufFile *file) * Note: if interXact is true, the caller had better be calling us in a * memory context, and with a resource owner, that will survive across * transaction boundaries. + * + * If compress is true, the temporary file will be compressed before + * being written to disk. + * + * Note: The compression does not support random access. Only hash joins + * use it for now. Any seek operation other than a rewind to the start of the + * buffile will corrupt temporary data offsets.
*/ BufFile * -BufFileCreateTemp(bool interXact) +BufFileCreateTemp(bool interXact, bool compress) { BufFile *file; File pfile; @@ -212,9 +239,68 @@ BufFileCreateTemp(bool interXact) file = makeBufFile(pfile); file->isInterXact = interXact; + if (temp_file_compression != TEMP_NONE_COMPRESSION) + { + file->compress = compress; + } + return file; } +/* + * Wrapper for BufFileCreateTemp + * We want to limit the number of memory allocations for the compression buffer, + * only one buffer for all compression operations is enough + */ +BufFile * +BufFileCreateCompressTemp(bool interXact) +{ + static char *buff = NULL; + static int allocated_for_compression = TEMP_NONE_COMPRESSION; + static int allocated_size = 0; + BufFile *tmpBufFile = BufFileCreateTemp(interXact, true); + + if (temp_file_compression != TEMP_NONE_COMPRESSION) + { + int size = 0; + + switch (temp_file_compression) + { + case TEMP_LZ4_COMPRESSION: +#ifdef USE_LZ4 + size = LZ4_compressBound(BLCKSZ) + sizeof(int); +#endif + break; + case TEMP_PGLZ_COMPRESSION: + size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ) + 2 * sizeof(int); + break; + } + + /* + * Allocate or reallocate buffer if needed: + * - Buffer is NULL (first time) + * - Compression type changed + * - Current buffer is too small + */ + if (buff == NULL || + allocated_for_compression != temp_file_compression || + allocated_size < size) + { + if (buff != NULL) + pfree(buff); + + /* + * Persistent buffer for all temporary file compressions + */ + buff = MemoryContextAlloc(TopMemoryContext, size); + allocated_for_compression = temp_file_compression; + allocated_size = size; + } + } + tmpBufFile->cBuffer = buff; + return tmpBufFile; +} + /* * Build the name for a given segment of a given BufFile. */ @@ -454,21 +540,133 @@ BufFileLoadBuffer(BufFile *file) else INSTR_TIME_SET_ZERO(io_start); + if (!file->compress) + { + + /* + * Read whatever we can get, up to a full bufferload. 
+ */ + file->nbytes = FileRead(thisfile, + file->buffer.data, + sizeof(file->buffer), + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + if (file->nbytes < 0) + { + file->nbytes = 0; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(thisfile)))); + } /* - * Read whatever we can get, up to a full bufferload. + * Read and decompress data from the temporary file + * The first reading loads size of the compressed block + * Second reading loads compressed data */ - file->nbytes = FileRead(thisfile, - file->buffer.data, - sizeof(file->buffer.data), + } else { + int nread; + int nbytes; + + nread = FileRead(thisfile, + &nbytes, + sizeof(nbytes), file->curOffset, WAIT_EVENT_BUFFILE_READ); - if (file->nbytes < 0) - { - file->nbytes = 0; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - FilePathName(thisfile)))); + + /* Check if first read succeeded */ + if (nread != sizeof(nbytes) && nread > 0) + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("first read is broken"))); + } + + /* if not EOF let's continue */ + if (nread > 0) + { + /* A long life buffer limits number of memory allocations */ + char * buff = file->cBuffer; + int original_size = 0; + int header_advance = sizeof(nbytes); + + Assert(file->cBuffer != NULL); + + /* For PGLZ, read additional original size */ + if (temp_file_compression == TEMP_PGLZ_COMPRESSION) { + int nread_orig = FileRead(thisfile, + &original_size, + sizeof(original_size), + file->curOffset + sizeof(nbytes), + WAIT_EVENT_BUFFILE_READ); + + /* Check if second read succeeded */ + if (nread_orig != sizeof(original_size) && nread_orig > 0) { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("second read is corrupt: expected %d bytes, got %d bytes", + (int)sizeof(original_size), nread_orig))); + } + + if (nread_orig <= 0) { + file->nbytes = 0; + return; + } + + /* Check if data is uncompressed (marker = -1) */ 
+ if (original_size == -1) { + + int nread_data = 0; + /* Uncompressed data: read directly into buffer */ + file->curOffset += 2 * sizeof(int); /* Skip both header fields */ + nread_data = FileRead(thisfile, + file->buffer.data, + nbytes, /* nbytes contains original size */ + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + file->nbytes = nread_data; + file->curOffset += nread_data; + return; + } + + header_advance = 2 * sizeof(int); + } + + /* + * Read compressed data, curOffset differs with pos + * It reads less data than it returns to caller + * So the curOffset must be advanced here based on compressed size + */ + file->curOffset += header_advance; + + nread = FileRead(thisfile, + buff, + nbytes, + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + + switch (temp_file_compression) + { + case TEMP_LZ4_COMPRESSION: +#ifdef USE_LZ4 + file->nbytes = LZ4_decompress_safe(buff, + file->buffer.data,nbytes,sizeof(file->buffer)); +#endif + break; + + case TEMP_PGLZ_COMPRESSION: + file->nbytes = pglz_decompress(buff,nbytes, + file->buffer.data,original_size,false); + break; + } + file->curOffset += nread; + + if (file->nbytes < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed lz4 data is corrupt"))); + } + } if (track_io_timing) @@ -494,8 +692,79 @@ static void BufFileDumpBuffer(BufFile *file) { int wpos = 0; - int bytestowrite; + int bytestowrite = 0; File thisfile; + char *DataToWrite = file->buffer.data; + int nbytesOriginal = file->nbytes; + + /* + * Compression logic: compress the buffer data if compression is enabled + */ + if (file->compress) + { + char *cData; + int cSize = 0; + + Assert(file->cBuffer != NULL); + cData = file->cBuffer; + + switch (temp_file_compression) + { + case TEMP_LZ4_COMPRESSION: + { +#ifdef USE_LZ4 + int cBufferSize = LZ4_compressBound(file->nbytes); + + /* + * Using stream compression would lead to the slight + * improvement in compression ratio + */ + cSize = LZ4_compress_default(file->buffer.data, + cData + 
sizeof(int), file->nbytes, cBufferSize); +#endif + break; + } + case TEMP_PGLZ_COMPRESSION: + cSize = pglz_compress(file->buffer.data, file->nbytes, + cData + 2 * sizeof(int), PGLZ_strategy_always); + break; + } + + /* Check if compression was successful */ + if (cSize <= 0) { + if (temp_file_compression == TEMP_PGLZ_COMPRESSION) { + + int marker; + /* PGLZ compression failed, store uncompressed data with -1 marker */ + memcpy(cData, &nbytesOriginal, sizeof(int)); /* First field: original size */ + marker = -1; /* Second field: -1 = uncompressed marker */ + memcpy(cData + sizeof(int), &marker, sizeof(int)); + memcpy(cData + 2 * sizeof(int), file->buffer.data, nbytesOriginal); + file->nbytes = nbytesOriginal + 2 * sizeof(int); + DataToWrite = cData; + } else { + /* LZ4 compression failed, report error */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("LZ4 compression failed: compressed size %d, original size %d", + cSize, nbytesOriginal))); + } + } else { + /* + * Write header in front of compressed data + * LZ4 format: [compressed_size:int][compressed_data] + * PGLZ format: [compressed_size:int][original_size:int][compressed_data] + */ + memcpy(cData, &cSize, sizeof(int)); + if (temp_file_compression == TEMP_PGLZ_COMPRESSION) { + memcpy(cData + sizeof(int), &nbytesOriginal, sizeof(int)); + file->nbytes = cSize + 2 * sizeof(int); + } else { + file->nbytes = cSize + sizeof(int); + } + DataToWrite = cData; + } + } /* * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it @@ -535,7 +804,7 @@ BufFileDumpBuffer(BufFile *file) INSTR_TIME_SET_ZERO(io_start); bytestowrite = FileWrite(thisfile, - file->buffer.data + wpos, + DataToWrite + wpos, bytestowrite, file->curOffset, WAIT_EVENT_BUFFILE_WRITE); @@ -564,7 +833,15 @@ BufFileDumpBuffer(BufFile *file) * logical file position, ie, original value + pos, in case that is less * (as could happen due to a small backwards seek in a dirty buffer!) 
*/ - file->curOffset -= (file->nbytes - file->pos); + if (!file->compress) + file->curOffset -= (file->nbytes - file->pos); + else if (nbytesOriginal - file->pos != 0) + /* + * curOffset must be corrected also if compression is enabled, nbytes + * was changed by compression but we have to use the original value of + * nbytes + */ + file->curOffset -= bytestowrite; if (file->curOffset < 0) /* handle possible segment crossing */ { file->curFile--; @@ -602,8 +879,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK) { if (file->pos >= file->nbytes) { - /* Try to load more data into buffer. */ - file->curOffset += file->pos; + /* Try to load more data into buffer. + * + * curOffset is moved within BufFileLoadBuffer + * because stored data size differs from loaded/ + * decompressed size + */ + if (!file->compress) + file->curOffset += file->pos; file->pos = 0; file->nbytes = 0; BufFileLoadBuffer(file); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 6bc6be13d2a..399cf903fff 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3214,6 +3214,13 @@ options => 'default_toast_compression_options', }, +{ name => 'temp_file_compression', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT', + short_desc => 'Sets the default compression method for temporary files.', + variable => 'temp_file_compression', + boot_val => 'TEMP_NONE_COMPRESSION', + options => 'temp_file_compression_options', +}, + { name => 'default_transaction_isolation', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT', short_desc => 'Sets the transaction isolation level of each new transaction.', variable => 'DefaultXactIsoLevel', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 00c8376cf4d..2fb3891b730 100644 --- a/src/backend/utils/misc/guc_tables.c +++ 
b/src/backend/utils/misc/guc_tables.c @@ -78,6 +78,7 @@ #include "replication/syncrep.h" #include "storage/aio.h" #include "storage/bufmgr.h" +#include "storage/buffile.h" #include "storage/bufpage.h" #include "storage/copydir.h" #include "storage/io_worker.h" @@ -464,6 +465,18 @@ static const struct config_enum_entry default_toast_compression_options[] = { {NULL, 0, false} }; +/* + * pglz and zstd support should be added as future enhancement + */ +static const struct config_enum_entry temp_file_compression_options[] = { + {"no", TEMP_NONE_COMPRESSION, false}, + {"pglz", TEMP_PGLZ_COMPRESSION, false}, +#ifdef USE_LZ4 + {"lz4", TEMP_LZ4_COMPRESSION, false}, +#endif + {NULL, 0, false} +}; + static const struct config_enum_entry wal_compression_options[] = { {"pglz", WAL_COMPRESSION_PGLZ, false}, #ifdef USE_LZ4 diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c36fcb9ab61..f380983d2f2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -182,6 +182,7 @@ #max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated # for NOTIFY / LISTEN queue +#temp_file_compression = 'no' # enables temporary files compression # - Kernel Resources - diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index e529ceb8260..d862e22ef18 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker) lts->pfile = BufFileCreateFileSet(&fileset->fs, filename); } else - lts->pfile = BufFileCreateTemp(false); + lts->pfile = BufFileCreateTemp(false, false); return lts; } diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c index c9aecab8d66..ef85924cd21 100644 --- a/src/backend/utils/sort/tuplestore.c +++ b/src/backend/utils/sort/tuplestore.c @@ -860,7 +860,7 @@ 
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) */ oldcxt = MemoryContextSwitchTo(state->context->parent); - state->myfile = BufFileCreateTemp(state->interXact); + state->myfile = BufFileCreateTemp(state->interXact, false); MemoryContextSwitchTo(oldcxt); diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index a2f4821f240..57908dd5462 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -32,11 +32,21 @@ typedef struct BufFile BufFile; +typedef enum +{ + TEMP_NONE_COMPRESSION, + TEMP_PGLZ_COMPRESSION, + TEMP_LZ4_COMPRESSION +} TempCompression; + +extern PGDLLIMPORT int temp_file_compression; + /* * prototypes for functions in buffile.c */ -extern BufFile *BufFileCreateTemp(bool interXact); +extern BufFile *BufFileCreateTemp(bool interXact, bool compress); +extern BufFile *BufFileCreateCompressTemp(bool interXact); extern void BufFileClose(BufFile *file); pg_nodiscard extern size_t BufFileRead(BufFile *file, void *ptr, size_t size); extern void BufFileReadExact(BufFile *file, void *ptr, size_t size); -- 2.51.0