From e82755587a4e039d628235a5e1dc8b1849d0499e Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 25 Jan 2021 19:53:43 +0530 Subject: [PATCH v17] postgres_fdw function to discard cached connections This patch introduces two new functions postgres_fdw_disconnect() and postgres_fdw_disconnect_all() to discard cached connection for a given foreign server or discard all cached connections respectively. --- contrib/postgres_fdw/connection.c | 131 +++++++++++- .../postgres_fdw/expected/postgres_fdw.out | 194 +++++++++++++++++- .../postgres_fdw/postgres_fdw--1.0--1.1.sql | 10 + contrib/postgres_fdw/sql/postgres_fdw.sql | 91 +++++++- doc/src/sgml/postgres-fdw.sgml | 64 +++++- 5 files changed, 479 insertions(+), 11 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index a1404cb6bb..966a8860e8 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -80,6 +80,8 @@ static bool xact_got_connection = false; * SQL functions */ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); +PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); +PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); /* prototypes of private functions */ static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); @@ -102,6 +104,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); static bool UserMappingPasswordRequired(UserMapping *user); +static bool disconnect_cached_connections(Oid serverid); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -1447,7 +1450,7 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS) /* * If the server has been dropped in the current explicit * transaction, then this entry would have been invalidated in - * pgfdw_inval_callback at the end of drop sever command. Note + * pgfdw_inval_callback at the end of drop server command. Note * that this connection would not have been closed in * pgfdw_inval_callback because it is still being used in the * current explicit transaction. So, assert that here. @@ -1470,3 +1473,129 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * Disconnect cached foreign server connection. + * + * This function throws an error when there is no foreign server with the given + * name. + * + * It closes cached connections of the given foreign server that are not being + * used within current transaction yet and returns true if it manages to close + * at least one such connection. If any of the found connection is being used + * within the current transaction, then it doesn't close it, instead emits a + * warning. If all the found connections are in use or no connection is found + * for the given foreign server or cache doesn't exit at all, then it returns + * false. + */ +Datum +postgres_fdw_disconnect(PG_FUNCTION_ARGS) +{ + ForeignServer *server; + char *servername; + + servername = text_to_cstring(PG_GETARG_TEXT_PP(0)); + server = GetForeignServerByName(servername, false); + + PG_RETURN_BOOL(disconnect_cached_connections(server->serverid)); +} + +/* + * Disconnect all cached connections. + * + * This function scans all the cache entries, closes connections that are not + * being used within current transaction. It emits warning for each connection + * that's in use. + * + * It returns true, if it closes at least one connection, otherwise false. + */ +Datum +postgres_fdw_disconnect_all(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid)); +} + +/* + * Workhorse to disconnect cached connections. + * + * This function disconnects either all unused connections when called from + * postgres_fdw_disconnect_all or a given foreign server unused connection when + * called from postgres_fdw_disconnect. + * + * It returns true if at least one connection is disconnected, otherwise false. + */ +static bool +disconnect_cached_connections(Oid serverid) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + bool all = !OidIsValid(serverid); + bool result = false; + + /* + * We allow non-super users to disconnect the cached connections made by + * super users. That's okay because all the cached connections are within + * the same session and if at all required, the connections can be remade. + * As of now we don't see any security risk doing this. If required, + * non-supers can be disallowed to disconnect the connections. + */ + + /* + * Connection cache is not yet initialized in this session, so return + * false. + */ + if (!ConnectionHash) + return result; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* Ignore cache entry if no open connection right now. */ + if (!entry->conn) + continue; + /* + * Either disconnect given or all the active and not in use cached + * connections. + */ + if (all || entry->serverid == serverid) + { + /* We cannot close connection that's in use, so issue a warning. */ + if (entry->xact_depth > 0) + { + ForeignServer *server; + + server = GetForeignServerExtended(entry->serverid, + FSV_MISSING_OK); + + if (!server) + { + /* + * If the server has been dropped in the current explicit + * transaction, then this entry would have been invalidated + * in pgfdw_inval_callback at the end of drop server + * command. Note that this connection would not have been + * closed in pgfdw_inval_callback because it is still being + * used in the current explicit transaction. So, assert + * that here. + */ + Assert(entry->invalidated); + + ereport(WARNING, + (errmsg("cannot close dropped server connection because it is still in use"))); + } + else + ereport(WARNING, + (errmsg("cannot close connection for server \"%s\" because it is still in use", + server->servername))); + } + else + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + result = true; + } + } + } + + return result; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index b4a04d2c14..d38d9cd4c8 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -17,7 +17,10 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; - + EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; CREATE USER MAPPING FOR public SERVER testserver1 @@ -25,6 +28,7 @@ CREATE USER MAPPING FOR public SERVER testserver1 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; CREATE USER MAPPING FOR public SERVER loopback3; +CREATE USER MAPPING FOR public SERVER loopback4; -- =================================================================== -- create objects used through FDW loopback server -- =================================================================== @@ -140,6 +144,11 @@ CREATE FOREIGN TABLE ft7 ( c2 int NOT NULL, c3 text ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft8 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4'); -- =================================================================== -- tests for validator -- =================================================================== @@ -211,7 +220,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') | -(6 rows) + public | ft8 | loopback4 | (schema_name 'S 1', table_name 'T 4') | +(7 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -9053,9 +9063,9 @@ ERROR: 08006 COMMIT; -- Clean up DROP PROCEDURE terminate_backend_and_wait(text); --- =================================================================== --- test connection invalidation cases --- =================================================================== +-- ============================================================================= +-- test connection invalidation cases and postgres_fdw_get_connections function +-- ============================================================================= -- This test case is for closing the connection in pgfdw_xact_callback BEGIN; -- List all the existing cached connections. Only loopback2 should be output. @@ -9118,6 +9128,180 @@ SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; loopback2 | t (1 row) +-- ======================================================================= +-- test postgres_fdw_disconnect and postgres_fdw_disconnect_all functions +-- ======================================================================= +-- Return true as all cached connections are closed. +SELECT postgres_fdw_disconnect_all(); + postgres_fdw_disconnect_all +----------------------------- + t +(1 row) + +-- Ensure to cache loopback connection. +SELECT 1 FROM ft1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +BEGIN; +-- Ensure to cache loopback2 connection. +SELECT 1 FROM ft6 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- List all the existing cached connections. loopback and loopback2 should be +-- output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- + loopback | t + loopback2 | t +(2 rows) + +-- Issue a warning and return false as loopback2 connection is still in use and +-- can not be closed. +SELECT postgres_fdw_disconnect('loopback2'); +WARNING: cannot close connection for server "loopback2" because it is still in use + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Close loopback connection, return true and issue a warning as loopback2 +-- connection is still in use and can not be closed. +SELECT postgres_fdw_disconnect_all(); +WARNING: cannot close connection for server "loopback2" because it is still in use + postgres_fdw_disconnect_all +----------------------------- + t +(1 row) + +-- List all the existing cached connections. loopback2 should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- + loopback2 | t +(1 row) + +-- Ensure to cache loopback connection. +SELECT 1 FROM ft1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- Ensure to cache loopback4 connection. +SELECT 1 FROM ft8 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- List all the existing cached connections. loopback, loopback2, loopback4 +-- should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- + loopback | t + loopback2 | t + loopback4 | t +(3 rows) + +DROP SERVER loopback4 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to user mapping for public on server loopback4 +drop cascades to foreign table ft8 +-- Return false as connections are still in use, warnings are issued. +SELECT postgres_fdw_disconnect_all(); +WARNING: cannot close dropped server connection because it is still in use +WARNING: cannot close connection for server "loopback" because it is still in use +WARNING: cannot close connection for server "loopback2" because it is still in use + postgres_fdw_disconnect_all +----------------------------- + f +(1 row) + +COMMIT; +-- Close loopback2 connection and return true. +SELECT postgres_fdw_disconnect('loopback2'); + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- List all the existing cached connections. loopback should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- + loopback | t +(1 row) + +-- Return false as loopback2 connectin is closed already. +SELECT postgres_fdw_disconnect('loopback2'); + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Return an error as there is no foreign server with given name. +SELECT postgres_fdw_disconnect('unknownserver'); +ERROR: server "unknownserver" does not exist +-- Close loopback connection and return true. +SELECT postgres_fdw_disconnect_all(); + postgres_fdw_disconnect_all +----------------------------- + t +(1 row) + +-- List all the existing cached connections. No connection exists, so NULL +-- should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- +(0 rows) + +-- ============================================================================= +-- test case for having multiple cached connections for a foreign server +-- ============================================================================= +CREATE ROLE multi_conn_user1 SUPERUSER; +CREATE ROLE multi_conn_user2 SUPERUSER; +CREATE USER MAPPING FOR multi_conn_user1 SERVER loopback; +CREATE USER MAPPING FOR multi_conn_user2 SERVER loopback; +SET ROLE multi_conn_user1; +-- Will cache loopback connection with user mapping for multi_conn_user1 +SELECT 1 FROM ft1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +RESET ROLE; +SET ROLE multi_conn_user2; +-- Will cache loopback connection with user mapping for multi_conn_user2 +SELECT 1 FROM ft1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +RESET ROLE; +-- Should output two connections for loopback server +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + server_name | valid +-------------+------- + loopback | t + loopback | t +(2 rows) + +-- Clean up +DROP USER MAPPING FOR multi_conn_user1 SERVER loopback; +DROP USER MAPPING FOR multi_conn_user2 SERVER loopback; +DROP ROLE multi_conn_user1; +DROP ROLE multi_conn_user2; -- =================================================================== -- batch insert -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql index 7f85784466..b8c0209036 100644 --- a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql +++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql @@ -8,3 +8,13 @@ CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text, RETURNS SETOF record AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED; + +CREATE FUNCTION postgres_fdw_disconnect (text) +RETURNS bool +AS 'MODULE_PATHNAME','postgres_fdw_disconnect' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +CREATE FUNCTION postgres_fdw_disconnect_all () +RETURNS bool +AS 'MODULE_PATHNAME','postgres_fdw_disconnect_all' +LANGUAGE C STRICT PARALLEL RESTRICTED; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 28b82f5f9d..5f9e1f195f 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -19,7 +19,10 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; - + EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; @@ -28,6 +31,7 @@ CREATE USER MAPPING FOR public SERVER testserver1 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; CREATE USER MAPPING FOR public SERVER loopback3; +CREATE USER MAPPING FOR public SERVER loopback4; -- =================================================================== -- create objects used through FDW loopback server @@ -154,6 +158,12 @@ CREATE FOREIGN TABLE ft7 ( c3 text ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft8 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4'); + -- =================================================================== -- tests for validator -- =================================================================== @@ -2710,9 +2720,9 @@ COMMIT; -- Clean up DROP PROCEDURE terminate_backend_and_wait(text); --- =================================================================== --- test connection invalidation cases --- =================================================================== +-- ============================================================================= +-- test connection invalidation cases and postgres_fdw_get_connections function +-- ============================================================================= -- This test case is for closing the connection in pgfdw_xact_callback BEGIN; -- List all the existing cached connections. Only loopback2 should be output. @@ -2739,6 +2749,79 @@ COMMIT; -- the above transaction. SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; +-- ======================================================================= +-- test postgres_fdw_disconnect and postgres_fdw_disconnect_all functions +-- ======================================================================= +-- Return true as all cached connections are closed. +SELECT postgres_fdw_disconnect_all(); +-- Ensure to cache loopback connection. +SELECT 1 FROM ft1 LIMIT 1; +BEGIN; +-- Ensure to cache loopback2 connection. +SELECT 1 FROM ft6 LIMIT 1; +-- List all the existing cached connections. loopback and loopback2 should be +-- output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; +-- Issue a warning and return false as loopback2 connection is still in use and +-- can not be closed. +SELECT postgres_fdw_disconnect('loopback2'); +-- Close loopback connection, return true and issue a warning as loopback2 +-- connection is still in use and can not be closed. +SELECT postgres_fdw_disconnect_all(); +-- List all the existing cached connections. loopback2 should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; +-- Ensure to cache loopback connection. +SELECT 1 FROM ft1 LIMIT 1; +-- Ensure to cache loopback4 connection. +SELECT 1 FROM ft8 LIMIT 1; +-- List all the existing cached connections. loopback, loopback2, loopback4 +-- should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; +DROP SERVER loopback4 CASCADE; +-- Return false as connections are still in use, warnings are issued. +SELECT postgres_fdw_disconnect_all(); +COMMIT; +-- Close loopback2 connection and return true. +SELECT postgres_fdw_disconnect('loopback2'); +-- List all the existing cached connections. loopback should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; +-- Return false as loopback2 connectin is closed already. +SELECT postgres_fdw_disconnect('loopback2'); +-- Return an error as there is no foreign server with given name. +SELECT postgres_fdw_disconnect('unknownserver'); +-- Close loopback connection and return true. +SELECT postgres_fdw_disconnect_all(); +-- List all the existing cached connections. No connection exists, so NULL +-- should be output. +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + +-- ============================================================================= +-- test case for having multiple cached connections for a foreign server +-- ============================================================================= +CREATE ROLE multi_conn_user1 SUPERUSER; +CREATE ROLE multi_conn_user2 SUPERUSER; +CREATE USER MAPPING FOR multi_conn_user1 SERVER loopback; +CREATE USER MAPPING FOR multi_conn_user2 SERVER loopback; + +SET ROLE multi_conn_user1; +-- Will cache loopback connection with user mapping for multi_conn_user1 +SELECT 1 FROM ft1 LIMIT 1; +RESET ROLE; + +SET ROLE multi_conn_user2; +-- Will cache loopback connection with user mapping for multi_conn_user2 +SELECT 1 FROM ft1 LIMIT 1; +RESET ROLE; + +-- Should output two connections for loopback server +SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + +-- Clean up +DROP USER MAPPING FOR multi_conn_user1 SERVER loopback; +DROP USER MAPPING FOR multi_conn_user2 SERVER loopback; +DROP ROLE multi_conn_user1; +DROP ROLE multi_conn_user2; + -- =================================================================== -- batch insert -- =================================================================== diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index fb4c22ac69..7f614390a4 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -512,7 +512,7 @@ OPTIONS (ADD password_required 'false'); the end of that transaction. true is returned otherwise. If there are no open connections, no record is returned. Example usage of the function: - + postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; server_name | valid -------------+------- @@ -522,6 +522,54 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + + + postgres_fdw_disconnect(server_name text) returns boolean + + + This function discards open connections that are established by + postgres_fdw from the local session to a foreign + server with the given name if they are not used in the current local + transaction yet, and then returns true if it manages + to discard at least one such connection. If any of the found connection + is being used within the current local transaction, then it doesn't + discard it, instead emits a warning for each such connection. If all the + found connections are in use or no connection is found for the given + foreign server or cache doesn't exit at all, then it returns false. + If no foreign server with the given name is found, an error is emitted. + Non-superusers are not allowed to discard any connection. Example usage + of the function: + +postgres=# SELECT postgres_fdw_disconnect('loopback1'); + postgres_fdw_disconnect +------------------------- + t + + + + + + + postgres_fdw_disconnect_all() returns boolean + + + This function discards all the open connections that postgres_fdw + established from the local session to the foreign server if they are not + used in the current local transaction yet. It doesn't discard the + connections that's already used in the current local transaction and + emits a warning for each such connection. It returns true, + if it closes at least one connection, otherwise false. + Non-superusers are not allowed to discard any connection. Example usage + of the function: + +postgres=# SELECT postgres_fdw_disconnect_all(); + postgres_fdw_disconnect_all +----------------------------- + t + + + + @@ -537,6 +585,20 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; multiple user identities (user mappings) are used to access the foreign server, a connection is established for each user mapping. + + + Since the postgres_fdw keeps the connections to remote + servers in the local session, the corresponding sessions that are opened on + the remote servers are kept idle until they are re-used by the local session. + This may waste resources if those connections are not frequently used by the + local session. To address this, the postgres_fdw + provides following way to remove the connections to the remote servers and + so the remote sessions: + + postgres_fdw_disconnect_all() to discard all the + connections or postgres_fdw_disconnect(server_name text) + to discard the connection associated with the given foreign server. + -- 2.25.1