From 92fd2396d8e4dc68b1030802a7c549f60d9554ac Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Fri, 28 Jun 2019 13:11:29 +0200 Subject: [PATCH 1/4] Export vacuumdb's parallel infrastructure --- src/bin/scripts/Makefile | 4 +- src/bin/scripts/parallel.c | 282 ++++++++++++++++++++++++++++++++++++ src/bin/scripts/parallel.h | 34 +++++ src/bin/scripts/vacuumdb.c | 284 +------------------------------------ 4 files changed, 319 insertions(+), 285 deletions(-) create mode 100644 src/bin/scripts/parallel.c create mode 100644 src/bin/scripts/parallel.h diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile index 9f352b5e2b..6979d8f9ff 100644 --- a/src/bin/scripts/Makefile +++ b/src/bin/scripts/Makefile @@ -28,7 +28,7 @@ createuser: createuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils -vacuumdb: vacuumdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils +vacuumdb: vacuumdb.o common.o parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils @@ -50,7 +50,7 @@ uninstall: clean distclean maintainer-clean: rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS)) - rm -f common.o $(WIN32RES) + rm -f common.o parallel.o $(WIN32RES) rm -rf tmp_check check: diff --git a/src/bin/scripts/parallel.c b/src/bin/scripts/parallel.c new file mode 100644 index 0000000000..5e4505e9fe --- /dev/null +++ b/src/bin/scripts/parallel.c @@ -0,0 +1,282 @@ +/*------------------------------------------------------------------------- + * + 
* parallel.c + * Parallel support for bin/scripts/ + * + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/bin/scripts/parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +#include "common.h" +#include "common/logging.h" +#include "parallel.h" + +#define ERRCODE_UNDEFINED_TABLE "42P01" + + +/* + * GetIdleSlot + * Return a connection slot that is ready to execute a command. + * + * We return the first slot we find that is marked isFree, if one is; + * otherwise, we loop on select() until one socket becomes available. When + * this happens, we read the whole set and mark as free all sockets that become + * available. + * + * If an error occurs, NULL is returned. + */ +ParallelSlot * +GetIdleSlot(ParallelSlot slots[], int numslots, + const char *progname) +{ + int i; + int firstFree = -1; + + /* Any connection already known free? */ + for (i = 0; i < numslots; i++) + { + if (slots[i].isFree) + return slots + i; + } + + /* + * No free slot found, so wait until one of the connections has finished + * its task and return the available slot. + */ + while (firstFree < 0) + { + fd_set slotset; + int maxFd = 0; + bool aborting; + + /* We must reconstruct the fd_set for each call to select_loop */ + FD_ZERO(&slotset); + + for (i = 0; i < numslots; i++) + { + int sock = PQsocket(slots[i].connection); + + /* + * We don't really expect any connections to lose their sockets + * after startup, but just in case, cope by ignoring them.
+ */ + if (sock < 0) + continue; + + FD_SET(sock, &slotset); + if (sock > maxFd) + maxFd = sock; + } + + SetCancelConn(slots->connection); + i = select_loop(maxFd, &slotset, &aborting); + ResetCancelConn(); + + if (aborting) + { + /* + * We set the cancel-receiving connection to the one in the zeroth + * slot above, so fetch the error from there. + */ + GetQueryResult(slots->connection, progname); + return NULL; + } + Assert(i != 0); + + for (i = 0; i < numslots; i++) + { + int sock = PQsocket(slots[i].connection); + + if (sock >= 0 && FD_ISSET(sock, &slotset)) + { + /* select() says input is available, so consume it */ + PQconsumeInput(slots[i].connection); + } + + /* Collect result(s) as long as any are available */ + while (!PQisBusy(slots[i].connection)) + { + PGresult *result = PQgetResult(slots[i].connection); + + if (result != NULL) + { + /* Check and discard the command result */ + if (!ProcessQueryResult(slots[i].connection, result, + progname)) + return NULL; + } + else + { + /* This connection has become idle */ + slots[i].isFree = true; + if (firstFree < 0) + firstFree = i; + break; + } + } + } + } + + return slots + firstFree; +} + +/* + * ProcessQueryResult + * + * Process (and delete) a query result. Returns true if there's no error, + * false otherwise -- but errors about trying to work on a missing relation + * are reported and subsequently ignored. + */ +bool +ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname) +{ + /* + * If it's an error, report it. Errors about a missing table are harmless + * so we continue processing; but die for other errors.
+ */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + + pg_log_error("processing of database \"%s\" failed: %s", + PQdb(conn), PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) + { + PQclear(result); + return false; + } + } + + PQclear(result); + return true; +} + +/* + * GetQueryResult + * + * Pump the conn till it's dry of results; return false if any are errors. + * Note that this will block if the conn is busy. + */ +bool +GetQueryResult(PGconn *conn, const char *progname) +{ + bool ok = true; + PGresult *result; + + SetCancelConn(conn); + while ((result = PQgetResult(conn)) != NULL) + { + if (!ProcessQueryResult(conn, result, progname)) + ok = false; + } + ResetCancelConn(); + return ok; +} + +/* + * DisconnectDatabase + * Disconnect the connection associated with the given slot + */ +void +DisconnectDatabase(ParallelSlot *slot) +{ + char errbuf[256]; + + if (!slot->connection) + return; + + if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE) + { + PGcancel *cancel; + + if ((cancel = PQgetCancel(slot->connection))) + { + (void) PQcancel(cancel, errbuf, sizeof(errbuf)); + PQfreeCancel(cancel); + } + } + + PQfinish(slot->connection); + slot->connection = NULL; +} + +/* + * Loop on select() until a descriptor from the given set becomes readable. + * + * If we get a cancel request while we're waiting, we forego all further + * processing and set the *aborting flag to true. The return value must be + * ignored in this case. Otherwise, *aborting is set to false. + */ +int +select_loop(int maxFd, fd_set *workerset, bool *aborting) +{ + int i; + fd_set saveSet = *workerset; + + if (CancelRequested) + { + *aborting = true; + return -1; + } + else + *aborting = false; + + for (;;) + { + /* + * On Windows, we need to check once in a while for cancel requests; + * on other platforms we rely on select() returning when interrupted. 
+ */ + struct timeval *tvp; +#ifdef WIN32 + struct timeval tv = {0, 1000000}; + + tvp = &tv; +#else + tvp = NULL; +#endif + + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, tvp); + +#ifdef WIN32 + if (i == SOCKET_ERROR) + { + i = -1; + + if (WSAGetLastError() == WSAEINTR) + errno = EINTR; + } +#endif + + if (i < 0 && errno == EINTR) + continue; /* ignore this */ + if (i < 0 || CancelRequested) + *aborting = true; /* but not this */ + if (i == 0) + continue; /* timeout (Win32 only) */ + break; + } + + return i; +} + +void +init_slot(ParallelSlot *slot, PGconn *conn) +{ + slot->connection = conn; + /* Initially assume connection is idle */ + slot->isFree = true; +} diff --git a/src/bin/scripts/parallel.h b/src/bin/scripts/parallel.h new file mode 100644 index 0000000000..be58e0bb96 --- /dev/null +++ b/src/bin/scripts/parallel.h @@ -0,0 +1,34 @@ +/* + * parallel.h + * Parallel support for bin/scripts/ + * + * Copyright (c) 2003-2019, PostgreSQL Global Development Group + * + * src/bin/scripts/parallel.h + */ +#ifndef SCRIPTS_PARALLEL_H +#define SCRIPTS_PARALLEL_H + +/* Parallel processing stuff */ +typedef struct ParallelSlot +{ + PGconn *connection; /* One connection */ + bool isFree; /* Is it known to be idle? 
*/ +} ParallelSlot; + +extern ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, + const char *progname); + +extern bool ProcessQueryResult(PGconn *conn, PGresult *result, + const char *progname); + +extern bool GetQueryResult(PGconn *conn, const char *progname); + +extern void DisconnectDatabase(ParallelSlot *slot); + +extern int select_loop(int maxFd, fd_set *workerset, bool *aborting); + +extern void init_slot(ParallelSlot *slot, PGconn *conn); + + +#endif /* SCRIPTS_PARALLEL_H */ diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index df2a315f95..80c9341a5b 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -12,10 +12,6 @@ #include "postgres_fe.h" -#ifdef HAVE_SYS_SELECT_H -#include <sys/select.h> -#endif - #include "catalog/pg_class_d.h" #include "common.h" @@ -23,17 +19,9 @@ #include "fe_utils/connect.h" #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" +#include "parallel.h" -#define ERRCODE_UNDEFINED_TABLE "42P01" - -/* Parallel vacuuming stuff */ -typedef struct ParallelSlot -{ - PGconn *connection; /* One connection */ - bool isFree; /* Is it known to be idle?
*/ -} ParallelSlot; - /* vacuum options controlled by user flags */ typedef struct vacuumingOptions { @@ -71,20 +59,6 @@ static void prepare_vacuum_command(PQExpBuffer sql, int serverVersion, static void run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *table, const char *progname, bool async); -static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname); - -static bool ProcessQueryResult(PGconn *conn, PGresult *result, - const char *progname); - -static bool GetQueryResult(PGconn *conn, const char *progname); - -static void DisconnectDatabase(ParallelSlot *slot); - -static int select_loop(int maxFd, fd_set *workerset, bool *aborting); - -static void init_slot(ParallelSlot *slot, PGconn *conn); - static void help(const char *progname); /* For analyze-in-stages mode */ @@ -953,262 +927,6 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo, } } -/* - * GetIdleSlot - * Return a connection slot that is ready to execute a command. - * - * We return the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that become - * available. - * - * If an error occurs, NULL is returned. - */ -static ParallelSlot * -GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname) -{ - int i; - int firstFree = -1; - - /* Any connection already known free? */ - for (i = 0; i < numslots; i++) - { - if (slots[i].isFree) - return slots + i; - } - - /* - * No free slot found, so wait until one of the connections has finished - * its task and return the available slot. 
- */ - while (firstFree < 0) - { - fd_set slotset; - int maxFd = 0; - bool aborting; - - /* We must reconstruct the fd_set for each call to select_loop */ - FD_ZERO(&slotset); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - /* - * We don't really expect any connections to lose their sockets - * after startup, but just in case, cope by ignoring them. - */ - if (sock < 0) - continue; - - FD_SET(sock, &slotset); - if (sock > maxFd) - maxFd = sock; - } - - SetCancelConn(slots->connection); - i = select_loop(maxFd, &slotset, &aborting); - ResetCancelConn(); - - if (aborting) - { - /* - * We set the cancel-receiving connection to the one in the zeroth - * slot above, so fetch the error from there. - */ - GetQueryResult(slots->connection, progname); - return NULL; - } - Assert(i != 0); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - if (sock >= 0 && FD_ISSET(sock, &slotset)) - { - /* select() says input is available, so consume it */ - PQconsumeInput(slots[i].connection); - } - - /* Collect result(s) as long as any are available */ - while (!PQisBusy(slots[i].connection)) - { - PGresult *result = PQgetResult(slots[i].connection); - - if (result != NULL) - { - /* Check and discard the command result */ - if (!ProcessQueryResult(slots[i].connection, result, - progname)) - return NULL; - } - else - { - /* This connection has become idle */ - slots[i].isFree = true; - if (firstFree < 0) - firstFree = i; - break; - } - } - } - } - - return slots + firstFree; -} - -/* - * ProcessQueryResult - * - * Process (and delete) a query result. Returns true if there's no error, - * false otherwise -- but errors about trying to vacuum a missing relation - * are reported and subsequently ignored. - */ -static bool -ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname) -{ - /* - * If it's an error, report it. 
Errors about a missing table are harmless - * so we continue processing; but die for other errors. - */ - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); - - pg_log_error("vacuuming of database \"%s\" failed: %s", - PQdb(conn), PQerrorMessage(conn)); - - if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) - { - PQclear(result); - return false; - } - } - - PQclear(result); - return true; -} - -/* - * GetQueryResult - * - * Pump the conn till it's dry of results; return false if any are errors. - * Note that this will block if the conn is busy. - */ -static bool -GetQueryResult(PGconn *conn, const char *progname) -{ - bool ok = true; - PGresult *result; - - SetCancelConn(conn); - while ((result = PQgetResult(conn)) != NULL) - { - if (!ProcessQueryResult(conn, result, progname)) - ok = false; - } - ResetCancelConn(); - return ok; -} - -/* - * DisconnectDatabase - * Disconnect the connection associated with the given slot - */ -static void -DisconnectDatabase(ParallelSlot *slot) -{ - char errbuf[256]; - - if (!slot->connection) - return; - - if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE) - { - PGcancel *cancel; - - if ((cancel = PQgetCancel(slot->connection))) - { - (void) PQcancel(cancel, errbuf, sizeof(errbuf)); - PQfreeCancel(cancel); - } - } - - PQfinish(slot->connection); - slot->connection = NULL; -} - -/* - * Loop on select() until a descriptor from the given set becomes readable. - * - * If we get a cancel request while we're waiting, we forego all further - * processing and set the *aborting flag to true. The return value must be - * ignored in this case. Otherwise, *aborting is set to false. 
- */ -static int -select_loop(int maxFd, fd_set *workerset, bool *aborting) -{ - int i; - fd_set saveSet = *workerset; - - if (CancelRequested) - { - *aborting = true; - return -1; - } - else - *aborting = false; - - for (;;) - { - /* - * On Windows, we need to check once in a while for cancel requests; - * on other platforms we rely on select() returning when interrupted. - */ - struct timeval *tvp; -#ifdef WIN32 - struct timeval tv = {0, 1000000}; - - tvp = &tv; -#else - tvp = NULL; -#endif - - *workerset = saveSet; - i = select(maxFd + 1, workerset, NULL, NULL, tvp); - -#ifdef WIN32 - if (i == SOCKET_ERROR) - { - i = -1; - - if (WSAGetLastError() == WSAEINTR) - errno = EINTR; - } -#endif - - if (i < 0 && errno == EINTR) - continue; /* ignore this */ - if (i < 0 || CancelRequested) - *aborting = true; /* but not this */ - if (i == 0) - continue; /* timeout (Win32 only) */ - break; - } - - return i; -} - -static void -init_slot(ParallelSlot *slot, PGconn *conn) -{ - slot->connection = conn; - /* Initially assume connection is idle */ - slot->isFree = true; -} - static void help(const char *progname) { -- 2.20.1