From 34e5ea311322b3bc3bb0f8c925f9ade1a59c6f09 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 3 Jan 2023 11:31:04 -0800 Subject: [PATCH v2] wip: don't block inside PQconnectdb et al --- src/include/libpq/libpq-be-fe-helpers.h | 233 ++++++++++++++++++ .../libpqwalreceiver/libpqwalreceiver.c | 53 +--- contrib/dblink/dblink.c | 80 +----- contrib/postgres_fdw/connection.c | 42 +--- 4 files changed, 256 insertions(+), 152 deletions(-) create mode 100644 src/include/libpq/libpq-be-fe-helpers.h diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h new file mode 100644 index 00000000000..4f3d3b821f0 --- /dev/null +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -0,0 +1,233 @@ +/*------------------------------------------------------------------------- + * + * libpq-be-fe-helpers.h + * Helper functions for using libpq in extensions + * + * Code built directly into the backend is not allowed to link to libpq + * directly. Extension code is allowed to use libpq however. However, libpq + * used in extensions has to be careful to block inside libpq, otherwise + * interrupts will not be processed, leading to issues like unresolvable + * deadlocks. Backend code also needs to take care to acquire/release an + * external fd for the connection, otherwise fd.c's accounting of fd's is + * broken. + * + * This file provides helper functions to make it easier to comply with these + * rules. It is a header only library as it needs to be linked into each + * extension using libpq, and it seems too small to be worth adding a + * dedicated static library for. + * + * TODO: For historical reasons the connections established here are not put + * into non-blocking mode. That can lead to blocking even when only the async + * libpq functions are used. This should be fixed. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/libpq/libpq-be-fe-helpers.h + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQ_BE_FE_HELPERS_H +#define LIBPQ_BE_FE_HELPERS_H + +/* + * Despite the name, BUILDING_DLL is set only when building code directly part + * of the backend. Which also is where libpq isn't allowed to be + * used. Obviously this doesn't protect against libpq-fe.h getting included + * otherwise, but perhaps still protects against a few mistakes... + */ +#ifdef BUILDING_DLL +#error "libpq may not be used code directly built into the backend" +#endif + +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/latch.h" + + +static inline void libpqsrv_connect_prepare(void); +static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); + +/* + * PQfinish() wrapper that additionally releases the reserved file descriptor. + * + * It is allowed to call this with a NULL pgconn iff returned by + * libpqsrv_connect*. + */ +static inline void +libpqsrv_disconnect(PGconn *conn) +{ + /* + * If no connection was established, we haven't reserved an FD for it (or + * already released it). This rule makes it easier to write PG_CATCH() + * handlers for this facility's users. + * + * See also libpqsrv_connect_internal(). + */ + if (conn == NULL) + return; + + ReleaseExternalFD(); + PQfinish(conn); +} + +/* + * PQconnectdb() wrapper that reserves a file descriptor and processes + * interrupts. + */ +static inline PGconn * +libpqsrv_connect(const char *conninfo, uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStart(conninfo); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * PQconnectdbParams() wrapper that reserves a file descriptor and processes + * interrupts. + */ +static inline PGconn * +libpqsrv_connect_params(const char *const *keywords, + const char *const *values, + int expand_dbname, + uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStartParams(keywords, values, expand_dbname); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_prepare(void) +{ + /* + * We must obey fd.c's limit on non-virtual file descriptors. Assume that + * a PGconn represents one long-lived FD. (Doing this here also ensures + * that VFDs are closed if needed to make room.) + */ + if (!AcquireExternalFD()) + { +#ifndef WIN32 /* can't write #if within ereport() macro */ + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); +#else + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process setting."))); +#endif + } + +} + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) +{ + /* + * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do + * that here. + */ + if (conn == NULL) + { + ReleaseExternalFD(); + return; + } + + /* + * Can't wait without a socket. Note that we don't want to close the libpq + * connection yet, so callers can emit a useful error. + */ + if (PQstatus(conn) == CONNECTION_BAD) + return; + + /* + * WaitLatchOrSocket() can conceivably fail, handle this case here instead + * of requiring all callers to do so. + */ + PG_TRY(); + { + PostgresPollingStatusType status; + + /* + * Poll connection until we have OK or FAILED status. + * + * Per spec for PQconnectPoll, first wait till socket is write-ready. + */ + status = PGRES_POLLING_WRITING; + while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED) + { + int io_flag; + int rc; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + + /* + * Windows needs a different test while waiting for + * connection-made + */ + else if (PQstatus(conn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* If socket is ready, advance the libpq state machine */ + if (rc & io_flag) + status = PQconnectPoll(conn); + } + } + PG_CATCH(); + { + /* + * If an error is thrown here, the callers won't call + * libpqsrv_disconnect() with a conn, so release resources + * immediately. + */ + ReleaseExternalFD(); + PQfinish(conn); + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +#endif /* LIBPQ_BE_FE_HELPERS_H */ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 219cd73b7fc..bab2fe29562 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -24,6 +24,7 @@ #include "common/connect.h" #include "funcapi.h" #include "libpq-fe.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err) { WalReceiverConn *conn; - PostgresPollingStatusType status; const char *keys[6]; const char *vals[6]; int i = 0; @@ -172,52 +172,9 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, Assert(i < sizeof(keys)); conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectStartParams(keys, vals, - /* expand_dbname = */ true); - if (PQstatus(conn->streamConn) == CONNECTION_BAD) - { - *err = pchomp(PQerrorMessage(conn->streamConn)); - return NULL; - } - - /* - * Poll connection until we have OK or FAILED status. - * - * Per spec for PQconnectPoll, first wait till socket is write-ready. - */ - status = PGRES_POLLING_WRITING; - do - { - int io_flag; - int rc; - - if (status == PGRES_POLLING_READING) - io_flag = WL_SOCKET_READABLE; -#ifdef WIN32 - /* Windows needs a different test while waiting for connection-made */ - else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) - io_flag = WL_SOCKET_CONNECTED; -#endif - else - io_flag = WL_SOCKET_WRITEABLE; - - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); - } - - /* If socket is ready, advance the libpq state machine */ - if (rc & io_flag) - status = PQconnectPoll(conn->streamConn); - } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + conn->streamConn = libpqsrv_connect_params(keys, vals, + /* expand_dbname = */ true, + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); if (PQstatus(conn->streamConn) != CONNECTION_OK) { @@ -740,7 +697,7 @@ libpqrcv_PQgetResult(PGconn *streamConn) static void libpqrcv_disconnect(WalReceiverConn *conn) { - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); PQfreemem(conn->recvBuf); pfree(conn); } diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 8dd122042b4..130320db571 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -48,6 +48,7 @@ #include "funcapi.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "parser/scansup.h" @@ -59,6 +60,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/varlena.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -199,37 +201,14 @@ dblink_get_conn(char *conname_or_str, connstr = conname_or_str; dblink_connstr_check(connstr); - /* - * We must obey fd.c's limit on non-virtual file descriptors. Assume - * that a PGconn represents one long-lived FD. (Doing this here also - * ensures that VFDs are closed if needed to make room.) - */ - if (!AcquireExternalFD()) - { -#ifndef WIN32 /* can't write #if within ereport() macro */ - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not establish connection"), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); -#else - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not establish connection"), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process setting."))); -#endif - } - /* OK to make connection */ - conn = PQconnectdb(connstr); + conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION); if (PQstatus(conn) == CONNECTION_BAD) { char *msg = pchomp(PQerrorMessage(conn)); - PQfinish(conn); - ReleaseExternalFD(); + libpqsrv_disconnect(conn); ereport(ERROR, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), errmsg("could not establish connection"), @@ -312,36 +291,13 @@ dblink_connect(PG_FUNCTION_ARGS) /* check password in connection string if not superuser */ dblink_connstr_check(connstr); - /* - * We must obey fd.c's limit on non-virtual file descriptors. Assume that - * a PGconn represents one long-lived FD. (Doing this here also ensures - * that VFDs are closed if needed to make room.) - */ - if (!AcquireExternalFD()) - { -#ifndef WIN32 /* can't write #if within ereport() macro */ - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not establish connection"), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); -#else - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not establish connection"), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process setting."))); -#endif - } - /* OK to make connection */ - conn = PQconnectdb(connstr); + conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION); if (PQstatus(conn) == CONNECTION_BAD) { msg = pchomp(PQerrorMessage(conn)); - PQfinish(conn); - ReleaseExternalFD(); + libpqsrv_disconnect(conn); if (rconn) pfree(rconn); @@ -366,10 +322,7 @@ dblink_connect(PG_FUNCTION_ARGS) else { if (pconn->conn) - { - PQfinish(pconn->conn); - ReleaseExternalFD(); - } + libpqsrv_disconnect(conn); pconn->conn = conn; } @@ -402,8 +355,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) if (!conn) dblink_conn_not_avail(conname); - PQfinish(conn); - ReleaseExternalFD(); + libpqsrv_disconnect(conn); if (rconn) { deleteConnection(conname); @@ -838,10 +790,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { /* if needed, close the connection to the database */ if (freeconn) - { - PQfinish(conn); - ReleaseExternalFD(); - } + libpqsrv_disconnect(conn); } PG_END_TRY(); @@ -1516,10 +1465,7 @@ dblink_exec(PG_FUNCTION_ARGS) { /* if needed, close the connection to the database */ if (freeconn) - { - PQfinish(conn); - ReleaseExternalFD(); - } + libpqsrv_disconnect(conn); } PG_END_TRY(); @@ -2606,8 +2552,7 @@ createNewConnection(const char *name, remoteConn *rconn) if (found) { - PQfinish(rconn->conn); - ReleaseExternalFD(); + libpqsrv_disconnect(rconn->conn); pfree(rconn); ereport(ERROR, @@ -2647,8 +2592,7 @@ dblink_security_check(PGconn *conn, remoteConn *rconn) { if (!PQconnectionUsedPassword(conn)) { - PQfinish(conn); - ReleaseExternalFD(); + libpqsrv_disconnect(conn); if (rconn) pfree(rconn); diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ed75ce3f79c..7760380f00d 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -17,6 +17,7 @@ #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" #include "funcapi.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -446,35 +447,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user) /* verify the set of connection parameters */ check_conn_params(keywords, values, user); - /* - * We must obey fd.c's limit on non-virtual file descriptors. Assume - * that a PGconn represents one long-lived FD. (Doing this here also - * ensures that VFDs are closed if needed to make room.) - */ - if (!AcquireExternalFD()) - { -#ifndef WIN32 /* can't write #if within ereport() macro */ - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not connect to server \"%s\"", - server->servername), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); -#else - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not connect to server \"%s\"", - server->servername), - errdetail("There are too many open files on the local server."), - errhint("Raise the server's max_files_per_process setting."))); -#endif - } - /* OK to make connection */ - conn = PQconnectdbParams(keywords, values, false); - - if (!conn) - ReleaseExternalFD(); /* because the PG_CATCH block won't */ + conn = libpqsrv_connect_params(keywords, values, + /* expand_dbname = */ false, + PG_WAIT_EXTENSION); if (!conn || PQstatus(conn) != CONNECTION_OK) ereport(ERROR, @@ -507,12 +483,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user) } PG_CATCH(); { - /* Release PGconn data structure if we managed to create one */ - if (conn) - { - PQfinish(conn); - ReleaseExternalFD(); - } + libpqsrv_disconnect(conn); PG_RE_THROW(); } PG_END_TRY(); @@ -528,9 +499,8 @@ disconnect_pg_server(ConnCacheEntry *entry) { if (entry->conn != NULL) { - PQfinish(entry->conn); + libpqsrv_disconnect(entry->conn); entry->conn = NULL; - ReleaseExternalFD(); } } -- 2.38.0