From 440934abc57e0117391a1f975b6e28844683af57 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Thu, 21 Sep 2023 19:10:18 +0530 Subject: [PATCH v1] Super user related fixes in logical replication. This patch fixes a couple of super user related issues in logical replication: a) Make sure that the password option is provided for non super users when password required is true. b) Restart the apply worker if it identifies that the super user privilege has changed for the subscription owner. --- src/backend/commands/subscriptioncmds.c | 28 +++++----- .../libpqwalreceiver/libpqwalreceiver.c | 51 +++++++++++++------ src/backend/replication/logical/tablesync.c | 8 +-- src/backend/replication/logical/worker.c | 25 ++++++--- src/backend/replication/walreceiver.c | 2 +- src/include/catalog/pg_subscription.h | 1 + src/include/replication/walreceiver.h | 14 ++--- src/test/subscription/t/027_nosuperuser.pl | 33 +++++++++++- 8 files changed, 112 insertions(+), 50 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6fe111e98d..e6ab5fed39 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -673,7 +673,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, load_file("libpqwalreceiver", false); /* Check the connection info string. */ - walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); + walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser(), NULL); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -733,12 +733,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, List *tables; ListCell *lc; char table_state; - bool must_use_password; + bool is_superuser = superuser_arg(owner); /* Try to connect to the publisher. */ - must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, - stmt->subname, &err); + wrconn = walrcv_connect(conninfo, true, opts.passwordrequired, + is_superuser, stmt->subname, &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -863,15 +862,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } SubRemoveRels; SubRemoveRels *sub_remove_rels; WalReceiverConn *wrconn; - bool must_use_password; + bool is_superuser = superuser_arg(sub->owner); /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Try to connect to the publisher. */ - must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(sub->conninfo, true, sub->passwordrequired, + is_superuser, sub->name, &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1249,7 +1247,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, load_file("libpqwalreceiver", false); /* Check the connection info string. */ walrcv_check_conninfo(stmt->conninfo, - sub->passwordrequired && !superuser_arg(sub->owner)); + sub->passwordrequired && !superuser_arg(sub->owner), NULL); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(stmt->conninfo); @@ -1488,7 +1486,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) WalReceiverConn *wrconn; Form_pg_subscription form; List *rstates; - bool must_use_password; + bool passwordrequired; + bool is_superuser; /* * Lock pg_subscription with AccessExclusiveLock to ensure that the @@ -1519,7 +1518,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) form = (Form_pg_subscription) GETSTRUCT(tup); subid = form->oid; subowner = form->subowner; - must_use_password = !superuser_arg(subowner) && form->subpasswordrequired; + passwordrequired = form->subpasswordrequired; + is_superuser = superuser_arg(subowner); /* must be owner */ if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId())) @@ -1682,8 +1682,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, - subname, &err); + wrconn = walrcv_connect(conninfo, true, passwordrequired, + is_superuser, subname, &err); if (wrconn == NULL) { if (!slotname) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..c0820939b7 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -49,10 +49,12 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, - const char *appname, char **err); + bool logical, bool passwordrequired, + bool is_superuser, const char *appname, + char **err); static void libpqrcv_check_conninfo(const char *conninfo, - bool must_use_password); + bool must_use_password, + WalReceiverConn *conn); static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); @@ -121,7 +123,7 @@ _PG_init(void) * Establish the connection to the primary server for XLOG streaming * * If an error occurs, this function will normally return NULL and set *err - * to a palloc'ed error message. However, if must_use_password is true and + * to a palloc'ed error message. However, if passwordrequired is true and * the connection fails to use the password, this function will ereport(ERROR). * We do this because in that case the error includes a detail and a hint for * consistency with other parts of the system, and it's not worth adding the @@ -129,8 +131,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool logical, bool passwordrequired, + bool is_superuser, const char *appname, char **err) { WalReceiverConn *conn; const char *keys[6]; @@ -187,16 +189,20 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, if (PQstatus(conn->streamConn) != CONNECTION_OK) goto bad_connection_errmsg; - if (must_use_password && !PQconnectionUsedPassword(conn->streamConn)) + if (passwordrequired && !is_superuser) { - libpqsrv_disconnect(conn->streamConn); - pfree(conn); + libpqrcv_check_conninfo(conninfo, true, conn); + if (!PQconnectionUsedPassword(conn->streamConn)) + { + libpqsrv_disconnect(conn->streamConn); + pfree(conn); - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("password is required"), - errdetail("Non-superuser cannot connect if the server does not request a password."), - errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters."))); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superuser cannot connect if the server does not request a password."), + errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters."))); + } } if (logical) @@ -239,7 +245,8 @@ bad_connection: * connection string specifies a password and false otherwise. */ static void -libpqrcv_check_conninfo(const char *conninfo, bool must_use_password) +libpqrcv_check_conninfo(const char *conninfo, bool must_use_password, + WalReceiverConn *conn) { PQconninfoOption *opts = NULL; PQconninfoOption *opt; @@ -252,6 +259,12 @@ libpqrcv_check_conninfo(const char *conninfo, bool must_use_password) char *errcopy = err ? pstrdup(err) : "out of memory"; PQfreemem(err); + if (conn) + { + libpqsrv_disconnect(conn->streamConn); + pfree(conn); + } + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid connection string syntax: %s", errcopy))); @@ -275,10 +288,18 @@ libpqrcv_check_conninfo(const char *conninfo, bool must_use_password) } if (!uses_password) + { + if (conn) + { + libpqsrv_disconnect(conn->streamConn); + pfree(conn); + } + ereport(ERROR, (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), errmsg("password is required"), errdetail("Non-superusers must provide a password in the connection string."))); + } } PQconninfoFree(opts); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..7f69ddb1f6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1267,8 +1267,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) char originname[NAMEDATALEN]; RepOriginId originid; UserContext ucxt; - bool must_use_password; bool run_as_owner; + bool is_superuser = superuser_arg(MySubscription->owner); /* Check the state of the table synchronization. */ StartTransactionCommand(); @@ -1276,10 +1276,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relid, &relstate_lsn); - /* Is the use of a password mandatory? */ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - /* Note that the superuser_arg call can access the DB */ CommitTransactionCommand(); @@ -1314,7 +1310,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, + MySubscription->passwordrequired, is_superuser, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 597947410f..ddc5c63c7d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3887,6 +3887,7 @@ maybe_reread_subscription(void) MemoryContext oldctx; Subscription *newsub; bool started_tx = false; + bool is_superuser; /* When cache state is valid there is nothing to do here. */ if (MySubscriptionValid) @@ -3903,6 +3904,7 @@ maybe_reread_subscription(void) oldctx = MemoryContextSwitchTo(ApplyContext); newsub = GetSubscription(MyLogicalRepWorker->subid, true); + is_superuser = superuser_arg(MySubscription->owner); /* * Exit if the subscription was removed. This normally should not happen @@ -3952,7 +3954,8 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + is_superuser != MySubscription->isownersuperuser) { if (am_parallel_apply_worker()) ereport(LOG, @@ -4468,7 +4471,7 @@ run_apply_worker() RepOriginId originid; TimeLineID startpointTLI; char *err; - bool must_use_password; + bool is_superuser = superuser_arg(MySubscription->owner); slotname = MySubscription->slotname; @@ -4493,15 +4496,12 @@ run_apply_worker() replorigin_session_origin = originid; origin_startpos = replorigin_session_get_progress(false); - /* Is the use of a password mandatory? */ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - /* Note that the superuser_arg call can access the DB */ CommitTransactionCommand(); LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, + MySubscription->passwordrequired, + is_superuser, MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) @@ -4605,6 +4605,13 @@ InitializeLogRepWorker(void) proc_exit(0); } + /* + * Fetch subscription owner is a superuser. This value will be later + * checked to see when there is any change with this role and the worker + * will be restarted if required. + */ + MySubscription->isownersuperuser = superuser_arg(MySubscription->owner); + MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); @@ -4626,6 +4633,10 @@ InitializeLogRepWorker(void) subscription_change_cb, (Datum) 0); + CacheRegisterSyscacheCallback(AUTHOID, + subscription_change_cb, + (Datum) 0); + if (am_tablesync_worker()) ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index feff709435..896dc9bc6f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, false, false, true, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index be36c4a820..87f6f644a9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -144,6 +144,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool isownersuperuser; /* Is subscription owner superuser? */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..22ed9dc8ff 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -239,7 +239,8 @@ typedef struct WalRcvExecResult */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical, - bool must_use_password, + bool passwordrequired, + bool is_superuser, const char *appname, char **err); @@ -249,7 +250,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, * Parse and validate the connection string given as of 'conninfo'. */ typedef void (*walrcv_check_conninfo_fn) (const char *conninfo, - bool must_use_password); + bool must_use_password, + WalReceiverConn *conn); /* * walrcv_get_conninfo_fn @@ -407,10 +409,10 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) -#define walrcv_check_conninfo(conninfo, must_use_password) \ - WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) +#define walrcv_connect(conninfo, logical, passwordrequired, is_superuser, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, logical, passwordrequired, is_superuser, appname, err) +#define walrcv_check_conninfo(conninfo, must_use_password, conn) \ + WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password, conn) #define walrcv_get_conninfo(conn) \ WalReceiverFunctions->walrcv_get_conninfo(conn) #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index d7a7e3ef5b..bb34db8843 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -7,7 +7,8 @@ use warnings; use PostgreSQL::Test::Cluster; use Test::More; -my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset); +my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset, + $stdout, $stderr); $offset = 0; sub publish_insert @@ -104,6 +105,7 @@ for my $node ($node_publisher, $node_subscriber) CREATE ROLE regress_admin SUPERUSER LOGIN; CREATE ROLE regress_alice NOSUPERUSER LOGIN; GRANT CREATE ON DATABASE postgres TO regress_alice; + GRANT PG_CREATE_SUBSCRIPTION TO regress_alice; SET SESSION AUTHORIZATION regress_alice; CREATE SCHEMA alice; GRANT USAGE ON SCHEMA alice TO regress_admin; @@ -303,4 +305,33 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice; expect_replication("alice.unpartitioned", 3, 17, 21, "restoring SELECT permission permits replication to continue"); +# Creation of subscription should fail for non superuser alice with +# 'password is required' error. +($result, $stdout, $stderr) = $node_subscriber->psql( + 'postgres', qq( +SET SESSION AUTHORIZATION regress_alice; +CREATE SUBSCRIPTION regression_sub1 CONNECTION '$publisher_connstr' PUBLICATION alice; +)); +ok( $stderr =~ qr/ERROR: ( [A-Z0-9]+:)? password is required/, + "non superuser must specify password to connect"); + +# The apply worker should get restarted after the superuser prvileges are +# revoked for subscription owner alice. +grant_superuser("regress_alice"); +$node_subscriber->safe_psql( + 'postgres', qq( +SET SESSION AUTHORIZATION regress_alice; +CREATE SUBSCRIPTION regression_sub1 CONNECTION '$publisher_connstr' PUBLICATION alice; +)); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +revoke_superuser("regress_alice"); + +# After apply worker is restarted, it should fail with 'password is required' +# error. +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? password is required/, + $offset); + done_testing(); -- 2.34.1