*** a/doc/src/sgml/ref/vacuumdb.sgml --- b/doc/src/sgml/ref/vacuumdb.sgml *************** *** 224,229 **** PostgreSQL documentation --- 224,250 ---- + + + + + Number of asynchronous connections to use for the operation. This option + makes vacuumdb issue its vacuum commands over several asynchronous connections, + with each connection processing one table at a time, so that as many tables + are vacuumed in parallel as there are jobs. + If the number of jobs given is greater than the number of tables, the number of + jobs is reduced to the number of tables. + + vacuumdb will open + njobs connections to the database, so make sure your + max_connections setting is high enough to + accommodate all connections. + + + + + + *** a/src/bin/scripts/vacuumdb.c --- b/src/bin/scripts/vacuumdb.c *************** *** 14,19 **** --- 14,29 ---- #include "common.h" #include "dumputils.h" + #define NO_SLOT (-1) + + /* Arguments needed for a worker process */ + typedef struct ParallelSlot + { + PGconn *connection; + bool isFree; + pgsocket sock; + } ParallelSlot; + static void vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze, bool analyze_only, bool analyze_in_stages, bool freeze, *************** *** 25,34 **** static void vacuum_all_databases(bool full, bool verbose, bool and_analyze, const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, ! const char *progname, bool echo, bool quiet); static void help(const char *progname); int main(int argc, char *argv[]) --- 35,73 ---- const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, ! const char *progname, bool echo, bool quiet, ! 
int numAsyncCons); static void help(const char *progname); + void vacuum_parallel(const char *dbname, bool full, bool verbose, + bool and_analyze, bool analyze_only, bool analyze_in_stages, + bool freeze, const char *host, + const char *port, const char *username, + enum trivalue prompt_password, const char *progname, + bool echo, int numAsyncCons, SimpleStringList *tables); + + + void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze, + bool analyze_only, bool freeze, PQExpBuffer sql); + static void + run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables, + bool full, bool verbose, bool and_analyze, + bool analyze_only, bool freeze, int numAsyncCons, + const char *progname, int analyze_stage, + ParallelSlot *connSlot, bool completedb); + static int + GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname, + const char *progname, bool completedb); + + static bool GetQueryResult(PGconn *conn, const char *dbname, + const char *progname, bool completedb); + + static int + select_loop(int maxFd, fd_set *workerset); + + static void DisconnectDatabase(ParallelSlot *slot); + int main(int argc, char *argv[]) *************** *** 49,54 **** main(int argc, char *argv[]) --- 88,94 ---- {"table", required_argument, NULL, 't'}, {"full", no_argument, NULL, 'f'}, {"verbose", no_argument, NULL, 'v'}, + {"jobs", required_argument, NULL, 'j'}, {"maintenance-db", required_argument, NULL, 2}, {"analyze-in-stages", no_argument, NULL, 3}, {NULL, 0, NULL, 0} *************** *** 74,86 **** main(int argc, char *argv[]) bool full = false; bool verbose = false; SimpleStringList tables = {NULL, NULL}; progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts")); handle_help_version_opts(argc, argv, "vacuumdb", help); ! 
while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
      {
          switch (c)
          {
--- 114,129 ----
      bool full = false;
      bool verbose = false;
      SimpleStringList tables = {NULL, NULL};
+     int numAsyncCons = 0;
+     int tbl_count = 0;
  
      progname = get_progname(argv[0]);
      set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
  
      handle_help_version_opts(argc, argv, "vacuumdb", help);
  
!     /* "v" takes no argument (--verbose is no_argument); only "j" gained one */
!     while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
      {
          switch (c)
          {
***************
*** 121,134 **** main(int argc, char *argv[])
--- 164,190 ----
                  alldb = true;
                  break;
              case 't':
+             {
                  simple_string_list_append(&tables, optarg);
+                 tbl_count++;
                  break;
+             }
              case 'f':
                  full = true;
                  break;
              case 'v':
                  verbose = true;
                  break;
+             case 'j':
+                 numAsyncCons = atoi(optarg);
+                 if (numAsyncCons <= 0)
+                 {
+                     fprintf(stderr, _("%s: Number of parallel \"jobs\" should be at least 1\n"),
+                             progname);
+                     exit(1);
+                 }
+ 
+                 break;
              case 2:
                  maintenance_db = pg_strdup(optarg);
                  break;
***************
*** 179,184 **** main(int argc, char *argv[])
--- 235,247 ----
  
      setup_cancel_handler();
  
+     /*
+      * When the user gives an explicit table list that is shorter than the
+      * requested number of jobs, cap the number of jobs at the table count.
+      */
+     if (tbl_count && (numAsyncCons > tbl_count))
+         numAsyncCons = tbl_count;
+ 
      if (alldb)
      {
          if (dbname)
***************
*** 196,202 **** main(int argc, char *argv[])
  
          vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages,
                               freeze, maintenance_db, host, port, username,
!                              prompt_password, progname, echo, quiet);
      }
      else
      {
--- 259,265 ----
  
          vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages,
                               freeze, maintenance_db, host, port, username,
! 
prompt_password, progname, echo, quiet, numAsyncCons); } else { *************** *** 210,234 **** main(int argc, char *argv[]) dbname = get_user_name_or_exit(progname); } ! if (tables.head != NULL) { ! SimpleStringListCell *cell; ! for (cell = tables.head; cell; cell = cell->next) { vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only, analyze_in_stages, ! freeze, cell->val, host, port, username, prompt_password, progname, echo); } } - else - vacuum_one_database(dbname, full, verbose, and_analyze, - analyze_only, analyze_in_stages, - freeze, NULL, - host, port, username, prompt_password, - progname, echo); } exit(0); --- 273,311 ---- dbname = get_user_name_or_exit(progname); } ! if (numAsyncCons > 1) { ! vacuum_parallel(dbname, full, verbose, and_analyze, ! analyze_only, analyze_in_stages, ! freeze, host, port, username, prompt_password, ! progname, echo, numAsyncCons, &tables); ! } ! else ! { ! if (tables.head != NULL) ! { ! SimpleStringListCell *cell; ! ! for (cell = tables.head; cell; cell = cell->next) ! { ! vacuum_one_database(dbname, full, verbose, and_analyze, ! analyze_only, analyze_in_stages, ! freeze, cell->val, ! host, port, username, prompt_password, ! progname, echo); ! } ! } ! else { vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only, analyze_in_stages, ! freeze, NULL, host, port, username, prompt_password, progname, echo); + } } } exit(0); *************** *** 253,263 **** run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, static void ! vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze, ! bool analyze_only, bool analyze_in_stages, bool freeze, const char *table, ! const char *host, const char *port, ! const char *username, enum trivalue prompt_password, ! const char *progname, bool echo) { PQExpBufferData sql; --- 330,341 ---- static void ! vacuum_one_database(const char *dbname, bool full, bool verbose, ! 
bool and_analyze, bool analyze_only, bool analyze_in_stages, ! bool freeze, const char *table, const char *host, ! const char *port, const char *username, ! enum trivalue prompt_password, const char *progname, ! bool echo) { PQExpBufferData sql; *************** *** 352,362 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyz static void ! vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only, ! bool analyze_in_stages, bool freeze, const char *maintenance_db, ! const char *host, const char *port, ! const char *username, enum trivalue prompt_password, ! const char *progname, bool echo, bool quiet) { PGconn *conn; PGresult *result; --- 430,441 ---- static void ! vacuum_all_databases(bool full, bool verbose, bool and_analyze, ! bool analyze_only, bool analyze_in_stages, bool freeze, ! const char *maintenance_db, const char *host, ! const char *port, const char *username, ! enum trivalue prompt_password, const char *progname, ! bool echo, bool quiet, int numAsyncCons) { PGconn *conn; PGresult *result; *************** *** 377,391 **** vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_onl fflush(stdout); } ! vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only, ! analyze_in_stages, ! freeze, NULL, host, port, username, prompt_password, progname, echo); } PQclear(result); } static void help(const char *progname) --- 456,913 ---- fflush(stdout); } ! ! if (numAsyncCons > 1) ! { ! vacuum_parallel(dbname, full, verbose, and_analyze, ! analyze_only, analyze_in_stages, ! freeze, host, port, username, prompt_password, ! progname, echo, numAsyncCons, NULL); ! ! } ! else ! { ! vacuum_one_database(dbname, full, verbose, and_analyze, ! analyze_only, analyze_in_stages, ! 
freeze, NULL, host, port, username,
                                  prompt_password, progname, echo);
+         }
      }
  
      PQclear(result);
  }
  
