From 4a8fa778fe5ee1807b91d2587775b7a7ad250829 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 23 Jan 2024 11:16:23 +0200 Subject: [PATCH v3 3/4] Use libpq-be-fe-helpers.h wrappers more --- .../libpqwalreceiver/libpqwalreceiver.c | 148 ++++-------------- 1 file changed, 31 insertions(+), 117 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index db779dc6ca6..c60a121093c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -102,8 +102,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -212,8 +210,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, { PGresult *res; - res = libpqrcv_PQexec(conn->streamConn, - ALWAYS_SECURE_SEARCH_PATH_SQL); + res = libpqsrv_exec(conn->streamConn, + ALWAYS_SECURE_SEARCH_PATH_SQL, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -385,7 +384,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqsrv_exec(conn->streamConn, + "IDENTIFY_SYSTEM", + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -518,7 +519,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -548,7 +551,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PGresult *res; /* - * Send copy-end message. As in libpqrcv_PQexec, this could theoretically + * Send copy-end message. As in libpqsrv_exec, this could theoretically * block, but the risk seems small. */ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || @@ -568,7 +571,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -583,7 +587,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -597,7 +602,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -608,7 +614,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -633,7 +640,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqsrv_exec(conn->streamConn, + cmd, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -663,107 +672,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, PQclear(res); } -/* - * Send a query and wait for the results by using the asynchronous libpq - * functions and socket readiness events. - * - * The function is modeled on libpqsrv_exec(), with the behavior difference - * being that it calls ProcessWalRcvInterrupts(). As an optimization, it - * skips try/catch, since all errors terminate the process. - * - * May return NULL, rather than an error result, on failure. - */ -static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) -{ - PGresult *lastResult = NULL; - - /* - * PQexec() silently discards any prior query results on the connection. - * This is not required for this function as it's expected that the caller - * (which is this library in all cases) will behave correctly and we don't - * have to be backwards compatible with old libpq. - */ - - /* - * Submit the query. Since we don't use non-blocking mode, this could - * theoretically block. In practice, since we don't send very long query - * strings, the risk seems negligible. - */ - if (!PQsendQuery(streamConn, query)) - return NULL; - - for (;;) - { - /* Wait for, and collect, the next PGresult. */ - PGresult *result; - - result = libpqrcv_PQgetResult(streamConn); - if (result == NULL) - break; /* query is complete, or failure */ - - /* - * Emulate PQexec()'s behavior of returning the last result when there - * are many. We are fine with returning just last error message. - */ - PQclear(lastResult); - lastResult = result; - - if (PQresultStatus(lastResult) == PGRES_COPY_IN || - PQresultStatus(lastResult) == PGRES_COPY_OUT || - PQresultStatus(lastResult) == PGRES_COPY_BOTH || - PQstatus(streamConn) == CONNECTION_BAD) - break; - } - - return lastResult; -} - -/* - * Perform the equivalent of PQgetResult(), but watch for interrupts. - */ -static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) -{ - /* - * Collect data until PQgetResult is ready to get the result without - * blocking. - */ - while (PQisBusy(streamConn)) - { - int rc; - - /* - * We don't need to break down the sleep into smaller increments, - * since we'll get interrupted by signals and can handle any - * interrupts here. - */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - - /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) - { - /* trouble; return NULL */ - return NULL; - } - } - - /* Now we can collect and return the next PGresult */ - return PQgetResult(streamConn); -} - /* * Disconnect connection to primary, if any. */ @@ -824,13 +732,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) { PQclear(res); @@ -972,7 +882,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1099,7 +1011,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqsrv_exec(conn->streamConn, + query, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); switch (PQresultStatus(pgres)) { -- 2.39.2