From 71b584be461281be2d412cbb9ca41a022bd92ff4 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 18 Mar 2024 19:37:40 +0100 Subject: [PATCH v37] postgres_fdw: Start using new libpq cancel APIs Commit 61461a300c1c introduced new functions to libpq for cancelling queries. This replaces the usage of the old ones in postgres_fdw. This is done by introducing a new libpqsrv_cancel helper function. This function takes a timeout and can itself be interupted while it is sending a cancel request instead of being blocked. This new function is now also used by dblink. Finally, it also adds some test coverage for the cancel support in postgres_fdw. Author: Jelte Fennema-Nio Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com --- contrib/dblink/dblink.c | 21 ++--- contrib/postgres_fdw/connection.c | 45 +++++----- .../postgres_fdw/expected/postgres_fdw.out | 15 ++++ contrib/postgres_fdw/sql/postgres_fdw.sql | 7 ++ src/include/libpq/libpq-be-fe-helpers.h | 84 +++++++++++++++++++ 5 files changed, 135 insertions(+), 37 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index edbc9ab02ac..2c1d92326ed 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1347,25 +1347,16 @@ Datum dblink_cancel_query(PG_FUNCTION_ARGS) { PGconn *conn; - PGcancelConn *cancelConn; char *msg; + TimestampTz endtime; dblink_init(); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancelConn = PQcancelCreate(conn); - - PG_TRY(); - { - if (!PQcancelBlocking(cancelConn)) - msg = pchomp(PQcancelErrorMessage(cancelConn)); - else - msg = "OK"; - } - PG_FINALLY(); - { - PQcancelFinish(cancelConn); - } - PG_END_TRY(); + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + 30000); + msg = libpqsrv_cancel(conn, endtime); + if (!msg) + msg = "OK"; PG_RETURN_TEXT_P(cstring_to_text(msg)); } diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4931ebf5915..77a57bbdfd2 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); -static bool pgfdw_cancel_query_begin(PGconn *conn); +static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime); static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -1315,35 +1315,32 @@ pgfdw_cancel_query(PGconn *conn) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), CONNECTION_CLEANUP_TIMEOUT); - if (!pgfdw_cancel_query_begin(conn)) + if (!pgfdw_cancel_query_begin(conn, endtime)) return false; return pgfdw_cancel_query_end(conn, endtime, false); } +/* + * Submit a cancel request to the given connection, waiting only until + * the given time. + * + * We sleep interruptibly until we receive confirmation that the cancel + * request has been accepted, and if it is, return true; if the timeout + * lapses without that, or the request fails for whatever reason, return + * false. + */ static bool -pgfdw_cancel_query_begin(PGconn *conn) +pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime) { - PGcancel *cancel; - char errbuf[256]; + char *error = libpqsrv_cancel(conn, endtime); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) + if (error) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; - } - PQfreeCancel(cancel); + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", error))); + return false; } - return true; } @@ -1685,7 +1682,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, */ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) { - if (!pgfdw_cancel_query_begin(entry->conn)) + TimestampTz endtime; + + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + if (!pgfdw_cancel_query_begin(entry->conn, endtime)) return false; /* Unable to cancel running query */ *cancel_requested = lappend(*cancel_requested, entry); } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index acbbf3b56c8..98d16b1fd8c 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e3d147de6da..2626e68cc69 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 5d33bcf32f7..33c7be7845c 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -44,6 +44,8 @@ #include "miscadmin.h" #include "storage/fd.h" #include "storage/latch.h" +#include "utils/timestamp.h" +#include "utils/wait_event.h" static inline void libpqsrv_connect_prepare(void); @@ -365,4 +367,86 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) return PQgetResult(conn); } +/* + * Submit a cancel request to the given connection, waiting only until + * the given time. + * + * We sleep interruptibly until we receive confirmation that the cancel + * request has been accepted, and if it is, return NULL; if the timeout + * lapses without that, or the request fails for whatever reason, return + * the error message. + */ +static char * +libpqsrv_cancel(PGconn *conn, TimestampTz endtime) +{ + char *error = NULL; + PGcancelConn *cancel_conn = PQcancelCreate(conn); + + if (!PQcancelStart(cancel_conn)) + { + PG_TRY(); + { + error = pchomp(PQcancelErrorMessage(cancel_conn)); + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + return error; + } + + /* In what follows, do not leak any PGcancelConn on an error. */ + PG_TRY(); + { + while (true) + { + PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn); + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + if (pollres == PGRES_POLLING_OK) + break; + + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + error = "timed out"; + break; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + error = pchomp(PQcancelErrorMessage(cancel_conn)); + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: ; + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + + return error; +} + + #endif /* LIBPQ_BE_FE_HELPERS_H */ base-commit: b4080fa3dcf6c6359e542169e0e81a0662c53ba8 -- 2.34.1