From 94f3bf64e9e06a9a32ce99dc795c06465ecdeb44 Mon Sep 17 00:00:00 2001 From: Khanna Date: Thu, 26 Jun 2025 11:11:48 +0530 Subject: [PATCH v1] Support tables via pg_createsubscriber This patch adds support for specifying tables to be included in logical replication publications via pg_createsubscriber. Users can now pass multiple '--database' and '--table' options to define which tables should be published and subscribed for each database. Features: 1. Supports per-database table mapping using multiple '--database'/'--table' pairs. 2. Allows optional column lists and row filters. 3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is created. 4. Adds TAP tests to validate combinations of database and table arguments. This improves fine-grained control over logical replication setup and aligns pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 12 + src/bin/pg_basebackup/pg_createsubscriber.c | 230 +++++++++++++++++- .../t/040_pg_createsubscriber.pl | 83 +++++++ 3 files changed, 322 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c..f22a50b2c43 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -125,6 +125,18 @@ PostgreSQL documentation + + + + + + Adds a table to be included in the publication for the most recently + specified database. Can be repeated multiple times. The syntax + supports optional column lists and WHERE clauses. + + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 025b893a41e..e8874e13f99 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -31,6 +31,46 @@ #define DEFAULT_SUB_PORT "50432" #define OBJECTTYPE_PUBLICATIONS 0x0001 +static char * +pg_strcasestr(const char *haystack, const char *needle) +{ + size_t needlelen; + + if (!haystack || !needle) + return NULL; + + needlelen = strlen(needle); + + for (; *haystack; haystack++) + { + if (pg_strncasecmp(haystack, needle, needlelen) == 0) + return (char *) haystack; + } + + return NULL; +} + +typedef struct TableSpec +{ + char *spec; + char *schema_name; + char *table_name; + char *column_list_raw; + char *where_clause_raw; + struct TableSpec *next; +} TableSpec; + +typedef struct TableListPerDB +{ + char *dbname; + TableSpec *tables; + struct TableListPerDB *next; +} TableListPerDB; + +static TableListPerDB * dblist_head = NULL; +static TableListPerDB * dblist_tail = NULL; +static TableListPerDB * dblist_cur = NULL; + /* Command-line options */ struct CreateSubscriberOptions { @@ -61,6 +101,7 @@ struct LogicalRepInfo bool made_replslot; /* replication slot was created */ bool made_publication; /* publication was created */ + TableSpec *tables; /* list of tables to be subscribed */ }; /* @@ -249,6 +290,7 @@ usage(void) printf(_(" -a, --all create subscriptions for all databases except template\n" " databases and databases that don't allow connections\n")); printf(_(" -d, --database=DBNAME database in which to create a subscription\n")); + printf(_(" -f, --table table to subscribe to; can be specified multiple times\n")); printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); printf(_(" -n, --dry-run dry run, just show what would be done\n")); printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT); @@ -505,6 +547,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, else dbinfo[i].subname = NULL; /* Other fields will be filled later */ + dbinfo[i].tables = NULL; pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", @@ -525,6 +568,20 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, i++; } + for (int j = 0; j < num_dbs; j++) + { + const char *dbname = dbinfo[j].dbname; + + for (TableListPerDB * cur = dblist_head; cur != NULL; cur = cur->next) + { + if (strcmp(cur->dbname, dbname) == 0) + { + dbinfo[j].tables = cur->tables; + break; + } + } + } + return dbinfo; } @@ -1645,11 +1702,74 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) pg_log_info("creating publication \"%s\" in database \"%s\"", dbinfo->pubname, dbinfo->dbname); - appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", - ipubname_esc); + if (dbinfo->tables == NULL) + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc); + else + { + bool first = true; + TableSpec *tbl = dbinfo->tables; + + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc); + while (tbl) + { + char *escaped_schema = NULL; + char *escaped_table = NULL; + + if (!tbl->table_name || strlen(tbl->table_name) == 0) + pg_fatal("table name cannot be null"); + + if (tbl->schema_name) + escaped_schema = PQescapeIdentifier(conn, tbl->schema_name, strlen(tbl->schema_name)); + escaped_table = PQescapeIdentifier(conn, tbl->table_name, strlen(tbl->table_name)); + + appendPQExpBuffer(str, "%s", first ? "" : ", "); + + if (escaped_schema) + appendPQExpBuffer(str, "%s.", escaped_schema); + appendPQExpBuffer(str, "%s", escaped_table); + + if (tbl->column_list_raw && strlen(tbl->column_list_raw) > 0) + appendPQExpBuffer(str, " (%s)", tbl->column_list_raw); + + if (tbl->where_clause_raw && strlen(tbl->where_clause_raw) > 0) + appendPQExpBuffer(str, " WHERE %s", tbl->where_clause_raw); + + first = false; + tbl = tbl->next; + + if (escaped_schema) + PQfreemem(escaped_schema); + + if (escaped_table) + PQfreemem(escaped_table); + } + } pg_log_debug("command is: %s", str->data); + if (dry_run) + { + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not begin transaction: %s", PQerrorMessage(conn)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not create publication \"%s\" in database \"%s\": %s", + dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, "ROLLBACK"); + PQclear(res); + } + if (!dry_run) { res = PQexec(conn, str->data); @@ -2022,6 +2142,7 @@ main(int argc, char **argv) { {"all", no_argument, NULL, 'a'}, {"database", required_argument, NULL, 'd'}, + {"table", required_argument, NULL, 'f'}, {"pgdata", required_argument, NULL, 'D'}, {"dry-run", no_argument, NULL, 'n'}, {"subscriber-port", required_argument, NULL, 'p'}, @@ -2109,7 +2230,7 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v", + while ((c = getopt_long(argc, argv, "ad:f:D:np:P:s:t:TU:v", long_options, &option_index)) != -1) { switch (c) @@ -2118,6 +2239,7 @@ main(int argc, char **argv) opt.all_dbs = true; break; case 'd': + TableListPerDB * newdb; if (!simple_string_list_member(&opt.database_names, optarg)) { simple_string_list_append(&opt.database_names, optarg); @@ -2125,6 +2247,108 @@ main(int argc, char **argv) } else pg_fatal("database \"%s\" specified more than once for -d/--database", optarg); + + newdb = pg_malloc0(sizeof(TableListPerDB)); + newdb->dbname = pg_strdup(optarg); + newdb->tables = NULL; + newdb->next = NULL; + if (dblist_tail) + dblist_tail->next = newdb; + else + dblist_head = newdb; + + dblist_tail = newdb; + dblist_cur = newdb; + + break; + case 'f': + TableSpec * ts; + char *full_table_spec; + char *temp_ptr; + char *where_start = NULL; + char *col_list_start = NULL; + char *col_list_end = NULL; + char *table_part_end = NULL; + + if (dblist_cur == NULL) + pg_fatal("option --table must follow a --database"); + + if (strchr(optarg, ';') || strstr(optarg, "--") || strstr(optarg, "/*")) + pg_fatal("invalid SQL control characters in --table argument: \"%s\"", optarg); + + ts = pg_malloc0(sizeof(TableSpec)); + full_table_spec = pg_strdup(optarg); + + where_start = pg_strcasestr(full_table_spec, " WHERE "); + if (where_start) + { + *where_start = '\0'; + where_start += strlen(" WHERE "); + while (*where_start == ' ') + where_start++; + ts->where_clause_raw = pg_strdup(where_start); + } + + col_list_start = strchr(full_table_spec, '('); + if (col_list_start) + { + col_list_end = strrchr(full_table_spec, ')'); + if (!col_list_end || col_list_end < col_list_start) + pg_fatal("malformed column list in --table argument: \"%s\"", optarg); + + *col_list_start = '\0'; + *col_list_end = '\0'; + ts->column_list_raw = pg_strdup(col_list_start + 1); + table_part_end = col_list_start; + } + else + table_part_end = full_table_spec + strlen(full_table_spec); + + temp_ptr = strrchr(full_table_spec, '.'); + if (temp_ptr && temp_ptr < table_part_end) + { + *temp_ptr = '\0'; + ts->schema_name = pg_strdup(full_table_spec); + ts->table_name = pg_strdup(temp_ptr + 1); + } + else + { + ts->schema_name = pg_strdup("public"); + ts->table_name = pg_strdup(full_table_spec); + } + + if (ts->table_name) + { + size_t len = strlen(ts->table_name); + + while (len > 0 && isspace((unsigned char) ts->table_name[len - 1])) + ts->table_name[--len] = '\0'; + } + + if (ts->schema_name) + { + size_t len = strlen(ts->schema_name); + + while (len > 0 && isspace((unsigned char) ts->schema_name[len - 1])) + ts->schema_name[--len] = '\0'; + } + + if (!ts->table_name || strlen(ts->table_name) == 0) + pg_fatal("table name cannot be empty in --table argument: \"%s\"", optarg); + + ts->next = NULL; + + if (dblist_cur->tables == NULL) + dblist_cur->tables = ts; + else + { + TableSpec *tail = dblist_cur->tables; + + while (tail->next) + tail = tail->next; + tail->next = ts; + } + pg_free(full_table_spec); break; case 'D': subscriber_dir = pg_strdup(optarg); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 229fef5b3b5..3bd49630868 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -537,9 +537,92 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); ok($sysid_p != $sysid_s, 'system identifier was changed'); +# Drop existing publications on database db1. +$node_p->safe_psql( + $db1, qq( + DROP PUBLICATION test_pub1; + DROP PUBLICATION test_pub2; + DROP PUBLICATION pub1; +)); + +# Drop existing publications on database db2. +$node_p->safe_psql($db2, "DROP PUBLICATION pub2"); + +# Test: Table-level publication creation +$node_p->safe_psql($db1, "CREATE TABLE public.t1 (id int, val text)"); +$node_p->safe_psql($db1, "CREATE TABLE public.t2 (id int, val text)"); +$node_p->safe_psql($db2, + "CREATE TABLE public.t3 (id int, val text, extra int)"); + +# Initialize node_s2 as a fresh standby of node_p for table-level +# publication test. +$node_p->backup('backup_tablepub'); +my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2'); +$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1); +$node_s2->start; +$node_s2->stop; + +# Run pg_createsubscriber with table-level options +command_ok( + [ + 'pg_createsubscriber', + '--verbose', + '--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default, + '--pgdata' => $node_s2->data_dir, + '--publisher-server' => $node_p->connstr($db1), + '--socketdir' => $node_s2->host, + '--subscriber-port' => $node_s2->port, + '--database' => $db1, + '--table' => 'public.t1 (id)', + '--table' => 'public.t2 (val)', + '--database' => $db2, + '--table' => 'public.t3 (id, extra)', + ], + 'pg_createsubscriber runs with table-level publication (existing nodes)'); + +# Get the publication name created by pg_createsubscriber for db1 +my $pubname1 = $node_p->safe_psql( + $db1, qq( + SELECT pubname FROM pg_publication + WHERE pubname LIKE 'pg_createsubscriber_%' + ORDER BY pubname LIMIT 1 +)); + +# Check publication tables for db1 +my $actual1 = $node_p->safe_psql( + $db1, qq( + SELECT pubname || '|public|' || tablename + FROM pg_publication_tables + WHERE pubname = '$pubname1' + ORDER BY tablename +)); +is($actual1, "$pubname1|public|t1\n$pubname1|public|t2", + 'single publication for both tables created successfully on database db1' +); + +# Get the publication name created by pg_createsubscriber for db2 +my $pubname2 = $node_p->safe_psql( + $db2, qq( + SELECT pubname FROM pg_publication + WHERE pubname LIKE 'pg_createsubscriber_%' + ORDER BY pubname LIMIT 1 +)); + +# Check publication tables for db2 +my $actual2 = $node_p->safe_psql( + $db2, qq( + SELECT pubname || '|public|' || tablename + FROM pg_publication_tables + WHERE pubname = '$pubname2' + ORDER BY tablename +)); +is($actual2, "$pubname2|public|t3", + 'single publication for t3 created successfully on database db2'); + # clean up $node_p->teardown_node; $node_s->teardown_node; +$node_s2->teardown_node; $node_t->teardown_node; $node_f->teardown_node; -- 2.41.0.windows.3