From 90ad8e998d5587c8c605e2f3f015800570ff1498 Mon Sep 17 00:00:00 2001 From: Nitin Motiani Date: Sat, 15 Feb 2025 08:05:25 +0000 Subject: [PATCH v6 2/5] Add pipe-command support in pg_restore * This is the same as the pg_dump change. We add support for --pipe-command in the directory archive format. This can be used to read from multiple streams and do pre-processing (decompression with a custom algorithm, filtering, etc.) before restore. Currently that is not possible because pg_dump output in directory format can't simply be piped. * As in pg_dump, either a filename or --pipe-command can be set, but not both. If neither is set, standard input is used as before. * This is only supported with compression none and the directory archive format. * We reuse the inputFileSpec field for the pipe command and add a bool to specify whether it is a pipe. * The changes made for pg_dump to handle the pipe case with popen and pclose also work here. * The logic of the %f format specifier for reading the pg_dump output is the same too. Most of the code from the pg_dump commit works as is. We add similar logic to the function that reads large objects. * The --pipe-command option works with the -l and -L options. 
--- src/bin/pg_dump/compress_io.c | 30 +++++++++++++++---------- src/bin/pg_dump/pg_backup_directory.c | 16 +++++++++++++- src/bin/pg_dump/pg_restore.c | 32 ++++++++++++++++++++++++--- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 5d09d7bf458..b28fcd73617 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -261,22 +261,28 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode, fname = pg_strdup(path); - if (hasSuffix(fname, ".gz")) - compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (hasSuffix(fname, ".lz4")) - compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (hasSuffix(fname, ".zst")) - compression_spec.algorithm = PG_COMPRESSION_ZSTD; - else + /* + * If the path is a pipe command, the compression algorithm is none. + */ + if (!path_is_pipe_command) { - if (stat(path, &st) == 0) - compression_spec.algorithm = PG_COMPRESSION_NONE; - else if (check_compressed_file(path, &fname, "gz")) + if (hasSuffix(fname, ".gz")) compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (check_compressed_file(path, &fname, "lz4")) + else if (hasSuffix(fname, ".lz4")) compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (check_compressed_file(path, &fname, "zst")) + else if (hasSuffix(fname, ".zst")) compression_spec.algorithm = PG_COMPRESSION_ZSTD; + else + { + if (stat(path, &st) == 0) + compression_spec.algorithm = PG_COMPRESSION_NONE; + else if (check_compressed_file(path, &fname, "gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (check_compressed_file(path, &fname, "lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (check_compressed_file(path, &fname, "zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; + } } CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 
db23031d9ac..bc7c4c1f92f 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -439,7 +439,21 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te) tocfname, line); StartRestoreLO(AH, oid, AH->public.ropt->dropSchema); - snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + + /* + * XXX : Create a helper function for blob files naming common to + * _LoadLOs and _StartLO. + */ + if (AH->fSpecIsPipe) + { + pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname); + strcpy(path, pipe); + pfree(pipe); + } + else + { + snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + } _PrintFileData(AH, path); EndRestoreLO(AH, oid); } diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 9495a37ffc1..46758f72d98 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -66,6 +66,7 @@ main(int argc, char **argv) char *inputFileSpec; bool data_only = false; bool schema_only = false; + bool filespec_is_pipe = false; static int disable_triggers = 0; static int enable_row_security = 0; static int if_exists = 0; @@ -142,6 +143,7 @@ main(int argc, char **argv) {"statistics-only", no_argument, &statistics_only, 1}, {"filter", required_argument, NULL, 4}, {"restrict-key", required_argument, NULL, 6}, + {"pipe-command", required_argument, NULL, 7}, {NULL, 0, NULL, 0} }; @@ -321,6 +323,11 @@ main(int argc, char **argv) opts->restrict_key = pg_strdup(optarg); break; + case 7: /* pipe-command */ + inputFileSpec = pg_strdup(optarg); + filespec_is_pipe = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -328,11 +335,29 @@ main(int argc, char **argv) } } - /* Get file name from command line */ + /* + * Get file name from command line. Note that filename argument and + * pipe-command can't both be set. 
+ */ if (optind < argc) + { + if (filespec_is_pipe) + { + pg_log_error_hint("Only one of [filespec, --pipe-command] allowed"); + exit_nicely(1); + } inputFileSpec = argv[optind++]; - else + } + + /* + * Even if the file argument is not provided, if the pipe-command is + * specified, we need to use that as the file arg and not fallback to + * stdio. + */ + else if (!filespec_is_pipe) + { inputFileSpec = NULL; + } /* Complain if any arguments remain */ if (optind < argc) @@ -485,7 +510,8 @@ main(int argc, char **argv) opts->formatName); } - AH = OpenArchive(inputFileSpec, opts->format, false); + + AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe); SetArchiveOptions(AH, NULL, opts); -- 2.52.0.457.g6b5491de43-goog