+ /*
+  * run_parallel_vacuum
+  *      Send one VACUUM/ANALYZE command per entry of "tables" over the
+  *      connections in connSlot[0 .. numAsyncCons - 1], which the caller has
+  *      already opened.
+  *
+  * For every table an idle slot is obtained from GetIdleSlot() (which waits
+  * while all slots are busy) and the command is sent on that slot with
+  * PQsendQuery(), so up to numAsyncCons commands are in flight at once.
+  * After the last table has been dispatched, the pending result of every
+  * slot is collected.
+  *
+  * analyze_stage >= 0 selects the entry of stage_commands[] that is first
+  * executed on every connection; a negative value skips that step.
+  * completedb is passed through unchanged to GetIdleSlot()/GetQueryResult().
+  * When echo is set, each command is printed before it is sent.
+  *
+  * On failure this function does not return: every slot is disconnected and
+  * the program exits with status 1.
+  */
+ 
+ static void
+ run_parallel_vacuum(bool echo, const char *dbname, SimpleStringList *tables,
+                     bool full, bool verbose, bool and_analyze,
+                     bool analyze_only, bool freeze, int numAsyncCons,
+                     const char *progname, int analyze_stage,
+                     ParallelSlot *connSlot, bool completedb)
+ {
+     PQExpBufferData sql;
+     SimpleStringListCell *cell;
+     int max_slot = numAsyncCons;
+     int i;
+     int free_slot;
+     PGconn *slotconn;
+     bool error = false;
+     const char *stage_commands[] = {
+         "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+         "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+         "RESET default_statistics_target;"};
+ 
+     if (analyze_stage >= 0)
+     {
+         for (i = 0; i < max_slot; i++)
+         {
+             executeCommand(connSlot[i].connection,
+                            stage_commands[analyze_stage], progname, echo);
+         }
+     }
+ 
+     for (cell = tables->head; cell; cell = cell->next)
+     {
+         /*
+          * This will give the free connection slot, if no slot is free it will
+          * wait for at least one slot to get free.
+          */
+         free_slot = GetIdleSlot(connSlot, max_slot, dbname, progname,
+                                 completedb);
+         if (free_slot == NO_SLOT)
+         {
+             error = true;
+             goto fail;
+         }
+ 
+         slotconn = connSlot[free_slot].connection;
+ 
+         /*
+          * prepare_command() initializes the buffer itself, so it must not
+          * be initialized here as well (that would leak one buffer per
+          * table); it is released again at the end of each iteration.
+          */
+         prepare_command(slotconn, full, verbose, and_analyze,
+                         analyze_only, freeze, &sql);
+ 
+         appendPQExpBuffer(&sql, " %s", cell->val);
+ 
+         if (echo)
+             printf("%s\n", sql.data);
+ 
+         /* PQsendQuery() returns 0 when the command could not be dispatched */
+         if (!PQsendQuery(slotconn, sql.data))
+         {
+             fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                     progname, dbname, PQerrorMessage(slotconn));
+             termPQExpBuffer(&sql);
+             error = true;
+             goto fail;
+         }
+ 
+         connSlot[free_slot].isFree = false;
+ 
+         termPQExpBuffer(&sql);
+     }
+ 
+ 
+     for (i = 0; i < max_slot; i++)
+     {
+         /* wait for all connections to return their results */
+         if (!GetQueryResult(connSlot[i].connection, dbname, progname,
+                             completedb))
+         {
+             error = true;
+             goto fail;
+         }
+ 
+         connSlot[i].isFree = true;
+     }
+ 
+ fail:
+ 
+     if (error)
+     {
+         for (i = 0; i < max_slot; i++)
+         {
+             DisconnectDatabase(&connSlot[i]);
+         }
+ 
+         exit(1);
+     }
+ }
+ 
+ /*
+  * GetIdleSlot
+  *      Process the slot list, if any free slot available return the slotid
+  *      If no slot is free, Then perform select on all the socket and wait until
+  *      atleast one slot is free.
+  *
+  * Returns the index of an idle slot in pSlot[0 .. max_slot - 1], or NO_SLOT
+  * when waiting failed or GetQueryResult() reported a failure for a slot
+  * that had just finished.  dbname, progname and completedb are only passed
+  * on to GetQueryResult() / used for error messages.
+  */
+ static int
+ GetIdleSlot(ParallelSlot *pSlot, int max_slot, const char *dbname,
+             const char *progname, bool completedb)
+ {
+     int i;
+     int firstFree = NO_SLOT;
+ 
+     /* Fast path: hand out a slot that is already idle. */
+     for (i = 0; i < max_slot; i++)
+         if (pSlot[i].isFree)
+             return i;
+ 
+     /*
+      * Every slot is busy: wait until at least one of them has finished its
+      * command, collect its result and return it.
+      */
+     do
+     {
+         fd_set slotset;
+         pgsocket maxFd;
+ 
+         /*
+          * select() overwrites the set with the descriptors that are ready,
+          * so the full set has to be rebuilt before every wait; otherwise a
+          * second pass would only watch the sockets that were ready the
+          * first time.
+          */
+         FD_ZERO(&slotset);
+ 
+         maxFd = pSlot[0].sock;
+ 
+         for (i = 0; i < max_slot; i++)
+         {
+             FD_SET(pSlot[i].sock, &slotset);
+             if (pSlot[i].sock > maxFd)
+                 maxFd = pSlot[i].sock;
+         }
+ 
+         i = select_loop(maxFd, &slotset);
+         if (i < 0)
+         {
+             /* the contents of slotset are not usable after a failure */
+             fprintf(stderr, _("%s: select() failed: %s\n"),
+                     progname, strerror(errno));
+             return NO_SLOT;
+         }
+         Assert(i != 0);
+ 
+         for (i = 0; i < max_slot; i++)
+         {
+             if (!FD_ISSET(pSlot[i].sock, &slotset))
+                 continue;
+ 
+             PQconsumeInput(pSlot[i].connection);
+             if (PQisBusy(pSlot[i].connection))
+                 continue;
+ 
+             pSlot[i].isFree = true;
+ 
+             if (!GetQueryResult(pSlot[i].connection, dbname, progname,
+                                 completedb))
+                 return NO_SLOT;
+ 
+             /* remember the first finished slot, but drain the others too */
+             if (firstFree == NO_SLOT)
+                 firstFree = i;
+         }
+     } while (firstFree == NO_SLOT);
+ 
+     return firstFree;
+ }
+ 
+ /*
+  * GetQueryResult
+  *      Process the query result.
+  *
+  * Reads every pending result of conn and judges the command by the last
+  * one (PGRES_COMMAND_OK means success).  A failure is always reported on
+  * stderr.  It makes the function return false only when completedb is
+  * false; with completedb true the failure is reported and true is returned
+  * so the caller carries on with the remaining tables.  Also returns true
+  * when there was no pending result at all.
+  *
+  * NOTE(review): vacuum_parallel() passes its "vacuum_tables" flag here, so
+  * despite the parameter's name it is true when the user supplied an
+  * explicit table list -- confirm the intended polarity.
+  */
+ 
+ static bool
+ GetQueryResult(PGconn *conn, const char *dbname,
+                const char *progname, bool completedb)
+ {
+     PGresult *result = NULL;
+     PGresult *lastResult = NULL;
+     bool r;
+ 
+     while ((result = PQgetResult(conn)) != NULL)
+     {
+         /* only the last result is inspected; free the ones before it */
+         if (lastResult)
+             PQclear(lastResult);
+         lastResult = result;
+     }
+ 
+     if (!lastResult)
+         return true;
+ 
+     r = (PQresultStatus(lastResult) == PGRES_COMMAND_OK);
+ 
+     PQclear(lastResult);
+ 
+     if (!r)
+     {
+         /* never swallow a failure silently, even when it is not fatal */
+         fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                 progname, dbname, PQerrorMessage(conn));
+         if (!completedb)
+             return false;
+     }
+ 
+     return true;
+ }
+ 
+ /*
+  * vacuum_parallel
+  *      Vacuum/analyze the tables of one database over numAsyncCons
+  *      connections.
+  *
+  * When "tables" is NULL or empty, the list of plain tables and
+  * materialized views is read from the server (largest relpages first), the
+  * number of connections is capped at the number of tables found, and
+  * nothing is done if there are none.  Otherwise the caller's list is used
+  * as given.
+  *
+  * The connections are opened here, handed to run_parallel_vacuum() (once,
+  * or once per stage when analyze_in_stages is set) and closed afterwards.
+  */
+ 
+ void
+ vacuum_parallel(const char *dbname, bool full, bool verbose,
+                 bool and_analyze, bool analyze_only, bool analyze_in_stages,
+                 bool freeze, const char *host, const char *port,
+                 const char *username, enum trivalue prompt_password,
+                 const char *progname, bool echo, int numAsyncCons,
+                 SimpleStringList *tables)
+ {
+ 
+     PGconn *conn;
+     int i;
+     ParallelSlot *connSlot;
+     bool vacuum_tables = true;
+ 
+     /*
+      * Must live for the whole function: "tables" is pointed at it below and
+      * used until the end.
+      */
+     SimpleStringList dbtables = {NULL, NULL};
+ 
+     conn = connectDatabase(dbname, host, port, username,
+                            prompt_password, progname, false);
+ 
+     /*
+      * If no table list is provided we need to vacuum the whole database:
+      * get the list of all tables and prepare the list.
+      */
+     if (!tables || !tables->head)
+     {
+         PGresult *res;
+         int ntuple;
+         PQExpBufferData sql;
+ 
+         initPQExpBuffer(&sql);
+ 
+         res = executeQuery(conn,
+                 "SELECT relname, nspname FROM pg_class c, pg_namespace ns"
+                 " WHERE (relkind = \'r\' or relkind = \'m\')"
+                 " and c.relnamespace = ns.oid ORDER BY relpages desc",
+                 progname, echo);
+ 
+         ntuple = PQntuples(res);
+         for (i = 0; i < ntuple; i++)
+         {
+             const char *rawRelName = PQgetvalue(res, i, 0);
+             const char *rawNspace = PQgetvalue(res, i, 1);
+             char *relName;
+             char *nspace;
+ 
+             /*
+              * Names come from the catalog and may contain any character,
+              * including double quotes, so they must be escaped rather than
+              * merely wrapped in quotes before being pasted into a command.
+              */
+             nspace = PQescapeIdentifier(conn, rawNspace, strlen(rawNspace));
+             relName = PQescapeIdentifier(conn, rawRelName, strlen(rawRelName));
+             if (!nspace || !relName)
+             {
+                 fprintf(stderr, _("%s: could not quote table name: %s"),
+                         progname, PQerrorMessage(conn));
+                 PQfinish(conn);
+                 exit(1);
+             }
+ 
+             appendPQExpBuffer(&sql, "%s.%s", nspace, relName);
+             simple_string_list_append(&dbtables, sql.data);
+             resetPQExpBuffer(&sql);
+ 
+             PQfreemem(nspace);
+             PQfreemem(relName);
+         }
+ 
+         PQclear(res);
+         termPQExpBuffer(&sql);
+ 
+         /* nothing to vacuum: avoid allocating and using zero slots */
+         if (ntuple == 0)
+         {
+             PQfinish(conn);
+             return;
+         }
+ 
+         tables = &dbtables;
+ 
+         /* Vacuuming full database */
+         vacuum_tables = false;
+ 
+         if (numAsyncCons > ntuple)
+             numAsyncCons = ntuple;
+     }
+ 
+     connSlot = (ParallelSlot *) pg_malloc(numAsyncCons * sizeof(ParallelSlot));
+     for (i = 0; i < numAsyncCons; i++)
+     {
+         /* slot 0 reuses the connection opened above */
+         if (i == 0)
+             connSlot[i].connection = conn;
+         else
+             connSlot[i].connection = connectDatabase(dbname, host, port, username,
+                                                      prompt_password, progname, false);
+ 
+         /*
+          * Every slot, including slot 0, needs its state initialized.  The
+          * connections are left in blocking mode: commands are sent with
+          * PQsendQuery() and nothing in this file calls PQflush(), which a
+          * non-blocking connection would require.
+          */
+         connSlot[i].isFree = true;
+         connSlot[i].sock = PQsocket(connSlot[i].connection);
+     }
+ 
+     if (analyze_in_stages)
+     {
+         const char *stage_messages[] = {
+             gettext_noop("Generating minimal optimizer statistics (1 target)"),
+             gettext_noop("Generating medium optimizer statistics (10 targets)"),
+             gettext_noop("Generating default (full) optimizer statistics")};
+         int stage;
+ 
+         for (stage = 0; stage < 3; stage++)
+         {
+             puts(gettext(stage_messages[stage]));
+ 
+             run_parallel_vacuum(echo, dbname, tables, full, verbose,
+                                 and_analyze, analyze_only, freeze, numAsyncCons,
+                                 progname, stage, connSlot, vacuum_tables);
+         }
+     }
+     else
+     {
+         run_parallel_vacuum(echo, dbname, tables, full, verbose,
+                             and_analyze, analyze_only, freeze,
+                             numAsyncCons, progname, -1, connSlot, vacuum_tables);
+     }
+ 
+     for (i = 0; i < numAsyncCons; i++)
+     {
+         PQfinish(connSlot[i].connection);
+     }
+ 
+     free(connSlot);
+ }
+ 
+ /*
+  * A select loop that repeats calling select until a descriptor in the read
+  * set becomes readable. On Windows we have to check for the termination event
+  * from time to time, on Unix we can just block forever.
+  *
+  * maxFd is the highest descriptor contained in workerset.  On return
+  * workerset holds what the last select() call left in it, and that call's
+  * return value is returned (it can be negative on failure).
+  *
+  * NOTE(review): the Windows branch only retries after every 250 ms timeout;
+  * nothing in it actually tests a termination event.
+  */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+     int i;
+     fd_set saveSet = *workerset;
+ 
+ #ifdef WIN32
+     for (;;)
+     {
+         /*
+          * sleep a quarter of a second before checking if we should terminate.
+          */
+         struct timeval tv = {0, 250000};
+ 
+         /* select() modifies the set, so restore it before every call */
+         *workerset = saveSet;
+         i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+ 
+         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+             continue;
+         /* i == 0 means the timeout expired with nothing ready: try again */
+         if (i)
+             break;
+     }
+ #else /* UNIX */
+ 
+     for (;;)
+     {
+         /* select() modifies the set, so restore it before every call */
+         *workerset = saveSet;
+         i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+         /* an interrupted call is simply restarted */
+         if (i < 0 && errno == EINTR)
+             continue;
+         break;
+     }
+ #endif
+ 
+     return i;
+ }
+ 
+ /*
+  * DisconnectDatabase
+  *      Close the connection held by one slot.
+  *
+  * If a command is still running on the connection (PQTRANS_ACTIVE), a
+  * cancel request is sent first.  Afterwards the connection is closed and
+  * slot->connection is set to NULL, so calling this again for the same slot
+  * does nothing.
+  */
+ static void
+ DisconnectDatabase(ParallelSlot *slot)
+ {
+     PGcancel *cancel;
+     char errbuf[1];     /* the text PQcancel() may report is never read */
+ 
+     if (!slot->connection)
+         return;
+ 
+     if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+     {
+         if ((cancel = PQgetCancel(slot->connection)))
+         {
+             PQcancel(cancel, errbuf, sizeof(errbuf));
+             PQfreeCancel(cancel);
+         }
+     }
+ 
+     PQfinish(slot->connection);
+     slot->connection = NULL;
+ }
+ 
+ 
+ /*
+  * prepare_command
+  *      Build the leading part of a VACUUM or ANALYZE command in "sql".
+  *
+  * The buffer is initialized here with initPQExpBuffer(), so the caller
+  * must pass a buffer that does not currently own storage.  The result is
+  * "ANALYZE [VERBOSE]" when analyze_only is set, otherwise "VACUUM" followed
+  * by the requested options: as a parenthesized, comma-separated list when
+  * the server version of conn is at least 90000, as plain keywords for
+  * older servers.  No table name and no terminator are appended.
+  */
+ void prepare_command(PGconn *conn, bool full, bool verbose, bool and_analyze,
+                      bool analyze_only, bool freeze, PQExpBuffer sql)
+ {
+     initPQExpBuffer(sql);
+ 
+     if (analyze_only)
+     {
+         appendPQExpBuffer(sql, "ANALYZE");
+         if (verbose)
+             appendPQExpBuffer(sql, " VERBOSE");
+     }
+     else
+     {
+         appendPQExpBuffer(sql, "VACUUM");
+         if (PQserverVersion(conn) >= 90000)
+         {
+             /*
+              * "sep" is " (" until the first option has been written and
+              * ", " afterwards; if it never changed, no option was written
+              * and no closing parenthesis is needed.
+              */
+             const char *paren = " (";
+             const char *comma = ", ";
+             const char *sep = paren;
+ 
+             if (full)
+             {
+                 appendPQExpBuffer(sql, "%sFULL", sep);
+                 sep = comma;
+             }
+             if (freeze)
+             {
+                 appendPQExpBuffer(sql, "%sFREEZE", sep);
+                 sep = comma;
+             }
+             if (verbose)
+             {
+                 appendPQExpBuffer(sql, "%sVERBOSE", sep);
+                 sep = comma;
+             }
+             if (and_analyze)
+             {
+                 appendPQExpBuffer(sql, "%sANALYZE", sep);
+                 sep = comma;
+             }
+             if (sep != paren)
+                 appendPQExpBuffer(sql, ")");
+         }
+         else
+         {
+             if (full)
+                 appendPQExpBuffer(sql, " FULL");
+             if (freeze)
+                 appendPQExpBuffer(sql, " FREEZE");
+             if (verbose)
+                 appendPQExpBuffer(sql, " VERBOSE");
+             if (and_analyze)
+                 appendPQExpBuffer(sql, " ANALYZE");
+         }
+     }
+ }
+ 
  static void
  help(const char *progname)
***************
*** 405,410 **** help(const char *progname)
--- 927,933 ----
      printf(_(" -V, --version output version information, then exit\n"));
      printf(_(" -z, --analyze update optimizer statistics\n"));
      printf(_(" -Z, --analyze-only only update optimizer statistics\n"));
+     printf(_(" -j, --jobs=NUM use this many asynchronous connections to vacuum\n"));
      printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
               " stages for faster results\n"));
      printf(_(" -?, --help show this help, then exit\n"));