*** a/doc/src/sgml/ref/vacuumdb.sgml
--- b/doc/src/sgml/ref/vacuumdb.sgml
***************
*** 224,229 **** PostgreSQL documentation
--- 224,250 ----
+
+
+
+
+ Number of concurrent connections to use for the operation. This option
+ makes vacuumdb run the vacuum commands over several asynchronous
+ connections; each connection processes one table at a time, so at most
+ as many tables are vacuumed in parallel as there are jobs.
+ If the number of jobs is larger than the number of tables, it is
+ reduced to the number of tables.
+
+ vacuumdb will open
+ njobs connections to the database, so make sure your
+ max_connections setting is high enough to
+ accommodate all connections.
+
+
+
+
+
+
*** a/src/bin/scripts/vacuumdb.c
--- b/src/bin/scripts/vacuumdb.c
***************
*** 14,19 ****
--- 14,29 ----
#include "common.h"
#include "dumputils.h"
+ #define NO_SLOT (-1)
+
+ /*
+  * One connection used for parallel vacuuming, together with the state
+  * needed to tell whether a command is currently in flight on it.
+  */
+ typedef struct ParallelSlot
+ {
+ PGconn *connection;	/* libpq connection; closed with PQfinish() when done */
+ bool isFree;	/* false from sending a command until its result is read */
+ pgsocket sock;	/* PQsocket() of "connection", used to select() on it */
+ } ParallelSlot;
+
static void vacuum_one_database(const char *dbname, bool full, bool verbose,
bool and_analyze, bool analyze_only, bool analyze_in_stages, bool freeze,
***************
*** 25,34 **** static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet);
static void help(const char *progname);
int
main(int argc, char *argv[])
--- 35,73 ----
const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet,
! int numAsyncCons);
static void help(const char *progname);
+ /*
+  * Helpers for the --jobs code path.  They are only used inside this file,
+  * so they are declared with internal linkage.
+  */
+ static void vacuum_parallel(const char *dbname, bool full, bool verbose,
+ 				bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ 				bool freeze, const char *host,
+ 				const char *port, const char *username,
+ 				enum trivalue prompt_password, const char *progname,
+ 				bool echo, int numAsyncCons, SimpleStringList *tables);
+
+ static void prepare_command(PGconn *conn, bool full, bool verbose,
+ 				bool and_analyze, bool analyze_only, bool freeze,
+ 				PQExpBuffer sql);
+ static void
+ run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+ bool full, bool verbose, bool and_analyze,
+ bool analyze_only, bool freeze, int numAsyncCons,
+ const char *progname, int analyze_stage,
+ ParallelSlot *connSlot, bool completedb);
+ static int
+ GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+ const char *progname, bool completedb);
+
+ static bool GetQueryResult(PGconn *conn, const char *dbname,
+ const char *progname, bool completedb);
+
+ static int
+ select_loop(int maxFd, fd_set *workerset);
+
+ static void DisconnectDatabase(ParallelSlot *slot);
+
int
main(int argc, char *argv[])
***************
*** 49,54 **** main(int argc, char *argv[])
--- 88,94 ----
{"table", required_argument, NULL, 't'},
{"full", no_argument, NULL, 'f'},
{"verbose", no_argument, NULL, 'v'},
+ {"jobs", required_argument, NULL, 'j'},
{"maintenance-db", required_argument, NULL, 2},
{"analyze-in-stages", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
***************
*** 74,86 **** main(int argc, char *argv[])
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
{
switch (c)
{
--- 114,128 ----
bool full = false;
bool verbose = false;
SimpleStringList tables = {NULL, NULL};
+ int numAsyncCons = 0;
+ int tbl_count = 0;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
handle_help_version_opts(argc, argv, "vacuumdb", help);
! while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
{
switch (c)
{
***************
*** 121,134 **** main(int argc, char *argv[])
--- 163,189 ----
alldb = true;
break;
case 't':
+ {
simple_string_list_append(&tables, optarg);
+ tbl_count++;
break;
+ }
case 'f':
full = true;
break;
case 'v':
verbose = true;
break;
+ case 'j':
+ numAsyncCons = atoi(optarg);
+ if (numAsyncCons <= 0)
+ {
+ fprintf(stderr, _("%s: Number of parallel \"jobs\" should be at least 1\n"),
+ progname);
+ exit(1);
+ }
+
+ break;
case 2:
maintenance_db = pg_strdup(optarg);
break;
***************
*** 141,146 **** main(int argc, char *argv[])
--- 196,202 ----
}
}
+
/*
* Non-option argument specifies database name as long as it wasn't
***************
*** 179,184 **** main(int argc, char *argv[])
--- 235,247 ----
setup_cancel_handler();
+ /*
+ * When the user supplied a table list with fewer entries than the
+ * requested number of jobs, use only one connection per listed table.
+ */
+ if (tbl_count && (numAsyncCons > tbl_count))
+ numAsyncCons = tbl_count;
+
if (alldb)
{
if (dbname)
***************
*** 196,202 **** main(int argc, char *argv[])
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet);
}
else
{
--- 259,265 ----
vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
maintenance_db, host, port, username,
! prompt_password, progname, echo, quiet, numAsyncCons);
}
else
{
***************
*** 210,234 **** main(int argc, char *argv[])
dbname = get_user_name_or_exit(progname);
}
! if (tables.head != NULL)
{
! SimpleStringListCell *cell;
! for (cell = tables.head; cell; cell = cell->next)
{
vacuum_one_database(dbname, full, verbose, and_analyze,
analyze_only, analyze_in_stages,
! freeze, cell->val,
host, port, username, prompt_password,
progname, echo);
}
}
- else
- vacuum_one_database(dbname, full, verbose, and_analyze,
- analyze_only, analyze_in_stages,
- freeze, NULL,
- host, port, username, prompt_password,
- progname, echo);
}
exit(0);
--- 273,311 ----
dbname = get_user_name_or_exit(progname);
}
! if (numAsyncCons > 1)
{
! vacuum_parallel(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, host, port, username, prompt_password,
! progname, echo, numAsyncCons, &tables);
! }
! else
! {
! if (tables.head != NULL)
! {
! SimpleStringListCell *cell;
!
! for (cell = tables.head; cell; cell = cell->next)
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, cell->val,
! host, port, username, prompt_password,
! progname, echo);
! }
! }
! else
{
vacuum_one_database(dbname, full, verbose, and_analyze,
analyze_only, analyze_in_stages,
! freeze, NULL,
host, port, username, prompt_password,
progname, echo);
+
}
}
}
exit(0);
***************
*** 253,263 **** run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname,
static void
! vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
! bool analyze_only, bool analyze_in_stages, bool freeze, const char *table,
! const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
! const char *progname, bool echo)
{
PQExpBufferData sql;
--- 330,341 ----
static void
! vacuum_one_database(const char *dbname, bool full, bool verbose,
! bool and_analyze, bool analyze_only, bool analyze_in_stages,
! bool freeze, const char *table, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password, const char *progname,
! bool echo)
{
PQExpBufferData sql;
***************
*** 352,362 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
! bool analyze_in_stages, bool freeze, const char *maintenance_db,
! const char *host, const char *port,
! const char *username, enum trivalue prompt_password,
! const char *progname, bool echo, bool quiet)
{
PGconn *conn;
PGresult *result;
--- 430,441 ----
static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze,
! bool analyze_only, bool analyze_in_stages, bool freeze,
! const char *maintenance_db, const char *host,
! const char *port, const char *username,
! enum trivalue prompt_password, const char *progname,
! bool echo, bool quiet, int numAsyncCons)
{
PGconn *conn;
PGresult *result;
***************
*** 377,391 **** vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl
fflush(stdout);
}
! vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
! analyze_in_stages,
! freeze, NULL, host, port, username, prompt_password,
progname, echo);
}
PQclear(result);
}
static void
help(const char *progname)
--- 456,913 ----
fflush(stdout);
}
!
! if (numAsyncCons > 1)
! {
! vacuum_parallel(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, host, port, username, prompt_password,
! progname, echo, numAsyncCons, NULL);
!
! }
! else
! {
! vacuum_one_database(dbname, full, verbose, and_analyze,
! analyze_only, analyze_in_stages,
! freeze, NULL, host, port, username, prompt_password,
progname, echo);
+ }
}
PQclear(result);
}
+ /*
+  * run_parallel_vacuum
+  *
+  * Issue one VACUUM/ANALYZE command per entry of "tables", spreading the
+  * commands over the connections in connSlot[0 .. numAsyncCons - 1].
+  *
+  * For each table an idle slot is obtained from GetIdleSlot() (which waits
+  * when every slot is busy), the command text is built by prepare_command()
+  * and sent with PQsendQuery().  Once the last table has been dispatched the
+  * results of all connections are collected with GetQueryResult().
+  *
+  * analyze_stage: when >= 0, index of the --analyze-in-stages SET/RESET
+  *		command that is first executed on every connection; a negative
+  *		value skips that step.
+  * completedb: handed unchanged to GetIdleSlot() and GetQueryResult().
+  *
+  * On failure every connection is closed and the program exits with status
+  * 1.  On success all slots are marked free again before returning.
+  */
+
+ static void
+ run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+ 					bool full, bool verbose, bool and_analyze,
+ 					bool analyze_only, bool freeze, int numAsyncCons,
+ 					const char *progname, int analyze_stage,
+ 					ParallelSlot *connSlot, bool completedb)
+ {
+ 	PQExpBufferData sql;
+ 	SimpleStringListCell *cell;
+ 	int			max_slot = numAsyncCons;
+ 	int			i;
+ 	bool		error = false;
+ 	const char *stage_commands[] = {
+ 		"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+ 		"SET default_statistics_target=10; RESET vacuum_cost_delay;",
+ 		"RESET default_statistics_target;"};
+
+ 	initPQExpBuffer(&sql);
+
+ 	/* Apply the settings of the current analyze stage on every connection */
+ 	if (analyze_stage >= 0)
+ 	{
+ 		for (i = 0; i < max_slot; i++)
+ 			executeCommand(connSlot[i].connection,
+ 						   stage_commands[analyze_stage], progname, echo);
+ 	}
+
+ 	for (cell = tables->head; cell; cell = cell->next)
+ 	{
+ 		int			free_slot;
+ 		PGconn	   *slotconn;
+
+ 		/*
+ 		 * Get a free connection slot; if none is free this waits until at
+ 		 * least one connection has finished its command.
+ 		 */
+ 		free_slot = GetIdleSlot(connSlot, max_slot, dbname, progname,
+ 								completedb);
+ 		if (free_slot == NO_SLOT)
+ 		{
+ 			error = true;
+ 			goto fail;
+ 		}
+
+ 		slotconn = connSlot[free_slot].connection;
+
+ 		prepare_command(slotconn, full, verbose,
+ 						and_analyze, analyze_only, freeze, &sql);
+ 		appendPQExpBuffer(&sql, " %s", cell->val);
+
+ 		/* --echo applies to the vacuum commands as well */
+ 		if (echo)
+ 			printf("%s\n", sql.data);
+
+ 		/*
+ 		 * A command that could not be sent will never produce a result, so
+ 		 * the slot must not be marked busy in that case; give up instead.
+ 		 */
+ 		if (!PQsendQuery(slotconn, sql.data))
+ 		{
+ 			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+ 					progname, dbname, PQerrorMessage(slotconn));
+ 			error = true;
+ 			goto fail;
+ 		}
+
+ 		connSlot[free_slot].isFree = false;
+
+ 		resetPQExpBuffer(&sql);
+ 	}
+
+ 	/* Wait for all connections to return their results */
+ 	for (i = 0; i < max_slot; i++)
+ 	{
+ 		if (!GetQueryResult(connSlot[i].connection, dbname, progname,
+ 							completedb))
+ 		{
+ 			error = true;
+ 			goto fail;
+ 		}
+
+ 		connSlot[i].isFree = true;
+ 	}
+
+ fail:
+
+ 	termPQExpBuffer(&sql);
+
+ 	if (error)
+ 	{
+ 		for (i = 0; i < max_slot; i++)
+ 			DisconnectDatabase(&connSlot[i]);
+
+ 		exit(1);
+ 	}
+ }
+
+ /*
+  * GetIdleSlot
+  *
+  * Return the index of a slot in pSlot[0 .. max_slot - 1] on which a new
+  * command can be sent.
+  *
+  * If a slot is already marked free its index is returned immediately.
+  * Otherwise this waits with select() on the sockets of all slots until at
+  * least one connection has finished its command, reads the results of
+  * every finished connection with GetQueryResult(), marks those slots free
+  * and returns the lowest such index.
+  *
+  * Returns NO_SLOT if waiting fails or GetQueryResult() reports a failure.
+  */
+ static int
+ GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+ 			const char *progname, bool completedb)
+ {
+ 	int			i;
+ 	int			firstFree = -1;
+
+ 	for (i = 0; i < max_slot; i++)
+ 		if (pSlot[i].isFree)
+ 			return i;
+
+ 	/* No slot is free: wait until at least one command has completed */
+ 	do
+ 	{
+ 		fd_set		slotset;
+ 		pgsocket	maxFd;
+ 		int			nready;
+
+ 		/*
+ 		 * select() overwrites the set with the ready descriptors, so it has
+ 		 * to be rebuilt from all slots before every wait; otherwise later
+ 		 * rounds would only watch the sockets that were ready before.
+ 		 */
+ 		FD_ZERO(&slotset);
+ 		maxFd = pSlot[0].sock;
+ 		for (i = 0; i < max_slot; i++)
+ 		{
+ 			FD_SET(pSlot[i].sock, &slotset);
+ 			if (pSlot[i].sock > maxFd)
+ 				maxFd = pSlot[i].sock;
+ 		}
+
+ 		nready = select_loop(maxFd, &slotset);
+ 		Assert(nready != 0);
+ 		if (nready < 0)
+ 		{
+ 			/* the contents of slotset are not usable after an error */
+ 			fprintf(stderr, _("%s: select() failed\n"), progname);
+ 			return NO_SLOT;
+ 		}
+
+ 		for (i = 0; i < max_slot; i++)
+ 		{
+ 			if (!FD_ISSET(pSlot[i].sock, &slotset))
+ 				continue;
+
+ 			/* Absorb the available input; skip if the command still runs */
+ 			PQconsumeInput(pSlot[i].connection);
+ 			if (PQisBusy(pSlot[i].connection))
+ 				continue;
+
+ 			pSlot[i].isFree = true;
+
+ 			if (!GetQueryResult(pSlot[i].connection, dbname, progname,
+ 								completedb))
+ 				return NO_SLOT;
+
+ 			if (firstFree < 0)
+ 				firstFree = i;
+ 		}
+ 	} while (firstFree < 0);
+
+ 	return firstFree;
+ }
+
+ /*
+  * GetQueryResult
+  *
+  * Read all pending results of the command running on "conn" and judge the
+  * command by the status of the last one.
+  *
+  * Returns true when there was no result at all, when the last result is
+  * PGRES_COMMAND_OK, or when the command failed but completedb is true.
+  * Returns false, after printing the connection's error message to stderr,
+  * when the command failed and completedb is false.
+  *
+  * NOTE(review): vacuum_parallel() in this file passes its "vacuum_tables"
+  * flag as completedb, i.e. true when the user named explicit tables, which
+  * is the opposite of what the parameter name suggests -- confirm intent.
+  */
+
+ static bool
+ GetQueryResult(PGconn *conn, const char *dbname,
+ 			   const char *progname, bool completedb)
+ {
+ 	PGresult   *result;
+ 	PGresult   *lastResult = NULL;
+ 	bool		ok;
+
+ 	/* Keep only the most recent result; free each one it supersedes */
+ 	while ((result = PQgetResult(conn)) != NULL)
+ 	{
+ 		if (lastResult)
+ 			PQclear(lastResult);
+ 		lastResult = result;
+ 	}
+
+ 	if (!lastResult)
+ 		return true;
+
+ 	ok = (PQresultStatus(lastResult) == PGRES_COMMAND_OK);
+
+ 	PQclear(lastResult);
+
+ 	/*
+ 	 * A failed command is reported, and treated as fatal by the callers,
+ 	 * only when completedb is false; otherwise it is ignored so that the
+ 	 * remaining objects can still be processed.
+ 	 */
+ 	if (!ok && !completedb)
+ 	{
+ 		fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+ 				progname, dbname, PQerrorMessage(conn));
+ 		return false;
+ 	}
+
+ 	return true;
+ }
+
+ /*
+  * vacuum_parallel
+  *
+  * Vacuum and/or analyze database "dbname" using up to numAsyncCons
+  * connections at the same time.
+  *
+  * tables: list of tables to process.  When it is NULL or empty the list
+  *		of all tables and materialized views is fetched from the server,
+  *		largest (by relpages) first, and the number of connections is
+  *		limited to the number of relations found.
+  * analyze_in_stages: when true the table list is processed three times,
+  *		once per statistics stage.
+  *
+  * The remaining arguments are passed on to connectDatabase() and
+  * run_parallel_vacuum().  All connections are closed before returning.
+  */
+
+ void
+ vacuum_parallel(const char *dbname, bool full, bool verbose,
+ 				bool and_analyze, bool analyze_only, bool analyze_in_stages,
+ 				bool freeze, const char *host, const char *port,
+ 				const char *username, enum trivalue prompt_password,
+ 				const char *progname, bool echo, int numAsyncCons,
+ 				SimpleStringList *tables)
+ {
+ 	PGconn	   *conn;
+ 	int			i;
+ 	ParallelSlot *connSlot;
+ 	bool		vacuum_tables = true;
+
+ 	/*
+ 	 * Must live until the end of the function: "tables" may be pointed at
+ 	 * it below and is used by every run_parallel_vacuum() call.
+ 	 */
+ 	SimpleStringList dbtables = {NULL, NULL};
+
+ 	conn = connectDatabase(dbname, host, port, username,
+ 						   prompt_password, progname, false);
+
+ 	/*
+ 	 * If no table list was provided the whole database is vacuumed: fetch
+ 	 * the names of all tables and materialized views and build the list.
+ 	 */
+ 	if (!tables || !tables->head)
+ 	{
+ 		PGresult   *res;
+ 		int			ntuple;
+
+ 		/*
+ 		 * Let the server quote the identifiers, so that names containing
+ 		 * double quotes or other special characters stay valid when they are
+ 		 * pasted into the VACUUM command.
+ 		 */
+ 		res = executeQuery(conn,
+ 			"SELECT pg_catalog.quote_ident(ns.nspname) || '.' ||"
+ 			" pg_catalog.quote_ident(c.relname)"
+ 			" FROM pg_catalog.pg_class c, pg_catalog.pg_namespace ns"
+ 			" WHERE (c.relkind = 'r' or c.relkind = 'm')"
+ 			" and c.relnamespace = ns.oid ORDER BY c.relpages desc",
+ 						   progname, echo);
+
+ 		ntuple = PQntuples(res);
+ 		for (i = 0; i < ntuple; i++)
+ 			simple_string_list_append(&dbtables, PQgetvalue(res, i, 0));
+
+ 		PQclear(res);
+ 		tables = &dbtables;
+
+ 		/* Vacuuming the full database */
+ 		vacuum_tables = false;
+
+ 		if (numAsyncCons > ntuple)
+ 			numAsyncCons = ntuple;
+ 	}
+
+ 	/* Slot 0 always exists because it holds the connection opened above */
+ 	if (numAsyncCons < 1)
+ 		numAsyncCons = 1;
+
+ 	connSlot = (ParallelSlot *) pg_malloc(numAsyncCons * sizeof(ParallelSlot));
+ 	for (i = 0; i < numAsyncCons; i++)
+ 	{
+ 		/* the first slot reuses the existing connection */
+ 		if (i == 0)
+ 			connSlot[i].connection = conn;
+ 		else
+ 			connSlot[i].connection = connectDatabase(dbname, host, port,
+ 													 username, prompt_password,
+ 													 progname, false);
+
+ 		PQsetnonblocking(connSlot[i].connection, 1);
+ 		connSlot[i].isFree = true;
+ 		connSlot[i].sock = PQsocket(connSlot[i].connection);
+ 	}
+
+ 	if (analyze_in_stages)
+ 	{
+ 		const char *stage_messages[] = {
+ 			gettext_noop("Generating minimal optimizer statistics (1 target)"),
+ 			gettext_noop("Generating medium optimizer statistics (10 targets)"),
+ 			gettext_noop("Generating default (full) optimizer statistics")};
+ 		int			stage;
+
+ 		for (stage = 0; stage < 3; stage++)
+ 		{
+ 			puts(gettext(stage_messages[stage]));
+
+ 			run_parallel_vacuum(echo, dbname, tables, full, verbose,
+ 								and_analyze, analyze_only, freeze, numAsyncCons,
+ 								progname, stage, connSlot, vacuum_tables);
+ 		}
+ 	}
+ 	else
+ 	{
+ 		run_parallel_vacuum(echo, dbname, tables, full, verbose,
+ 							and_analyze, analyze_only, freeze,
+ 							numAsyncCons, progname, -1, connSlot, vacuum_tables);
+ 	}
+
+ 	for (i = 0; i < numAsyncCons; i++)
+ 		PQfinish(connSlot[i].connection);
+
+ 	free(connSlot);
+ }
+
+ /*
+  * select_loop
+  *
+  * Call select() on the read set "workerset" (highest descriptor: maxFd)
+  * and retry for as long as the call is merely interrupted.  On Windows
+  * select() is given a quarter-second timeout and is also retried whenever
+  * it times out; elsewhere it blocks without a timeout.
+  *
+  * The set is restored from a private copy before every attempt.  The
+  * return value is that of the last select() call.
+  */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ 	fd_set		wanted = *workerset;
+ 	int			nready;
+
+ #ifdef WIN32
+ 	do
+ 	{
+ 		/* wake up every quarter of a second */
+ 		struct timeval timeout = {0, 250000};
+
+ 		*workerset = wanted;
+ 		nready = select(maxFd + 1, workerset, NULL, NULL, &timeout);
+ 	} while ((nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) ||
+ 			 nready == 0);
+ #else							/* UNIX */
+ 	do
+ 	{
+ 		*workerset = wanted;
+ 		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
+ 	} while (nready < 0 && errno == EINTR);
+ #endif
+
+ 	return nready;
+ }
+
+ /*
+  * DisconnectDatabase
+  *
+  * Close the connection of one slot.  If a command is still active on the
+  * connection a cancel request is sent first; the outcome of that request
+  * is not examined.  Does nothing when the slot has no connection.
+  */
+ void
+ DisconnectDatabase(ParallelSlot *slot)
+ {
+ 	PGconn	   *conn = slot->connection;
+ 	char		errbuf[1];
+
+ 	if (conn == NULL)
+ 		return;
+
+ 	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+ 	{
+ 		PGcancel   *cancel = PQgetCancel(conn);
+
+ 		if (cancel != NULL)
+ 		{
+ 			PQcancel(cancel, errbuf, sizeof(errbuf));
+ 			PQfreeCancel(cancel);
+ 		}
+ 	}
+
+ 	PQfinish(conn);
+ 	slot->connection = NULL;
+ }
+
+
+
+ /*
+  * prepare_command
+  *
+  * Build in "sql" the VACUUM or ANALYZE command that matches the given
+  * options, without a table name; the caller appends the target.
+  *
+  * "sql" must already be an initialized PQExpBuffer.  Its previous contents
+  * are discarded, so one buffer can be reused for every table without
+  * allocating a new one each time.
+  *
+  * "conn" is only used to find out the server version: from 9.0 on the
+  * parenthesized option syntax is generated, otherwise the keyword form.
+  */
+ void
+ prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+ 				bool analyze_only, bool freeze, PQExpBuffer sql)
+ {
+ 	resetPQExpBuffer(sql);
+
+ 	if (analyze_only)
+ 	{
+ 		appendPQExpBufferStr(sql, "ANALYZE");
+ 		if (verbose)
+ 			appendPQExpBufferStr(sql, " VERBOSE");
+ 		return;
+ 	}
+
+ 	appendPQExpBufferStr(sql, "VACUUM");
+ 	if (PQserverVersion(conn) >= 90000)
+ 	{
+ 		const char *paren = " (";
+ 		const char *comma = ", ";
+ 		const char *sep = paren;
+
+ 		if (full)
+ 		{
+ 			appendPQExpBuffer(sql, "%sFULL", sep);
+ 			sep = comma;
+ 		}
+ 		if (freeze)
+ 		{
+ 			appendPQExpBuffer(sql, "%sFREEZE", sep);
+ 			sep = comma;
+ 		}
+ 		if (verbose)
+ 		{
+ 			appendPQExpBuffer(sql, "%sVERBOSE", sep);
+ 			sep = comma;
+ 		}
+ 		if (and_analyze)
+ 		{
+ 			appendPQExpBuffer(sql, "%sANALYZE", sep);
+ 			sep = comma;
+ 		}
+ 		/* close the option list only if at least one option opened it */
+ 		if (sep != paren)
+ 			appendPQExpBufferStr(sql, ")");
+ 	}
+ 	else
+ 	{
+ 		if (full)
+ 			appendPQExpBufferStr(sql, " FULL");
+ 		if (freeze)
+ 			appendPQExpBufferStr(sql, " FREEZE");
+ 		if (verbose)
+ 			appendPQExpBufferStr(sql, " VERBOSE");
+ 		if (and_analyze)
+ 			appendPQExpBufferStr(sql, " ANALYZE");
+ 	}
+ }
+
static void
help(const char *progname)
***************
*** 405,410 **** help(const char *progname)
--- 927,933 ----
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -z, --analyze update optimizer statistics\n"));
printf(_(" -Z, --analyze-only only update optimizer statistics\n"));
+ printf(_(" -j, --jobs=NUM use this many asynchronous connections to vacuum\n"));
printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
" stages for faster results\n"));
printf(_(" -?, --help show this help, then exit\n"));