From c77cf14dd89d8cab2a5c00a7ae716a7acb007911 Mon Sep 17 00:00:00 2001 From: Nitin Motiani Date: Tue, 11 Feb 2025 08:31:02 +0000 Subject: [PATCH v6 1/5] Add pipe-command support for directory mode of pg_dump * We add a new flag --pipe-command which can be used in directory mode. This allows us to support multiple streams and we can do post processing like compression, filtering etc. This is currently not possible with directory-archive format. * Currently this flag is only supported with compression none and archive format directory. * This flag can't be used with the flag --file. Only one of the two flags can be used at a time. * We reuse the filename field for the --pipe-command also. And add a bool to specify that the field will be used as a pipe command. * Most of the code remains as it is. The core change is that in case of --pipe-command, instead of fopen we do popen. * The user would need a way to store the post-processing output in files. For that we support the same format as the directory mode currently does with the flag --file. We allow the user to add a format specifier %f to the --pipe-command. And for each stream, the format specifier is replaced with the corresponding file name. This file name is the same as it would have been if the flag --file had been used. * To enable the above, there are a few places in the code where we change the file name creation logic. Currently the file name is appended to the directory name which is provided with --file flag. In case of --pipe-command, we instead replace %f with the file name. This change is made for the common use case and separately for blob files. * There is an open question on what mode to use in case of large objects TOC file. Currently the code uses "ab" but that won't work for popen. We have proposed a few options in the comments regarding this. For the time being we are using mode PG_BINARY_W for the pipe use case. --- src/bin/pg_dump/compress_gzip.c | 9 ++- src/bin/pg_dump/compress_gzip.h | 3 +- src/bin/pg_dump/compress_io.c | 27 +++++-- src/bin/pg_dump/compress_io.h | 11 ++- src/bin/pg_dump/compress_lz4.c | 11 ++- src/bin/pg_dump/compress_lz4.h | 3 +- src/bin/pg_dump/compress_none.c | 26 ++++++- src/bin/pg_dump/compress_none.h | 3 +- src/bin/pg_dump/compress_zstd.c | 10 ++- src/bin/pg_dump/compress_zstd.h | 3 +- src/bin/pg_dump/pg_backup.h | 5 +- src/bin/pg_dump/pg_backup_archiver.c | 22 +++--- src/bin/pg_dump/pg_backup_archiver.h | 2 + src/bin/pg_dump/pg_backup_directory.c | 103 +++++++++++++++++++++----- src/bin/pg_dump/pg_dump.c | 36 ++++++++- src/bin/pg_dump/pg_restore.c | 2 +- 16 files changed, 220 insertions(+), 56 deletions(-) diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c index 41a3d059f98..75bef29562e 100644 --- a/src/bin/pg_dump/compress_gzip.c +++ b/src/bin/pg_dump/compress_gzip.c @@ -417,8 +417,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH) void InitCompressFileHandleGzip(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { + if (path_is_pipe_command) + pg_fatal("cPipe command not supported for Gzip"); + CFH->open_func = Gzip_open; CFH->open_write_func = Gzip_open_write; CFH->read_func = Gzip_read; @@ -443,7 +447,8 @@ InitCompressorGzip(CompressorState *cs, void InitCompressFileHandleGzip(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { pg_fatal("this build does not support compression with %s", "gzip"); } diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h index af1a2a3445e..f77c5c86c56 100644 --- a/src/bin/pg_dump/compress_gzip.h +++ b/src/bin/pg_dump/compress_gzip.h @@ -19,6 +19,7 @@ extern void InitCompressorGzip(CompressorState *cs, const pg_compress_specification compression_spec); extern void InitCompressFileHandleGzip(CompressFileHandle *CFH, - const pg_compress_specification compression_spec); + const pg_compress_specification compression_spec, + bool path_is_pipe_command); #endif /* _COMPRESS_GZIP_H_ */ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index af47ef88839..5d09d7bf458 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -191,20 +191,30 @@ free_keep_errno(void *p) * Initialize a compress file handle for the specified compression algorithm. */ CompressFileHandle * -InitCompressFileHandle(const pg_compress_specification compression_spec) +InitCompressFileHandle(const pg_compress_specification compression_spec, + bool path_is_pipe_command) { CompressFileHandle *CFH; + CFH = pg_malloc0(sizeof(CompressFileHandle)); - if (compression_spec.algorithm == PG_COMPRESSION_NONE) - InitCompressFileHandleNone(CFH, compression_spec); + /* + * Always set to non-compressed when path_is_pipe_command assuming that + * external compressor as part of pipe is more efficient. Can review in + * the future. + */ + if (path_is_pipe_command) + InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command); + + else if (compression_spec.algorithm == PG_COMPRESSION_NONE) + InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command); else if (compression_spec.algorithm == PG_COMPRESSION_GZIP) - InitCompressFileHandleGzip(CFH, compression_spec); + InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command); else if (compression_spec.algorithm == PG_COMPRESSION_LZ4) - InitCompressFileHandleLZ4(CFH, compression_spec); + InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command); else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD) - InitCompressFileHandleZstd(CFH, compression_spec); + InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command); return CFH; } @@ -237,7 +247,8 @@ check_compressed_file(const char *path, char **fname, char *ext) * On failure, return NULL with an error code in errno. */ CompressFileHandle * -InitDiscoverCompressFileHandle(const char *path, const char *mode) +InitDiscoverCompressFileHandle(const char *path, const char *mode, + bool path_is_pipe_command) { CompressFileHandle *CFH = NULL; struct stat st; @@ -268,7 +279,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode) compression_spec.algorithm = PG_COMPRESSION_ZSTD; } - CFH = InitCompressFileHandle(compression_spec); + CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); errno = 0; if (!CFH->open_func(fname, -1, mode, CFH)) { diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index ed7b14f0963..bd0fc2634dc 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -186,6 +186,11 @@ struct CompressFileHandle */ pg_compress_specification compression_spec; + /* + * Compression specification for this file handle. + */ + bool path_is_pipe_command; + /* * Private data to be used by the compressor. */ @@ -195,7 +200,8 @@ struct CompressFileHandle /* * Initialize a compress file handle with the requested compression. */ -extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec); +extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec, + bool path_is_pipe_command); /* * Initialize a compress file stream. Infer the compression algorithm @@ -203,6 +209,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio * suffixes in 'path'. */ extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path, - const char *mode); + const char *mode, + bool path_is_pipe_command); extern bool EndCompressFileHandle(CompressFileHandle *CFH); #endif diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c index 20a8741d3ca..dce6cc55910 100644 --- a/src/bin/pg_dump/compress_lz4.c +++ b/src/bin/pg_dump/compress_lz4.c @@ -739,10 +739,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH */ void InitCompressFileHandleLZ4(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { LZ4State *state; + if (path_is_pipe_command) + pg_fatal("Pipe command not supported for LZ4"); + CFH->open_func = LZ4Stream_open; CFH->open_write_func = LZ4Stream_open_write; CFH->read_func = LZ4Stream_read; @@ -758,6 +762,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH, if (CFH->compression_spec.level >= 0) state->prefs.compressionLevel = CFH->compression_spec.level; + CFH->path_is_pipe_command = path_is_pipe_command; + CFH->private_data = state; } #else /* USE_LZ4 */ @@ -770,7 +776,8 @@ InitCompressorLZ4(CompressorState *cs, void InitCompressFileHandleLZ4(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { pg_fatal("this build does not support compression with %s", "LZ4"); } diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h index 7360a469fc0..490141ee8a1 100644 --- a/src/bin/pg_dump/compress_lz4.h +++ b/src/bin/pg_dump/compress_lz4.h @@ -19,6 +19,7 @@ extern void InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec); extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH, - const pg_compress_specification compression_spec); + const pg_compress_specification compression_spec, + bool path_is_pipe_command); #endif /* _COMPRESS_LZ4_H_ */ diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c index 9997519e351..80203de6dbc 100644 --- a/src/bin/pg_dump/compress_none.c +++ b/src/bin/pg_dump/compress_none.c @@ -211,7 +211,10 @@ close_none(CompressFileHandle *CFH) if (fp) { errno = 0; - ret = fclose(fp); + if (CFH->path_is_pipe_command) + ret = pclose(fp); + else + ret = fclose(fp); if (ret != 0) pg_log_error("could not close file: %m"); } @@ -233,7 +236,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH) if (fd >= 0) CFH->private_data = fdopen(dup(fd), mode); else - CFH->private_data = fopen(path, mode); + { + if (CFH->path_is_pipe_command) + CFH->private_data = popen(path, mode); + else + CFH->private_data = fopen(path, mode); + } if (CFH->private_data == NULL) return false; @@ -246,7 +254,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH) { Assert(CFH->private_data == NULL); - CFH->private_data = fopen(path, mode); + pg_log_debug("Opening %s, pipe is %s", + path, CFH->path_is_pipe_command ? "true" : "false"); + + if (CFH->path_is_pipe_command) + CFH->private_data = popen(path, mode); + else + CFH->private_data = fopen(path, mode); + if (CFH->private_data == NULL) return false; @@ -259,7 +274,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH) void InitCompressFileHandleNone(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { CFH->open_func = open_none; CFH->open_write_func = open_write_none; @@ -271,5 +287,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH, CFH->eof_func = eof_none; CFH->get_error_func = get_error_none; + CFH->path_is_pipe_command = path_is_pipe_command; + CFH->private_data = NULL; } diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h index 5134f012ee9..d898a2d411c 100644 --- a/src/bin/pg_dump/compress_none.h +++ b/src/bin/pg_dump/compress_none.h @@ -19,6 +19,7 @@ extern void InitCompressorNone(CompressorState *cs, const pg_compress_specification compression_spec); extern void InitCompressFileHandleNone(CompressFileHandle *CFH, - const pg_compress_specification compression_spec); + const pg_compress_specification compression_spec, + bool path_is_pipe_command); #endif /* _COMPRESS_NONE_H_ */ diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c index 889691aa0c2..359ad627f1c 100644 --- a/src/bin/pg_dump/compress_zstd.c +++ b/src/bin/pg_dump/compress_zstd.c @@ -27,7 +27,8 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress } void -InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec, + bool path_is_pipe_command) { pg_fatal("this build does not support compression with %s", "ZSTD"); } @@ -558,8 +559,12 @@ Zstd_get_error(CompressFileHandle *CFH) void InitCompressFileHandleZstd(CompressFileHandle *CFH, - const pg_compress_specification compression_spec) + const pg_compress_specification compression_spec, + bool path_is_pipe_command) { + if (path_is_pipe_command) + pg_fatal("Pipe command not supported for Zstd"); + CFH->open_func = Zstd_open; CFH->open_write_func = Zstd_open_write; CFH->read_func = Zstd_read; @@ -571,6 +576,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH, CFH->get_error_func = Zstd_get_error; CFH->compression_spec = compression_spec; + CFH->path_is_pipe_command = path_is_pipe_command; CFH->private_data = NULL; } diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h index 1222d7107d9..1f23e7266bf 100644 --- a/src/bin/pg_dump/compress_zstd.h +++ b/src/bin/pg_dump/compress_zstd.h @@ -20,6 +20,7 @@ extern void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec); extern void InitCompressFileHandleZstd(CompressFileHandle *CFH, - const pg_compress_specification compression_spec); + const pg_compress_specification compression_spec, + bool path_is_pipe_command); #endif /* COMPRESS_ZSTD_H */ diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index d9041dad720..50b0afa4e7c 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -315,14 +315,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX); extern void RestoreArchive(Archive *AHX); /* Open an existing archive */ -extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); +extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe); /* Create a new archive */ extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, - DataDirSyncMethod sync_method); + DataDirSyncMethod sync_method, + bool FileSpecIsPipe); /* The --list option */ extern void PrintTOCSummary(Archive *AHX); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 4a63f7392ae..1c820278770 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -56,7 +56,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr, - DataDirSyncMethod sync_method); + DataDirSyncMethod sync_method, bool FileSpecIsPipe); static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te); static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx); static void _doSetFixedOutputState(ArchiveHandle *AH); @@ -232,11 +232,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, - DataDirSyncMethod sync_method) + DataDirSyncMethod sync_method, + bool FileSpecIsPipe) { ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec, - dosync, mode, setupDumpWorker, sync_method); + dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe); return (Archive *) AH; } @@ -244,7 +245,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, /* Open an existing archive */ /* Public */ Archive * -OpenArchive(const char *FileSpec, const ArchiveFormat fmt) +OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe) { ArchiveHandle *AH; pg_compress_specification compression_spec = {0}; @@ -252,7 +253,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt) compression_spec.algorithm = PG_COMPRESSION_NONE; AH = _allocAH(FileSpec, fmt, compression_spec, true, archModeRead, setupRestoreWorker, - DATA_DIR_SYNC_METHOD_FSYNC); + DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe); return (Archive *) AH; } @@ -1720,7 +1721,7 @@ SetOutput(ArchiveHandle *AH, const char *filename, else mode = PG_BINARY_W; - CFH = InitCompressFileHandle(compression_spec); + CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe); if (!CFH->open_func(filename, fn, mode, CFH)) { @@ -2376,7 +2377,8 @@ static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, - SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method) + SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method, + bool FileSpecIsPipe) { ArchiveHandle *AH; CompressFileHandle *CFH; @@ -2417,6 +2419,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, else AH->fSpec = NULL; + AH->fSpecIsPipe = FileSpecIsPipe; + AH->currUser = NULL; /* unknown */ AH->currSchema = NULL; /* ditto */ AH->currTablespace = NULL; /* ditto */ @@ -2429,14 +2433,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->mode = mode; AH->compression_spec = compression_spec; - AH->dosync = dosync; + AH->dosync = FileSpecIsPipe ? false : dosync; AH->sync_method = sync_method; memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); /* Open stdout with no compression for AH output handle */ out_compress_spec.algorithm = PG_COMPRESSION_NONE; - CFH = InitCompressFileHandle(out_compress_spec); + CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe); if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH)) pg_fatal("could not open stdout for appending: %m"); AH->OF = CFH; diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 325b53fc9bd..a8b1ab79e82 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -301,6 +301,8 @@ struct _archiveHandle int loCount; /* # of LOs restored */ char *fSpec; /* Archive File Spec */ + bool fSpecIsPipe; /* fSpec is a pipe command template requiring + * replacing %f with file name */ FILE *FH; /* General purpose file handle */ void *OF; /* Output file */ diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index cd4036ead82..db23031d9ac 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -39,7 +39,8 @@ #include #include -#include "common/file_utils.h" +/* #include "common/file_utils.h" */ +#include "common/percentrepl.h" #include "compress_io.h" #include "dumputils.h" #include "parallel.h" @@ -157,8 +158,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) if (AH->mode == archModeWrite) { - /* we accept an empty existing directory */ - create_or_open_dir(ctx->directory); + if (!AH->fSpecIsPipe) /* no checks for pipe */ + { + /* we accept an empty existing directory */ + create_or_open_dir(ctx->directory); + } } else { /* Read Mode */ @@ -167,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) setFilePath(AH, fname, "toc.dat"); - tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R); + tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe); if (tocFH == NULL) pg_fatal("could not open input file \"%s\": %m", fname); @@ -295,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = InitCompressFileHandle(AH->compression_spec); + ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe); if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); @@ -353,7 +357,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename) if (!filename) return; - CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R); + CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe); if (!CFH) pg_fatal("could not open input file \"%s\": %m", filename); @@ -416,7 +420,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te) else setFilePath(AH, tocfname, tctx->filename); - CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R); + CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe); if (ctx->LOsTocFH == NULL) pg_fatal("could not open large object TOC file \"%s\" for input: %m", @@ -427,6 +431,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te) { char lofname[MAXPGPATH + 1]; char path[MAXPGPATH]; + char *pipe; /* Can't overflow because line and lofname are the same length */ if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2) @@ -545,7 +550,7 @@ _CloseArchive(ArchiveHandle *AH) /* The TOC is always created uncompressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - tocFH = InitCompressFileHandle(compression_spec); + tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe); if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH)) pg_fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -606,13 +611,46 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te) lclTocEntry *tctx = (lclTocEntry *) te->formatData; pg_compress_specification compression_spec = {0}; char fname[MAXPGPATH]; + const char *mode; setFilePath(AH, fname, tctx->filename); /* The LO TOC file is never compressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - ctx->LOsTocFH = InitCompressFileHandle(compression_spec); - if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH)) + ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe); + + /* + * XXX: We can probably simplify this code by using the mode 'w' for all + * cases. The current implementation is due to historical reason that the + * mode for the LOs TOC file has been "ab" from the start. That is + * something we can't do for pipe-command as popen only supports read and + * write. So here a different mode is used for pipes. + * + * But in future we can evaluate using 'w' for everything.there is one + * ToCEntry There is only one ToCEntry per blob group. And it is written + * by @WriteDataChunksForToCEntry. This function calls _StartLOs once + * before the dumper function and and _EndLOs once after the dumper. And + * the dumper dumps all the LOs in the group. So a blob_NNN.toc is only + * opened once and closed after all the entries are written. Therefore the + * mode can be made 'w' for all the cases. We tested changing the mode to + * PG_BINARY_W and the tests passed. But in case there are some missing + * scenarios, we have not made that change here. Instead for now only + * doing it for the pipe command. + * + * Another alternative is to keep the 'ab' mode for regular files and use + * 'w' mode for pipe files but now also cache the pipe handle to keep it + * open till all the LOs in the dump group are done. This is not needed + * because of the same reason listed above that a file handle is only + * opened once. In short there are 3 solutions : 1. Change the mode for + * everything (preferred) 2. Change it only for pipe-command (current) 3. + * Change it for pipe-command and then cache those handles and close them + * in the end (not needed). + */ + if (AH->fSpecIsPipe) + mode = PG_BINARY_W; + else + mode = "ab"; + if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -626,10 +664,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) { lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; + char *pipe; + char blob_name[MAXPGPATH]; - snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); + if (AH->fSpecIsPipe) + { + snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid); + pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name); + strcpy(fname, pipe); + pfree(pipe); + } + else + { + snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); + } - ctx->dataFH = InitCompressFileHandle(AH->compression_spec); + ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe); if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -683,15 +733,27 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename) { lclContext *ctx = (lclContext *) AH->formatData; char *dname; + char *pipe; dname = ctx->directory; - if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH) - pg_fatal("file name too long: \"%s\"", dname); - strcpy(buf, dname); - strcat(buf, "/"); - strcat(buf, relativeFilename); + if (AH->fSpecIsPipe) + { + pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename); + strcpy(buf, pipe); + pfree(pipe); + } + else /* replace all ocurrences of %f in dname with + * relativeFilename */ + { + if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH) + pg_fatal("file name too long: \"%s\"", dname); + + strcpy(buf, dname); + strcat(buf, "/"); + strcat(buf, relativeFilename); + } } /* @@ -733,17 +795,24 @@ _PrepParallelRestore(ArchiveHandle *AH) * only need an approximate indicator of that. */ setFilePath(AH, fname, tctx->filename); + pg_log_error("filename: %s", fname); if (stat(fname, &st) == 0) te->dataLength = st.st_size; else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE) { + if (AH->fSpecIsPipe) + pg_log_error("pipe and compressed"); if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) strlcat(fname, ".gz", sizeof(fname)); else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4) strlcat(fname, ".lz4", sizeof(fname)); else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD) + { + pg_log_error("filename: %s", fname); strlcat(fname, ".zst", sizeof(fname)); + pg_log_error("filename: %s", fname); + } if (stat(fname, &st) == 0) te->dataLength = st.st_size; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 687dc98e46d..309c8c14264 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -423,6 +423,7 @@ main(int argc, char **argv) { int c; const char *filename = NULL; + bool filename_is_pipe = false; const char *format = "p"; TableInfo *tblinfo; int numTables; @@ -539,6 +540,7 @@ main(int argc, char **argv) {"exclude-extension", required_argument, NULL, 17}, {"sequence-data", no_argument, &dopt.sequence_data, 1}, {"restrict-key", required_argument, NULL, 25}, + {"pipe-command", required_argument, NULL, 26}, {NULL, 0, NULL, 0} }; @@ -610,7 +612,14 @@ main(int argc, char **argv) break; case 'f': + if (filename != NULL) + { + pg_log_error_hint("Only one of [--file, --pipe-command] allowed"); + exit_nicely(1); + } filename = pg_strdup(optarg); + filename_is_pipe = false; /* it already is, setting again + * here just for clarity */ break; case 'F': @@ -801,6 +810,15 @@ main(int argc, char **argv) case 25: dopt.restrict_key = pg_strdup(optarg); + + case 26: /* pipe command */ + if (filename != NULL) + { + pg_log_error_hint("Only one of [--file, --pipe-command] allowed"); + exit_nicely(1); + } + filename = pg_strdup(optarg); + filename_is_pipe = true; break; default: @@ -924,14 +942,26 @@ main(int argc, char **argv) else if (dopt.restrict_key) pg_fatal("option %s can only be used with %s", "--restrict-key", "--format=plain"); + if (filename_is_pipe && archiveFormat != archDirectory) + { + pg_log_error_hint("Option --pipe-command is only supported with directory format."); + exit_nicely(1); + } + + if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0) + { + pg_log_error_hint("Option --pipe-command is not supported with any compression type."); + exit_nicely(1); + } /* * Custom and directory formats are compressed by default with gzip when * available, not the others. If gzip is not available, no compression is - * done by default. + * done by default. If directory format is being used with pipe-command, + * no compression is done. */ if ((archiveFormat == archCustom || archiveFormat == archDirectory) && - !user_compression_defined) + !filename_is_pipe && !user_compression_defined) { #ifdef HAVE_LIBZ compression_algorithm_str = "gzip"; @@ -981,7 +1011,7 @@ main(int argc, char **argv) /* Open the output file */ fout = CreateArchive(filename, archiveFormat, compression_spec, - dosync, archiveMode, setupDumpWorker, sync_method); + dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe); /* Make dump options accessible right away */ SetArchiveOptions(fout, &dopt, NULL); diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 84b8d410c9e..9495a37ffc1 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -485,7 +485,7 @@ main(int argc, char **argv) opts->formatName); } - AH = OpenArchive(inputFileSpec, opts->format); + AH = OpenArchive(inputFileSpec, opts->format, false); SetArchiveOptions(AH, NULL, opts); -- 2.52.0.457.g6b5491de43-goog