From 16b0864f6daba9e23a8ed53d001099d0e51c0194 Mon Sep 17 00:00:00 2001 From: Khanna Date: Tue, 22 Apr 2025 09:36:29 +0530 Subject: [PATCH v1] pg_basebackup via pg_createsubscriber This patch adds support for initializing a standby node using 'pg_basebackup' through the 'pg_createsubscriber' tool. A new '--create-standby' option is introduced, which automates the process of performing a base backup from the publisher, and configuring the standby node for streaming replication. To support this functionality, the patch introduces helper functions for invoking 'pg_basebackup', and configuring the standby server. It also adds support for managing external binary paths for 'pg_basebackup' and 'psql', making the tool more flexible in varied environments. By integrating 'pg_basebackup', this patch enhances the usability of 'pg_createsubscriber' by enabling a streamlined, end-to-end setup of subscriber nodes. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 13 ++ src/bin/pg_basebackup/pg_createsubscriber.c | 126 +++++++++++++++++- .../t/040_pg_createsubscriber.pl | 54 ++++++++ 3 files changed, 188 insertions(+), 5 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 4b1d08d5f16..44142f23674 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -108,6 +108,19 @@ PostgreSQL documentation + + + + + + Initializes the subscriber node as a physical standby using + . This automates the process of taking + a base backup from the publisher server, configuring the standby server, + and preparing it for logical replication. + + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index f65acc7cb11..0c0a5f1df9a 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -27,6 +27,9 @@ #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" #include "getopt_long.h" +#include +#include +#include "port.h" #define DEFAULT_SUB_PORT "50432" #define OBJECTTYPE_PUBLICATIONS 0x0001 @@ -47,6 +50,7 @@ struct CreateSubscriberOptions int recovery_timeout; /* stop recovery after this time */ bool all_dbs; /* all option */ SimpleStringList objecttypes_to_remove; /* list of object types to remove */ + bool create_standby; /* enable standby setup via pg_basebackup */ }; /* per-database publication/subscription info */ @@ -127,6 +131,8 @@ static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname); static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified); +static void create_standby(const struct CreateSubscriberOptions *opt, const char *standby_dir); +static void configure_standby(const struct CreateSubscriberOptions *opt, const char *standby_dir); #define USEC_PER_SEC 1000000 #define WAIT_INTERVAL 1 /* 1 second */ @@ -148,6 +154,8 @@ static pg_prng_state prng_state; static char *pg_ctl_path = NULL; static char *pg_resetwal_path = NULL; +static char *pg_basebackup_path = NULL; +static char *psql_path = NULL; /* standby / subscriber data directory */ static char *subscriber_dir = NULL; @@ -248,6 +256,7 @@ usage(void) printf(_("\nOptions:\n")); printf(_(" -a, --all create subscriptions for all databases except template\n" " databases or databases that don't allow connections\n")); + printf(_(" -c --create-standby prepare the standby using pg_basebackup\n")); printf(_(" -d, --database=DBNAME database in which to create a subscription\n")); printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); printf(_(" -n, --dry-run dry run, just show what would be done\n")); @@ -1405,6 +1414,96 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo) return lsn; } +/* + * Function to create a standby server using pg_basebackup. + */ +static void +create_standby(const struct CreateSubscriberOptions *opt, + const char *standby_dir) +{ + PQExpBuffer pg_basebackup_cmd = createPQExpBuffer(); + char auto_conf_path[MAXPGPATH]; + char port_setting[64]; + + appendPQExpBuffer(pg_basebackup_cmd, "\"%s\" -d", pg_basebackup_path); + appendShellString(pg_basebackup_cmd, opt->pub_conninfo_str); + appendPQExpBufferStr(pg_basebackup_cmd, " -D "); + appendShellString(pg_basebackup_cmd, standby_dir); + appendPQExpBufferStr(pg_basebackup_cmd, " -P -X stream -R"); + + pg_log_info("creating the standby server: %s", pg_basebackup_cmd->data); + + if (!dry_run && system(pg_basebackup_cmd->data) != 0) + pg_fatal("pg_basebackup command failed"); + + snprintf(auto_conf_path, sizeof(auto_conf_path), "%s/postgresql.auto.conf", standby_dir); + snprintf(port_setting, sizeof(port_setting), "port = %s\n", opt->sub_port); + + if (!dry_run) + { + FILE *f = fopen(auto_conf_path, "a"); + + if (!f) + pg_fatal("could not open %s for appending: %m", auto_conf_path); + if (fprintf(f, "%s", port_setting) < 0) + { + fclose(f); + pg_fatal("could not write port setting to %s: %m", auto_conf_path); + } + fclose(f); + pg_log_info("set standby port in %s: %s", auto_conf_path, port_setting); + } + else + { + pg_log_info("dry run: would append to %s: %s", auto_conf_path, port_setting); + } + + destroyPQExpBuffer(pg_basebackup_cmd); +} + +/* + * Function to configure the standby server for logical replication. + */ +static void +configure_standby(const struct CreateSubscriberOptions *opt, + const char *standby_dir) +{ + static const char *alter_cmds[] = + { + "ALTER SYSTEM SET wal_level = 'logical';", + "ALTER SYSTEM SET listen_addresses = '*';", + }; + + char command[MAXPGPATH * 2]; + + /* Start the standby server */ + snprintf(command, sizeof(command), "\"%s\" -D \"%s\" -l \"%s/standby.log\" -o \"-p %s\" start", + pg_ctl_path, standby_dir, standby_dir, opt->sub_port); + pg_log_info("starting the standby server: %s", command); + + if (!dry_run && system(command) != 0) + pg_fatal("failed to start standby server"); + + for (int i = 0; i < lengthof(alter_cmds); i++) + { + snprintf(command, sizeof(command), + "\"%s\" -d postgres -p %s -c \"%s\"", psql_path, + opt->sub_port, alter_cmds[i]); + pg_log_info("configuring the standby server: %s", command); + + if (!dry_run && system(command) != 0) + pg_fatal("failed to run: %s", alter_cmds[i]); + } + + /* Stop the standby server */ + snprintf(command, sizeof(command), "\"%s\" -D \"%s\" -l \"%s/standby.log\" -o \"-p %s\" stop", + pg_ctl_path, standby_dir, standby_dir, opt->sub_port); + pg_log_info("stopping the standby server: %s", command); + + if (!dry_run && system(command) != 0) + pg_fatal("failed to stop standby server after ALTER SYSTEM"); +} + static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name) @@ -2021,6 +2120,7 @@ main(int argc, char **argv) static struct option long_options[] = { {"all", no_argument, NULL, 'a'}, + {"create-standby", no_argument, NULL, 'c'}, {"database", required_argument, NULL, 'd'}, {"pgdata", required_argument, NULL, 'D'}, {"dry-run", no_argument, NULL, 'n'}, @@ -2092,6 +2192,7 @@ main(int argc, char **argv) }; opt.recovery_timeout = 0; opt.all_dbs = false; + opt.create_standby = false; /* * Don't allow it to be run as root. It uses pg_ctl which does not allow @@ -2109,7 +2210,7 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v", + while ((c = getopt_long(argc, argv, "acd:D:np:P:R:s:t:TU:v", long_options, &option_index)) != -1) { switch (c) @@ -2117,6 +2218,9 @@ main(int argc, char **argv) case 'a': opt.all_dbs = true; break; + case 'c': + opt.create_standby = true; + break; case 'd': if (!simple_string_list_member(&opt.database_names, optarg)) { @@ -2264,6 +2368,22 @@ main(int argc, char **argv) pg_log_error_hint("Try \"%s --help\" for more information.", progname); exit(1); } + + /* + * Get the absolute path of pg_ctl, pg_resetwal, pg_basebackup and psql on + * the subscriber + */ + pg_ctl_path = get_exec_path(argv[0], "pg_ctl"); + pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal"); + pg_basebackup_path = get_exec_path(argv[0], "pg_basebackup"); + psql_path = get_exec_path(argv[0], "psql"); + + if (opt.create_standby) + { + create_standby(&opt, subscriber_dir); + configure_standby(&opt, subscriber_dir); + } + pg_log_info("validating publisher connection string"); pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str, &dbname_conninfo); @@ -2346,10 +2466,6 @@ main(int argc, char **argv) } } - /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */ - pg_ctl_path = get_exec_path(argv[0], "pg_ctl"); - pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal"); - /* Rudimentary check for a data directory */ check_data_directory(subscriber_dir); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 2d532fee567..eb924d4d705 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -8,6 +8,8 @@ use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use File::Path qw(rmtree); +use File::Copy; program_help_ok('pg_createsubscriber'); program_version_ok('pg_createsubscriber'); @@ -537,6 +539,58 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); ok($sysid_p != $sysid_s, 'system identifier was changed'); +$node_s->stop; + +my $tempdir = PostgreSQL::Test::Utils::tempdir; +my $conf_file = $node_s->data_dir . '/' . 'postgresql.conf'; + +copy("$conf_file", "$tempdir/postgresql.conf") or die "Copy failed: $!"; + +# Remove an old directory. +rmtree($node_s->data_dir); + +# Run pg_createsubscriber on node S with '--create-standby' option +command_ok( + [ + 'pg_createsubscriber', + '--verbose', '--verbose', + '--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default, + '--pgdata' => $node_s->data_dir, + '--publisher-server' => $node_p->connstr($db1), + '--publication' => 'pub3', + '--publication' => 'pub4', + '--database' => $db1, + '--database' => $db2, + '--create-standby', + ], + 'run pg_createsubscriber on node S with --create-standby'); + +# Insert a row on P +$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')"); + +my $port = $node_s->port; + +$node_s->append_conf( + 'postgresql.conf', qq[ +hot_standby_feedback = on +]); +$node_s->append_conf( + 'postgresql.auto.conf', qq[ +port = $port +]); + +copy("$tempdir/postgresql.conf", "$conf_file") or die "Copy failed: $!"; + +$node_s->start; + +# Check result in database $db1 +$result = $node_s->safe_psql($db1, 'SELECT * FROM tbl1'); +is( $result, qq(first row +second row +third row +fourth row), + "logical replication works in database $db1"); + # clean up $node_p->teardown_node; $node_s->teardown_node; -- 2.41.0.windows.3