From 187ac78b911c7f1c55da582f38fe4cd1ec1318a8 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 16 Jun 2023 17:15:48 +0200 Subject: [PATCH v6 2/2] Support sending Close messages from libpq This part of the protocol did not have a libpq implementation yet. That is a shame because connection poolers can much more easily intercept these Close messages than a DEALLOCATE query. Odyssey has prepared statement support implemented using the Close message, and PgBouncer is currently trying to do the same. But libpq based clients are not able to use this feature of these connection poolers because they cannot send the Close protocol message. --- doc/src/sgml/libpq.sgml | 125 ++++++++++++++++-- src/interfaces/libpq/exports.txt | 4 + src/interfaces/libpq/fe-exec.c | 123 ++++++++++++++--- src/interfaces/libpq/fe-protocol3.c | 19 ++- src/interfaces/libpq/libpq-fe.h | 6 + src/interfaces/libpq/libpq-int.h | 2 +- .../modules/libpq_pipeline/libpq_pipeline.c | 67 +++++++++- .../libpq_pipeline/traces/prepared.trace | 24 ++++ 8 files changed, 337 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 2225e4e0ef3..3c17b097540 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3250,10 +3250,7 @@ PGresult *PQprepare(PGconn *conn, Prepared statements for use with can also be created by executing SQL - statements. Also, although there is no libpq - function for deleting a prepared statement, the SQL statement - can be used for that purpose. + statements. @@ -3360,6 +3357,66 @@ PGresult *PQdescribePortal(PGconn *conn, const char *portalName); + + + PQclosePreparedPQclosePrepared + + + + Submits a request to close the specified prepared statement, and waits + for completion. + +PGresult *PQclosePrepared(PGconn *conn, const char *stmtName); + + + + + allows an application to close + a previously prepared statement. Closing a statement releases all + of its associated resources on the server and allows its name to be + reused. + + + + stmtName can be "" or + NULL to reference the unnamed statement. It is fine + if no statement exists with this name, in that case the operation is a + no-op. On success, a PGresult with + status PGRES_COMMAND_OK is returned. + + + + + + PQclosePortalPQclosePortal + + + + Submits a request to close the specified portal, and waits for + completion. + +PGresult *PQclosePortal(PGconn *conn, const char *portalName); + + + + + allows an application to trigger + a close of a previously created portal. Closing a portal releases all + of its associated resources on the server and allows its name to be + reused. (libpq does not provide any + direct access to portals, but you can use this function to close a + cursor created with a DECLARE CURSOR SQL command.) + + + + portalName can be "" or + NULL to reference the unnamed portal. It is fine + if no portal exists with this name, in that case the operation is a + no-op. On success, a PGresult with status + PGRES_COMMAND_OK is returned. + + + @@ -4851,15 +4908,19 @@ unsigned char *PQunescapeBytea(const unsigned char *from, size_t *to_length); , , , - , and + , , + , and + , which can be used with to duplicate the functionality of , , , - , and + , + , and + respectively. @@ -5008,6 +5069,46 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName); + + PQsendClosePreparedPQsendClosePrepared + + + + Submits a request to close the specified prepared statement, without + waiting for completion. + +int PQsendClosePrepared(PGconn *conn, const char *stmtName); + + + This is an asynchronous version of : + it returns 1 if it was able to dispatch the request, and 0 if not. + After a successful call, call to + obtain the results. The function's parameters are handled + identically to . + + + + + + PQsendClosePortalPQsendClosePortal + + + + Submits a request to close specified portal, without waiting for + completion. + +int PQsendClosePortal(PGconn *conn, const char *portalName); + + + This is an asynchronous version of : + it returns 1 if it was able to dispatch the request, and 0 if not. + After a successful call, call to + obtain the results. The function's parameters are handled + identically to . + + + + PQgetResultPQgetResult @@ -5019,7 +5120,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName); , , , - , or + , + , + , or call, and returns it. A null pointer is returned when the command is complete and there @@ -5350,6 +5453,8 @@ int PQflush(PGconn *conn); PQexecPrepared, PQdescribePrepared, PQdescribePortal, + PQclosePrepared, + PQclosePortal, is an error condition. PQsendQuery is also disallowed, because it uses the simple query protocol. @@ -5389,8 +5494,10 @@ int PQflush(PGconn *conn); establish a synchronization point in the pipeline, or when is called. The functions , - , and - also work in pipeline mode. + , + , + , and + also work in pipeline mode. Result processing is described below. diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 7ded77aff37..850734ac96c 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -187,3 +187,7 @@ PQsetTraceFlags 184 PQmblenBounded 185 PQsendFlushRequest 186 PQconnectionUsedGSSAPI 187 +PQclosePrepared 188 +PQclosePortal 189 +PQsendClosePrepared 190 +PQsendClosePortal 191 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 14d706efd57..dd8b791764c 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -77,8 +77,8 @@ static void parseInput(PGconn *conn); static PGresult *getCopyResult(PGconn *conn, ExecStatusType copytype); static bool PQexecStart(PGconn *conn); static PGresult *PQexecFinish(PGconn *conn); -static int PQsendDescribe(PGconn *conn, char desc_type, - const char *desc_target); +static int PQsendTypedCommand(PGconn *conn, char command, char type, + const char *target); static int check_field_number(const PGresult *res, int field_num); static void pqPipelineProcessQueue(PGconn *conn); static int pqPipelineFlush(PGconn *conn); @@ -2422,7 +2422,7 @@ PQdescribePrepared(PGconn *conn, const char *stmt) { if (!PQexecStart(conn)) return NULL; - if (!PQsendDescribe(conn, 'S', stmt)) + if (!PQsendTypedCommand(conn, 'D', 'S', stmt)) return NULL; return PQexecFinish(conn); } @@ -2441,7 +2441,7 @@ PQdescribePortal(PGconn *conn, const char *portal) { if (!PQexecStart(conn)) return NULL; - if (!PQsendDescribe(conn, 'P', portal)) + if (!PQsendTypedCommand(conn, 'D', 'P', portal)) return NULL; return PQexecFinish(conn); } @@ -2456,7 +2456,7 @@ PQdescribePortal(PGconn *conn, const char *portal) int PQsendDescribePrepared(PGconn *conn, const char *stmt) { - return PQsendDescribe(conn, 'S', stmt); + return PQsendTypedCommand(conn, 'D', 'S', stmt); } /* @@ -2469,26 +2469,95 @@ PQsendDescribePrepared(PGconn *conn, const char *stmt) int PQsendDescribePortal(PGconn *conn, const char *portal) { - return PQsendDescribe(conn, 'P', portal); + return PQsendTypedCommand(conn, 'D', 'P', portal); } /* - * PQsendDescribe - * Common code to send a Describe command + * PQclosePrepared + * Close a previously prepared statement * - * Available options for desc_type are - * 'S' to describe a prepared statement; or - * 'P' to describe a portal. + * If the query was not even sent, return NULL; conn->errorMessage is set to + * a relevant message. + * If the query was sent, a new PGresult is returned (which could indicate + * either success or failure). On success, the PGresult contains status + * PGRES_COMMAND_OK. The user is responsible for freeing the PGresult via + * PQclear() when done with it. + */ +PGresult * +PQclosePrepared(PGconn *conn, const char *stmt) +{ + if (!PQexecStart(conn)) + return NULL; + if (!PQsendTypedCommand(conn, 'C', 'S', stmt)) + return NULL; + return PQexecFinish(conn); +} + +/* + * PQclosePortal + * Close a previously created portal + * + * This is exactly like PQclosePrepared, but for portals. Note that at the + * moment, libpq doesn't really expose portals to the client; but this can be + * used with a portal created by a SQL DECLARE CURSOR command. + */ +PGresult * +PQclosePortal(PGconn *conn, const char *portal) +{ + if (!PQexecStart(conn)) + return NULL; + if (!PQsendTypedCommand(conn, 'C', 'P', portal)) + return NULL; + return PQexecFinish(conn); +} + +/* + * PQsendClosePrepared + * Submit a Close Statement command, but don't wait for it to finish + * + * Returns: 1 if successfully submitted + * 0 if error (conn->errorMessage is set) + */ +int +PQsendClosePrepared(PGconn *conn, const char *stmt) +{ + return PQsendTypedCommand(conn, 'C', 'S', stmt); +} + +/* + * PQsendClosePortal + * Submit a Close Portal command, but don't wait for it to finish + * + * Returns: 1 if successfully submitted + * 0 if error (conn->errorMessage is set) + */ +int +PQsendClosePortal(PGconn *conn, const char *portal) +{ + return PQsendTypedCommand(conn, 'C', 'P', portal); +} + +/* + * PQsendTypedCommand + * Common code to send a Describe or Close command + * + * Available options for command are + * 'C' for Close + * 'D' for Describe + * + * Available options for type are + * 'S' to a prepared statement; or + * 'P' to a portal. * Returns 1 on success and 0 on failure. */ static int -PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) +PQsendTypedCommand(PGconn *conn, char command, char type, const char *target) { PGcmdQueueEntry *entry = NULL; - /* Treat null desc_target as empty string */ - if (!desc_target) - desc_target = ""; + /* Treat null target as empty string */ + if (!target) + target = ""; if (!PQsendQueryStart(conn, true)) return 0; @@ -2497,10 +2566,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) if (entry == NULL) return 0; /* error msg already set */ - /* construct the Describe message */ - if (pqPutMsgStart('D', conn) < 0 || - pqPutc(desc_type, conn) < 0 || - pqPuts(desc_target, conn) < 0 || + /* construct the Close message */ + if (pqPutMsgStart(command, conn) < 0 || + pqPutc(type, conn) < 0 || + pqPuts(target, conn) < 0 || pqPutMsgEnd(conn) < 0) goto sendFailed; @@ -2512,8 +2581,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; } - /* remember we are doing a Describe */ - entry->queryclass = PGQUERY_DESCRIBE; + /* remember if we are doing a Close or a Describe */ + if (command == 'C') + { + entry->queryclass = PGQUERY_CLOSE; + } + else if (command == 'D') + { + entry->queryclass = PGQUERY_DESCRIBE; + } + else + { + libpq_append_conn_error(conn, "unknown command type provided"); + goto sendFailed; + } /* * Give the data a push (in pipeline mode, only if we're past the size diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 32b66d561cb..7bc6355d17f 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -278,8 +278,25 @@ pqParseInput3(PGconn *conn) } break; case '2': /* Bind Complete */ + /* Nothing to do for this message type */ + break; case '3': /* Close Complete */ - /* Nothing to do for these message types */ + /* If we're doing PQsendClose, we're done; else ignore */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_CLOSE) + { + if (!pgHavePendingResult(conn)) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_COMMAND_OK); + if (!conn->result) + { + libpq_append_conn_error(conn, "out of memory"); + pqSaveErrorResult(conn); + } + } + conn->asyncStatus = PGASYNC_READY; + } break; case 'S': /* parameter status */ if (getParameterStatus(conn)) diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 7476dbe0e90..97762d56f5d 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -548,6 +548,12 @@ extern PGresult *PQdescribePortal(PGconn *conn, const char *portal); extern int PQsendDescribePrepared(PGconn *conn, const char *stmt); extern int PQsendDescribePortal(PGconn *conn, const char *portal); +/* Close prepared statements and portals */ +extern PGresult *PQclosePrepared(PGconn *conn, const char *stmt); +extern PGresult *PQclosePortal(PGconn *conn, const char *portal); +extern int PQsendClosePrepared(PGconn *conn, const char *stmt); +extern int PQsendClosePortal(PGconn *conn, const char *portal); + /* Delete a PGresult */ extern void PQclear(PGresult *res); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 0045f83cbfd..cd116f4e95c 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -324,7 +324,7 @@ typedef enum PGQUERY_PREPARE, /* Parse only (PQprepare) */ PGQUERY_DESCRIBE, /* Describe Statement or Portal */ PGQUERY_SYNC, /* Sync (at end of a pipeline) */ - PGQUERY_CLOSE + PGQUERY_CLOSE /* Close Statement or Portal */ } PGQueryClass; /* diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 1933b078ebf..a2f5c8c0027 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -896,7 +896,7 @@ test_prepared(PGconn *conn) Oid expected_oids[4]; Oid typ; - fprintf(stderr, "prepared... "); + fprintf(stderr, "preparing statement... "); if (PQenterPipelineMode(conn) != 1) pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn)); @@ -947,9 +947,42 @@ test_prepared(PGconn *conn) if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + fprintf(stderr, "closing statement.."); + if (PQsendClosePrepared(conn, "select_one") != 1) + pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("expected non-NULL result"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + PQclear(res); + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + if (PQexitPipelineMode(conn) != 1) pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn)); + /* Now that it's closed we should get an error when describing */ + res = PQdescribePrepared(conn, "select_one"); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res))); + + /* + * Also test the blocking close, this should not fail since closing a + * non-existent prepared statement is a no-op + */ + res = PQclosePrepared(conn, "select_one"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + + fprintf(stderr, "creating portal... "); PQexec(conn, "BEGIN"); PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1"); PQenterPipelineMode(conn); @@ -975,9 +1008,41 @@ test_prepared(PGconn *conn) if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + fprintf(stderr, "closing portal... "); + if (PQsendClosePortal(conn, "cursor_one") != 1) + pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn)); + if (PQpipelineSync(conn) != 1) + pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("expected non-NULL result"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + PQclear(res); + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("expected NULL result"); + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_PIPELINE_SYNC) + pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res))); + if (PQexitPipelineMode(conn) != 1) pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn)); + /* Now that it's closed we should get an error when describing */ + res = PQdescribePortal(conn, "cursor_one"); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res))); + + /* + * Also test the blocking close, this should not fail since closing a + * non-existent portal is a no-op + */ + res = PQclosePortal(conn, "cursor_one"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res))); + fprintf(stderr, "ok\n"); } diff --git a/src/test/modules/libpq_pipeline/traces/prepared.trace b/src/test/modules/libpq_pipeline/traces/prepared.trace index 1a7de5c3e65..aeb5de109e0 100644 --- a/src/test/modules/libpq_pipeline/traces/prepared.trace +++ b/src/test/modules/libpq_pipeline/traces/prepared.trace @@ -5,6 +5,18 @@ B 4 ParseComplete B 10 ParameterDescription 1 NNNN B 113 RowDescription 4 "?column?" NNNN 0 NNNN 4 -1 0 "?column?" NNNN 0 NNNN 65535 -1 0 "numeric" NNNN 0 NNNN 65535 -1 0 "interval" NNNN 0 NNNN 16 -1 0 B 5 ReadyForQuery I +F 16 Close S "select_one" +F 4 Sync +B 4 CloseComplete +B 5 ReadyForQuery I +F 16 Describe S "select_one" +F 4 Sync +B NN ErrorResponse S "ERROR" V "ERROR" C "26000" M "prepared statement "select_one" does not exist" F "SSSS" L "SSSS" R "SSSS" \x00 +B 5 ReadyForQuery I +F 16 Close S "select_one" +F 4 Sync +B 4 CloseComplete +B 5 ReadyForQuery I F 10 Query "BEGIN" B 10 CommandComplete "BEGIN" B 5 ReadyForQuery T @@ -15,4 +27,16 @@ F 16 Describe P "cursor_one" F 4 Sync B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0 B 5 ReadyForQuery T +F 16 Close P "cursor_one" +F 4 Sync +B 4 CloseComplete +B 5 ReadyForQuery T +F 16 Describe P "cursor_one" +F 4 Sync +B NN ErrorResponse S "ERROR" V "ERROR" C "34000" M "portal "cursor_one" does not exist" F "SSSS" L "SSSS" R "SSSS" \x00 +B 5 ReadyForQuery E +F 16 Close P "cursor_one" +F 4 Sync +B 4 CloseComplete +B 5 ReadyForQuery E F 4 Terminate -- 2.34.1