Index: src/backend/postmaster/autovacuum.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v retrieving revision 1.40 diff -c -p -r1.40 autovacuum.c *** src/backend/postmaster/autovacuum.c 28 Mar 2007 22:17:12 -0000 1.40 --- src/backend/postmaster/autovacuum.c 10 Apr 2007 20:32:37 -0000 *************** *** 43,48 **** --- 43,49 ---- #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinval.h" + #include "storage/spin.h" #include "tcop/tcopprot.h" #include "utils/flatfiles.h" #include "utils/fmgroids.h" *************** *** 52,57 **** --- 53,59 ---- #include "utils/syscache.h" + static volatile sig_atomic_t got_SIGUSR1 = false; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t avlauncher_shutdown_request = false; *************** static volatile sig_atomic_t avlauncher_ *** 59,64 **** --- 61,67 ---- * GUC parameters */ bool autovacuum_start_daemon = false; + int autovacuum_max_workers; int autovacuum_naptime; int autovacuum_vac_thresh; double autovacuum_vac_scale; *************** int autovacuum_freeze_max_age; *** 69,75 **** int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; ! /* Flag to tell if we are in the autovacuum daemon process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; --- 72,78 ---- int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; ! /* Flags to tell if we are in an autovacuum process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; *************** static int default_freeze_min_age; *** 82,95 **** /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; ! /* struct to keep list of candidate databases for vacuum */ ! typedef struct autovac_dbase { ! Oid ad_datid; ! char *ad_name; ! TransactionId ad_frozenxid; ! PgStat_StatDBEntry *ad_entry; ! 
} autovac_dbase; /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */ typedef struct av_relation --- 85,106 ---- /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; ! /* struct to keep track of databases in launcher */ ! typedef struct avl_dbase { ! Oid adl_datid; /* hash key -- must be first */ ! TimestampTz adl_next_worker; ! int adl_score; ! } avl_dbase; ! ! /* struct to keep track of databases in worker */ ! typedef struct avw_dbase ! { ! Oid adw_datid; ! char *adw_name; ! TransactionId adw_frozenxid; ! PgStat_StatDBEntry *adw_entry; ! } avw_dbase; /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */ typedef struct av_relation *************** typedef struct autovac_table *** 110,123 **** int at_vacuum_cost_limit; } autovac_table; typedef struct { ! Oid process_db; /* OID of database to process */ ! int worker_pid; /* PID of the worker process, if any */ } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); --- 121,189 ---- int at_vacuum_cost_limit; } autovac_table; + /*------------- + * This struct holds information about a single worker's whereabouts. We keep + * an array of these in shared memory, sized according to + * autovacuum_max_workers. + * + * wi_links entry into free list or running list + * wi_dboid OID of the database this worker is supposed to work on + * wi_tableoid OID of the table currently being vacuumed + * wi_workerpid PID of the running worker, 0 if not yet started + * wi_launchtime Time this worker was launched + * + * All fields are protected by AutovacuumLock, except for wi_tableoid which is + * protected by AutovacuumScheduleLock (which is read-only for everyone except + * that worker itself). 
+ *------------- + */ + typedef struct WorkerInfoData + { + SHM_QUEUE wi_links; + Oid wi_dboid; + Oid wi_tableoid; + int wi_workerpid; + TimestampTz wi_launchtime; + } WorkerInfoData; + + typedef struct WorkerInfoData *WorkerInfo; + + /* the spinlock protecting the PGPROC array */ + NON_EXEC_STATIC slock_t *AutovacProcLock = NULL; + + /*------------- + * The main autovacuum shmem struct. On shared memory we store: 1) this main + * struct; 2) the array of WorkerInfo structs; 3) the array of PGPROCs. + * + * av_launcherpid the PID of the autovacuum launcher + * av_freeProcs the PGPROC freelist + * av_freeWorkers the WorkerInfo freelist + * av_runningWorkers the WorkerInfo non-free queue + * av_startingWorker pointer to WorkerInfo currently being started (cleared by + * the worker itself as soon as it's up and running) + * + * This struct is protected by AutovacuumLock, except for the PGPROC list which + * is protected by the AutovacProcLock spinlock. + *------------- + */ typedef struct { ! pid_t av_launcherpid; ! SHMEM_OFFSET av_freeProcs; ! SHMEM_OFFSET av_freeWorkers; ! SHM_QUEUE av_runningWorkers; ! SHMEM_OFFSET av_startingWorker; } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; + /* the database list in the launcher, and the context that contains it */ + static Dllist *DatabaseList = NULL; + static MemoryContext DatabaseListCxt = NULL; + + /* Pointer to my own WorkerInfo, valid on each worker */ + static WorkerInfo MyWorkerInfo = NULL; + #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); *************** static pid_t avworker_forkexec(void); *** 125,133 **** NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); ! static void do_start_worker(void); static void do_autovacuum(void); ! 
static List *autovac_get_database_list(void); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, --- 191,205 ---- NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); ! static Oid do_start_worker(void); ! static uint64 launcher_determine_sleep(bool canlaunch); ! static void launch_worker(TimestampTz now); ! static List *get_database_list(void); ! static void rebuild_database_list(Oid newdb); ! static int db_comparator(const void *a, const void *b); ! static void do_autovacuum(void); ! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, *************** static void relation_needs_vacanalyze(Oi *** 141,152 **** static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze, int freeze_min_age); - static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid); static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *shared, PgStat_StatDBEntry *dbentry); static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid); static void avl_sighup_handler(SIGNAL_ARGS); static void avlauncher_shutdown(SIGNAL_ARGS); static void avl_quickdie(SIGNAL_ARGS); --- 213,224 ---- static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze, int freeze_min_age); static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *shared, PgStat_StatDBEntry *dbentry); static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid); static void avl_sighup_handler(SIGNAL_ARGS); + static void avl_sigusr1_handler(SIGNAL_ARGS); static void avlauncher_shutdown(SIGNAL_ARGS); static void avl_quickdie(SIGNAL_ARGS); *************** StartAutoVacLauncher(void) *** 230,241 
**** /* * Main loop for the autovacuum launcher process. */ NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; - MemoryContext avlauncher_cxt; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; --- 302,333 ---- /* * Main loop for the autovacuum launcher process. + * + * The signalling between launcher and worker is as follows: + * + * When the worker has finished starting up, it stores its PID in wi_workerpid + * and sends a SIGUSR1 signal to the launcher. The launcher then knows that + * the postmaster is ready to start a new worker. We do it this way because + * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't + * yet processed the last one, in which case the second signal would be lost. + * This is only useful when two workers need to be started close to one + * another, which should be rare but it's possible. + * + * When the worker exits, its ProcKill routine (actually AutovacWorkerProcKill) + * is in charge of resetting the WorkerInfo entry and signalling the launcher. + * The launcher then wakes up and can launch a new worker if need be, or just + * go back to sleep. + * + * There is a potential problem if, for some reason, a worker starts and is not + * able to bootstrap itself correctly. To prevent this situation from starving + * the whole system, the launcher checks the launch time of the "starting + * worker". If it's too old (older than autovacuum_naptime seconds), it resets + * the worker entry and puts it back into the free list. */ NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; *************** AutoVacLauncherMain(int argc, char *argv *** 264,272 **** * Set up signal handlers. Since this is an auxiliary process, it has * particular signal requirements -- no deadlock checker or sinval * catchup, for example. 
- * - * XXX It may be a good idea to receive signals when an avworker process - * finishes. */ pqsignal(SIGHUP, avl_sighup_handler); --- 356,361 ---- *************** AutoVacLauncherMain(int argc, char *argv *** 276,282 **** pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); ! pqsignal(SIGUSR1, SIG_IGN); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); --- 365,371 ---- pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); ! pqsignal(SIGUSR1, avl_sigusr1_handler); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); *************** AutoVacLauncherMain(int argc, char *argv *** 300,311 **** * that we can reset the context during error recovery and thereby avoid * possible memory leaks. */ ! avlauncher_cxt = AllocSetContextCreate(TopMemoryContext, ! "Autovacuum Launcher", ! ALLOCSET_DEFAULT_MINSIZE, ! ALLOCSET_DEFAULT_INITSIZE, ! ALLOCSET_DEFAULT_MAXSIZE); ! MemoryContextSwitchTo(avlauncher_cxt); /* --- 389,400 ---- * that we can reset the context during error recovery and thereby avoid * possible memory leaks. */ ! AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, ! "Autovacuum Launcher", ! ALLOCSET_DEFAULT_MINSIZE, ! ALLOCSET_DEFAULT_INITSIZE, ! ALLOCSET_DEFAULT_MAXSIZE); ! MemoryContextSwitchTo(AutovacMemCxt); /* *************** AutoVacLauncherMain(int argc, char *argv *** 336,346 **** * Now return to normal top-level context and clear ErrorContext for * next time. */ ! MemoryContextSwitchTo(avlauncher_cxt); FlushErrorState(); /* Flush any leaked data in the top-level context */ ! MemoryContextResetAndDeleteChildren(avlauncher_cxt); /* Make sure pgstat also considers our stat data as gone */ pgstat_clear_snapshot(); --- 425,435 ---- * Now return to normal top-level context and clear ErrorContext for * next time. */ ! MemoryContextSwitchTo(AutovacMemCxt); FlushErrorState(); /* Flush any leaked data in the top-level context */ ! 
MemoryContextResetAndDeleteChildren(AutovacMemCxt); /* Make sure pgstat also considers our stat data as gone */ pgstat_clear_snapshot(); *************** AutoVacLauncherMain(int argc, char *argv *** 361,378 **** ereport(LOG, (errmsg("autovacuum launcher started"))); PG_SETMASK(&UnBlockSig); /* ! * take a nap before executing the first iteration, unless we were ! * requested an emergency run. */ ! if (autovacuum_start_daemon) ! pg_usleep(autovacuum_naptime * 1000000L); for (;;) { ! int worker_pid; /* * Emergency bailout if postmaster has died. This is to avoid the --- 450,481 ---- ereport(LOG, (errmsg("autovacuum launcher started"))); + /* must unblock signals before calling rebuild_database_list */ PG_SETMASK(&UnBlockSig); + /* in emergency mode, just start a worker and go away */ + if (!autovacuum_start_daemon) + { + do_start_worker(); + proc_exit(0); /* done */ + } + + AutoVacuumShmem->av_launcherpid = MyProcPid; + /* ! * Create the initial database list. The invariant we want this list to ! * keep is that it's ordered by decreasing next_time. As soon as an entry is updated to ! * a higher time, it will be moved to the front (which is correct because ! * the only operation is to add autovacuum_naptime to the entry, and time ! * always increases). */ ! rebuild_database_list(InvalidOid); for (;;) { ! uint64 micros; ! bool can_launch; ! TimestampTz current_time = 0; /* * Emergency bailout if postmaster has died. This is to avoid the *************** AutoVacLauncherMain(int argc, char *argv *** 381,386 **** --- 484,496 ---- if (!PostmasterIsAlive(true)) exit(1); + micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers != + INVALID_OFFSET); + + /* Sleep for a while according to schedule */ + pg_usleep(micros); + + /* the normal shutdown case */ if (avlauncher_shutdown_request) break; *************** AutoVacLauncherMain(int argc, char *argv *** 390,469 **** ProcessConfigFile(PGC_SIGHUP); } /* ! * if there's a worker already running, sleep until it ! 
* disappears. */ LWLockAcquire(AutovacuumLock, LW_SHARED); - worker_pid = AutoVacuumShmem->worker_pid; - LWLockRelease(AutovacuumLock); ! if (worker_pid != 0) { ! PGPROC *proc = BackendPidGetProc(worker_pid); ! if (proc != NULL && proc->isAutovacuum) ! goto sleep; ! else { /* ! * if the worker is not really running (or it's a process ! * that's not an autovacuum worker), remove the PID from shmem. ! * This should not happen, because either the worker exits ! * cleanly, in which case it'll remove the PID, or it dies, in ! * which case postmaster will cause a system reset cycle. */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! worker_pid = 0; ! LWLockRelease(AutovacuumLock); } } ! do_start_worker(); ! sleep: ! /* ! * in emergency mode, exit immediately so that the postmaster can ! * request another run right away if needed. ! * ! * XXX -- maybe it would be better to handle this inside the launcher ! * itself. ! */ ! if (!autovacuum_start_daemon) ! break; ! /* have pgstat read the file again next time */ ! pgstat_clear_snapshot(); ! /* now sleep until the next autovac iteration */ ! pg_usleep(autovacuum_naptime * 1000000L); } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); proc_exit(0); /* done */ } /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. * It determines what database to work on, sets up shared memory stuff and ! * signals postmaster to start the worker. */ ! static void do_start_worker(void) { List *dblist; ! bool for_xid_wrap; ! autovac_dbase *db; ! ListCell *cell; TransactionId xidForceLimit; /* Get a list of databases */ ! dblist = autovac_get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow --- 500,908 ---- ProcessConfigFile(PGC_SIGHUP); } + /* a worker started up or finished */ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + /* nothing else to do */ + } + /* ! 
* There are some conditions that we need to check before trying to ! * start a worker. First, we need to make sure that there is a ! * worker slot available. Second, we need to make sure that no other ! * worker is still starting up. */ LWLockAcquire(AutovacuumLock, LW_SHARED); ! can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET); ! ! if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) { ! long secs; ! int usecs; ! WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); ! ! if (current_time == 0) ! current_time = GetCurrentTimestamp(); ! ! /* ! * We can't launch another worker when another one is still ! * starting up, so just sleep for a bit more; that worker will wake ! * us up again as soon as it's ready. We will only wait ! * autovacuum_naptime seconds for this to happen however. Note ! * that failure to connect to a particular database is not a ! * problem here, because the worker sets its PID on shared memory ! * before trying to connect; only low-level problems, like fork() ! * failure, can get us here. ! */ ! TimestampDifference(worker->wi_launchtime, current_time, ! &secs, &usecs); ! /* ignore microseconds, as they cannot make any difference */ ! if (secs > autovacuum_naptime) { + LWLockRelease(AutovacuumLock); + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); /* ! * No other process can put a worker in starting mode, so if ! * startingWorker is still not INVALID after exchanging our lock, ! * we assume it's the same one we saw above (so we don't ! * recheck the launch time). */ ! if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) ! { ! worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); ! worker->wi_dboid = InvalidOid; ! worker->wi_tableoid = InvalidOid; ! worker->wi_workerpid = 0; ! worker->wi_launchtime = 0; ! worker->wi_links.next = AutoVacuumShmem->av_freeWorkers; ! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker); ! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; ! 
} } + else + can_launch = false; } + LWLockRelease(AutovacuumLock); /* either shared or exclusive */ ! if (can_launch) ! { ! Dlelem *elem; ! elem = DLGetTail(DatabaseList); ! if (current_time == 0) ! current_time = GetCurrentTimestamp(); ! if (elem != NULL) ! { ! avl_dbase *avdb = DLE_VAL(elem); ! long secs; ! int usecs; ! ! TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs); ! ! /* do we have to start a worker? */ ! if (secs <= 0 && usecs <= 0) ! launch_worker(current_time); ! } ! else ! { ! /* ! * Special case when the list is empty: start a worker right ! * away. This covers the initial case, when no database is in ! * pgstats (thus the list is empty). ! */ ! launch_worker(current_time); ! } ! } } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); + AutoVacuumShmem->av_launcherpid = 0; proc_exit(0); /* done */ } + + /* + * Determine the time to sleep, in microseconds, based on the database list. + * + * The "canlaunch" parameter indicates whether we can start a worker right now, + * for example due to the workers being all busy. + */ + static uint64 + launcher_determine_sleep(bool canlaunch) + { + long secs; + int usecs; + Dlelem *elem; + + /* + * We sleep until the next scheduled vacuum. We trust that when the + * database list was built, care was taken so that no entries have times in + * the past; if the first entry has too close a next_worker value, or a + * time in the past, we will sleep a small nominal time. 
+ */ + if (!canlaunch) + { + secs = autovacuum_naptime; + usecs = 0; + } + else if ((elem = DLGetTail(DatabaseList)) != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + TimestampTz current_time = GetCurrentTimestamp(); + TimestampTz next_wakeup; + + next_wakeup = avdb->adl_next_worker; + TimestampDifference(current_time, next_wakeup, &secs, &usecs); + } + else + { + /* list is empty, sleep for whole autovacuum_naptime seconds */ + secs = autovacuum_naptime; + usecs = 0; + } + + /* 100ms is the smallest time we'll allow the launcher to sleep */ + if (secs <= 0L && usecs <= 100000) + { + secs = 0L; + usecs = 100000; /* 100 ms */ + } + + return secs * 1000000 + usecs; + } + + /* + * Build an updated DatabaseList. It must only contain databases that appear + * in pgstats, and must be sorted by next_worker from highest to lowest, + * distributed regularly across the next autovacuum_naptime interval. + * + * Receives the Oid of the database that made this list be generated (we call + * this the "new" database, because when the database was already present on + * the list, we expect that this function is not called at all). The + * preexisting list, if any, will be used to preserve the order of the + * databases in the autovacuum_naptime period. The new database is put at the + * end of the interval. The actual values are not saved, which should not be + * much of a problem. 
+ */ + static void + rebuild_database_list(Oid newdb) + { + List *dblist; + ListCell *cell; + MemoryContext newcxt; + MemoryContext oldcxt; + MemoryContext tmpcxt; + HASHCTL hctl; + int score; + int nelems; + HTAB *dbhash; + + /* use fresh stats */ + pgstat_clear_snapshot(); + + newcxt = AllocSetContextCreate(AutovacMemCxt, + "AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + tmpcxt = AllocSetContextCreate(newcxt, + "tmp AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(tmpcxt); + + /* + * Implementing this is not as simple as it sounds, because we need to put + * the new database at the end of the list; next the databases that were + * already on the list, and finally (at the tail of the list) all the other + * databases that are not on the existing list. + * + * To do this, we build an empty hash table of scored databases. We will + * start with the lowest score (zero) for the new database, then increasing + * scores for the databases in the existing list, in order, and lastly + * increasing scores for all databases gotten via get_database_list() that + * are not already on the hash. + * + * Then we will put all the hash elements into an array, sort the array by + * score, and finally put the array elements into the new doubly linked + * list. 
+ */ + hctl.keysize = sizeof(Oid); + hctl.entrysize = sizeof(avl_dbase); + hctl.hash = oid_hash; + hctl.hcxt = tmpcxt; + dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */ + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* start by inserting the new database */ + score = 0; + if (OidIsValid(newdb)) + { + avl_dbase *db; + PgStat_StatDBEntry *entry; + + /* only consider this database if it has a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(newdb); + if (entry != NULL) + { + /* we assume it isn't found because the hash was just created */ + db = hash_search(dbhash, &newdb, HASH_ENTER, NULL); + + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + + /* Now insert the databases from the existing list */ + if (DatabaseList != NULL) + { + Dlelem *elem; + + elem = DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + elem = DLGetSucc(elem); + + /* + * skip databases with no stat entries -- in particular, this + * gets rid of dropped databases + */ + entry = pgstat_fetch_stat_dbentry(avdb->adl_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found); + + if (!found) + { + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + } + + /* finally, insert all qualifying databases not previously inserted */ + dblist = get_database_list(); + foreach (cell, dblist) + { + avw_dbase *avdb = lfirst(cell); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + /* only consider databases with a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(avdb->adw_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found); + /* only update the score if the database was not already on the hash */ + if (!found) + { + /* hash_search 
already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + nelems = score; + + /* from here on, the allocated memory belongs to the new list */ + MemoryContextSwitchTo(newcxt); + DatabaseList = DLNewList(); + + if (nelems > 0) + { + TimestampTz current_time; + int millis_increment; + avl_dbase *dbary; + avl_dbase *db; + HASH_SEQ_STATUS seq; + int i; + + /* put all the hash elements into an array */ + dbary = palloc(nelems * sizeof(avl_dbase)); + + i = 0; + hash_seq_init(&seq, dbhash); + while ((db = hash_seq_search(&seq)) != NULL) + memcpy(&(dbary[i++]), db, sizeof(avl_dbase)); + + /* sort the array */ + qsort(dbary, nelems, sizeof(avl_dbase), db_comparator); + + /* this is the time interval between databases in the schedule */ + millis_increment = 1000.0 * autovacuum_naptime / nelems; + current_time = GetCurrentTimestamp(); + + /* + * move the elements from the array into the dllist, setting the + * next_worker while walking the array + */ + for (i = 0; i < nelems; i++) + { + avl_dbase *db = &(dbary[i]); + Dlelem *elem; + + current_time = TimestampTzPlusMilliseconds(current_time, + millis_increment); + db->adl_next_worker = current_time; + + elem = DLNewElem(db); + /* later elements should go closer to the head of the list */ + DLAddHead(DatabaseList, elem); + } + } + + /* all done, clean up memory */ + if (DatabaseListCxt != NULL) + MemoryContextDelete(DatabaseListCxt); + MemoryContextDelete(tmpcxt); + DatabaseListCxt = newcxt; + MemoryContextSwitchTo(oldcxt); + } + + /* qsort comparator for avl_dbase, using adl_score */ + static int + db_comparator(const void *a, const void *b) + { + if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score) + return 0; + else + return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1; + } + /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. 
* It determines what database to work on, sets up shared memory stuff and ! * signals postmaster to start the worker. It fails gracefully if invoked when ! * autovacuum_workers are already active. ! * ! * Return value is the OID of the database that the worker is going to process, ! * or InvalidOid if no worker was actually started. */ ! static Oid do_start_worker(void) { List *dblist; ! ListCell *cell; TransactionId xidForceLimit; + bool for_xid_wrap; + avw_dbase *avdb; + TimestampTz current_time; + bool skipit = false; + + /* return quickly when there are no free workers */ + LWLockAcquire(AutovacuumLock, LW_SHARED); + if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET) + { + LWLockRelease(AutovacuumLock); + return InvalidOid; + } + LWLockRelease(AutovacuumLock); + + /* use fresh stats */ + pgstat_clear_snapshot(); /* Get a list of databases */ ! dblist = get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow *************** do_start_worker(void) *** 495,515 **** * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. */ ! db = NULL; for_xid_wrap = false; foreach(cell, dblist) { ! autovac_dbase *tmp = lfirst(cell); /* Find pgstat entry if any */ ! tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid); /* Check to see if this one is at risk of wraparound */ ! if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit)) { ! if (db == NULL || ! TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid)) ! db = tmp; for_xid_wrap = true; continue; } --- 934,956 ---- * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. */ ! avdb = NULL; for_xid_wrap = false; + current_time = GetCurrentTimestamp(); foreach(cell, dblist) { ! avw_dbase *tmp = lfirst(cell); ! Dlelem *elem; /* Find pgstat entry if any */ ! 
tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid); /* Check to see if this one is at risk of wraparound */ ! if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit)) { ! if (avdb == NULL || ! TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid)) ! avdb = tmp; for_xid_wrap = true; continue; } *************** do_start_worker(void) *** 520,545 **** * Otherwise, skip a database with no pgstat entry; it means it * hasn't seen any activity. */ ! if (!tmp->ad_entry) continue; /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ ! if (db == NULL || ! tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time) ! db = tmp; } /* Found a database -- process it */ ! if (db != NULL) { LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! AutoVacuumShmem->process_db = db->ad_datid; LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); } } --- 961,1116 ---- * Otherwise, skip a database with no pgstat entry; it means it * hasn't seen any activity. */ ! if (!tmp->adw_entry) ! continue; ! ! /* ! * Also, skip a database that appears on the database list as having ! * been processed recently (less than autovacuum_naptime seconds ago). ! * We do this so that we don't select a database which we just ! * selected, but that pgstat hasn't gotten around to updating the last ! * autovacuum time yet. ! */ ! skipit = false; ! elem = DatabaseList ? DLGetTail(DatabaseList) : NULL; ! ! while (elem != NULL) ! { ! avl_dbase *dbp = DLE_VAL(elem); ! ! if (dbp->adl_datid == tmp->adw_datid) ! { ! TimestampTz curr_plus_naptime; ! TimestampTz next = dbp->adl_next_worker; ! ! curr_plus_naptime = ! TimestampTzPlusMilliseconds(current_time, ! autovacuum_naptime * 1000); ! ! /* ! * What we want here is to skip if next_worker falls between ! * the current time and the current time plus naptime. ! */ ! if (timestamp_cmp_internal(current_time, next) > 0) ! skipit = false; ! 
else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0) ! skipit = false; ! else ! skipit = true; ! ! break; ! } ! elem = DLGetPred(elem); ! } ! if (skipit) continue; /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ ! if (avdb == NULL || ! tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time) ! avdb = tmp; } /* Found a database -- process it */ ! if (avdb != NULL) { + WorkerInfo worker; + SHMEM_OFFSET sworker; + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! ! /* ! * Get a worker entry from the freelist. We checked above, so there ! * really should be a free slot -- complain very loudly if there isn't. ! */ ! sworker = AutoVacuumShmem->av_freeWorkers; ! if (sworker == INVALID_OFFSET) ! elog(FATAL, "no free worker found"); ! ! worker = (WorkerInfo) MAKE_PTR(sworker); ! AutoVacuumShmem->av_freeWorkers = worker->wi_links.next; ! ! worker->wi_dboid = avdb->adw_datid; ! worker->wi_workerpid = 0; ! worker->wi_launchtime = GetCurrentTimestamp(); ! ! AutoVacuumShmem->av_startingWorker = sworker; ! LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + + return avdb->adw_datid; + } + else if (skipit) + { + /* + * If we skipped all databases on the list, rebuild it, because it + * probably contains a dropped database. + */ + rebuild_database_list(InvalidOid); + } + + return InvalidOid; + } + + /* + * launch_worker + * + * Wrapper for starting a worker from the launcher. Besides actually starting + * it, update the database list to reflect the next time that another one will + * need to be started on the selected database. The actual database choice is + * left to do_start_worker. + * + * This routine is also expected to insert an entry into the database list if + * the selected database was previously absent from the list. It returns + * nothing; the database list is updated in place, or rebuilt. 
+ */ + static void + launch_worker(TimestampTz now) + { + Oid dbid; + Dlelem *elem; + + dbid = do_start_worker(); + if (OidIsValid(dbid)) + { + /* + * Walk the database list and update the corresponding entry. If the + * database is not on the list, we'll recreate the list. + */ + elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + + if (avdb->adl_datid == dbid) + { + /* + * add autovacuum_naptime seconds to the current time, and use + * that as the new "next_worker" field for this database. + */ + avdb->adl_next_worker = + TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000); + + DLMoveToFront(elem); + break; + } + elem = DLGetSucc(elem); + } + + /* + * If the database was not present in the database list, we rebuild the + * list. It's possible that the database does not get into the list + * anyway, for example if it's a database that doesn't have a pgstat + * entry, but this is not a problem because we don't want to schedule + * workers regularly into those in any case. + */ + if (elem == NULL) + rebuild_database_list(dbid); } } *************** avl_sighup_handler(SIGNAL_ARGS) *** 550,555 **** --- 1121,1133 ---- got_SIGHUP = true; } + /* SIGUSR1: a worker is up and running, or just finished */ + static void + avl_sigusr1_handler(SIGNAL_ARGS) + { + got_SIGUSR1 = true; + } + static void avlauncher_shutdown(SIGNAL_ARGS) { *************** NON_EXEC_STATIC void *** 665,671 **** AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; ! Oid dbid; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; --- 1243,1249 ---- AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; ! Oid dbid = InvalidOid; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; *************** AutoVacWorkerMain(int argc, char *argv[] *** 744,751 **** /* * We can now go away. Note that because we called InitProcess, a ! 
* callback was registered to do ProcKill, which will clean up ! * necessary state. */ proc_exit(0); } --- 1322,1329 ---- /* * We can now go away. Note that because we called InitProcess, a ! * callback was registered to do AutovacWorkerProcKill, which will ! * clean up necessary state. */ proc_exit(0); } *************** AutoVacWorkerMain(int argc, char *argv[] *** 763,780 **** SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* ! * Get the database Id we're going to work on, and announce our PID ! * in the shared memory area. We remove the database OID immediately ! * from the shared memory area. */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! dbid = AutoVacuumShmem->process_db; ! AutoVacuumShmem->process_db = InvalidOid; ! AutoVacuumShmem->worker_pid = MyProcPid; LWLockRelease(AutovacuumLock); if (OidIsValid(dbid)) { char *dbname; --- 1341,1374 ---- SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* ! * Force statement_timeout to zero to avoid a timeout setting from ! * preventing regular maintenance from being executed. */ ! SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); ! /* ! * Get the info about the database we're going to work on. ! */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); ! dbid = MyWorkerInfo->wi_dboid; ! MyWorkerInfo->wi_workerpid = MyProcPid; ! ! /* insert into the running list */ ! SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, ! &MyWorkerInfo->wi_links); ! /* ! * remove from the "starting" pointer, so that the launcher can start a new ! * worker if required ! */ ! 
AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; LWLockRelease(AutovacuumLock); + /* wake up the launcher */ + if (AutoVacuumShmem->av_launcherpid != 0) + kill(AutoVacuumShmem->av_launcherpid, SIGUSR1); + if (OidIsValid(dbid)) { char *dbname; *************** AutoVacWorkerMain(int argc, char *argv[] *** 803,809 **** /* Create the memory context where cross-transaction state is stored */ AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, ! "Autovacuum context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); --- 1397,1403 ---- /* Create the memory context where cross-transaction state is stored */ AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, ! "AV worker", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); *************** AutoVacWorkerMain(int argc, char *argv[] *** 813,838 **** do_autovacuum(); } ! /* ! * Now remove our PID from shared memory, so that the launcher can start ! * another worker as soon as appropriate. ! */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! AutoVacuumShmem->worker_pid = 0; ! LWLockRelease(AutovacuumLock); /* All done, go away */ proc_exit(0); } /* ! * autovac_get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file. */ static List * ! autovac_get_database_list(void) { char *filename; List *dblist = NIL; --- 1407,1501 ---- do_autovacuum(); } ! /* AutovacWorkerProcKill cleans up and notifies the launcher. */ /* All done, go away */ proc_exit(0); } /* ! * Find and return an unused PGPROC entry to set up, for the new worker ! * process. ! */ ! PGPROC * ! AutoVacuumGetFreeProc() ! { ! PGPROC *proc; ! volatile SHMEM_OFFSET freeprocs; ! ! SpinLockAcquire(AutovacProcLock); ! ! freeprocs = AutoVacuumShmem->av_freeProcs; ! ! /* ! * This shouldn't happen, because we check the number of workers we launch ! */ ! if (freeprocs == INVALID_OFFSET) ! 
elog(FATAL, "too many autovacuum workers already"); ! ! /* get a free PGPROC slot from the freelist */ ! proc = (PGPROC *) MAKE_PTR(freeprocs); ! AutoVacuumShmem->av_freeProcs = proc->links.next; ! ! SpinLockRelease(AutovacProcLock); ! ! return proc; ! } ! ! /* ! * on_shmem_exit hook for returning the PGPROC struct into the freelist and ! * releasing any held LWLocks. ! */ ! void ! AutovacWorkerProcKill(int code, Datum arg) ! { ! volatile SHMEM_OFFSET freeprocs; ! ! Assert(MyProc != NULL); ! ! LWLockReleaseAll(); ! ! /* ! * Remove the WorkerInfo entry from the running list, and put it back into ! * the free list. ! */ ! if (MyWorkerInfo != NULL) ! { ! /* ! * XXX Acquiring a lwlock just after LWLockReleaseAll seems a bit ! * contradictory. Is it a problem? ! */ ! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); ! ! SHMQueueDelete(&MyWorkerInfo->wi_links); ! MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers; ! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo); ! MyWorkerInfo = NULL; ! ! LWLockRelease(AutovacuumLock); ! } ! ! /* Put my PGPROC back into the freelist */ ! SpinLockAcquire(AutovacProcLock); ! ! freeprocs = AutoVacuumShmem->av_freeProcs; ! MyProc->links.next = freeprocs; ! AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(MyProc); ! ! SpinLockRelease(AutovacProcLock); ! ! /* It isn't mine anymore */ ! MyProc = NULL; ! } ! ! /* ! * get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file. */ static List * ! get_database_list(void) { char *filename; List *dblist = NIL; *************** autovac_get_database_list(void) *** 852,866 **** while (read_pg_database_line(db_file, thisname, &db_id, &db_tablespace, &db_frozenxid)) { ! autovac_dbase *avdb; ! avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase)); ! avdb->ad_datid = db_id; ! avdb->ad_name = pstrdup(thisname); ! avdb->ad_frozenxid = db_frozenxid; /* this gets set later: */ ! 
avdb->ad_entry = NULL; dblist = lappend(dblist, avdb); } --- 1515,1529 ---- while (read_pg_database_line(db_file, thisname, &db_id, &db_tablespace, &db_frozenxid)) { ! avw_dbase *avdb; ! avdb = (avw_dbase *) palloc(sizeof(avw_dbase)); ! avdb->adw_datid = db_id; ! avdb->adw_name = pstrdup(thisname); ! avdb->adw_frozenxid = db_frozenxid; /* this gets set later: */ ! avdb->adw_entry = NULL; dblist = lappend(dblist, avdb); } *************** do_autovacuum(void) *** 1038,1047 **** --- 1701,1756 ---- Oid relid = lfirst_oid(cell); autovac_table *tab; char *relname; + WorkerInfo other_worker; + bool skipit; CHECK_FOR_INTERRUPTS(); /* + * hold schedule lock from here until we're sure that this table + * still needs vacuuming. We also need the AutovacuumLock to walk + * the worker array, but we'll let go of that one quickly. + */ + LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE); + LWLockAcquire(AutovacuumLock, LW_SHARED); + + /* + * Check whether the table is being vacuumed concurrently by another + * worker. + */ + skipit = false; + other_worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (other_worker) + { + /* ignore myself */ + if (other_worker == MyWorkerInfo) + goto next_worker; + + /* ignore workers in other databases */ + if (other_worker->wi_dboid != MyDatabaseId) + goto next_worker; + + if (other_worker->wi_tableoid == relid) + { + skipit = true; + break; + } + + next_worker: + other_worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &other_worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } + LWLockRelease(AutovacuumLock); + if (skipit) + { + LWLockRelease(AutovacuumScheduleLock); + continue; + } + + /* * Check whether pgstat data still says we need to vacuum this table. * It could have changed if something else processed the table while we * weren't looking. 
*************** do_autovacuum(void) *** 1053,1061 **** if (tab == NULL) { /* someone else vacuumed the table */ continue; } ! /* Ok, good to go! */ /* Set the vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; --- 1762,1777 ---- if (tab == NULL) { /* someone else vacuumed the table */ + LWLockRelease(AutovacuumScheduleLock); continue; } ! ! /* ! * Ok, good to go. Store the table in shared memory before releasing ! * the lock so that other workers don't vacuum it concurrently. ! */ ! MyWorkerInfo->wi_tableoid = relid; ! LWLockRelease(AutovacuumScheduleLock); /* Set the vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; *************** table_recheck_autovac(Oid relid) *** 1211,1217 **** PgStat_StatDBEntry *shared; PgStat_StatDBEntry *dbentry; ! /* We need fresh pgstat data for this */ pgstat_clear_snapshot(); shared = pgstat_fetch_stat_dbentry(InvalidOid); --- 1927,1933 ---- PgStat_StatDBEntry *shared; PgStat_StatDBEntry *dbentry; ! /* use fresh stats */ pgstat_clear_snapshot(); shared = pgstat_fetch_stat_dbentry(InvalidOid); *************** table_recheck_autovac(Oid relid) *** 1219,1226 **** /* fetch the relation's relcache entry */ classTup = SearchSysCacheCopy(RELOID, ! ObjectIdGetDatum(relid), ! 0, 0, 0); if (!HeapTupleIsValid(classTup)) return NULL; classForm = (Form_pg_class) GETSTRUCT(classTup); --- 1935,1942 ---- /* fetch the relation's relcache entry */ classTup = SearchSysCacheCopy(RELOID, ! ObjectIdGetDatum(relid), ! 0, 0, 0); if (!HeapTupleIsValid(classTup)) return NULL; classForm = (Form_pg_class) GETSTRUCT(classTup); *************** IsAutoVacuumWorkerProcess(void) *** 1630,1636 **** Size AutoVacuumShmemSize(void) { ! return sizeof(AutoVacuumShmemStruct); } /* --- 2346,2366 ---- Size AutoVacuumShmemSize(void) { ! Size size; ! ! /* ! * Need the fixed struct, the array of WorkerInfoData, and the array ! * of PGPROCs ! */ ! size = sizeof(AutoVacuumShmemStruct); ! size = MAXALIGN(size); ! 
size = add_size(size, mul_size(autovacuum_max_workers, ! sizeof(WorkerInfoData))); ! size = MAXALIGN(size); ! size = add_size(size, mul_size(autovacuum_max_workers, ! sizeof(PGPROC))); ! ! return size; } /* *************** AutoVacuumShmemInit(void) *** 1650,1657 **** ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough shared memory for autovacuum"))); - if (found) - return; /* already initialized */ ! MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct)); } --- 2380,2432 ---- ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough shared memory for autovacuum"))); ! if (!IsUnderPostmaster) ! { ! WorkerInfo worker; ! PGPROC *proc; ! int i; ! ! Assert(!found); ! ! /* initialize our spinlock */ ! AutovacProcLock = (slock_t *) ShmemAlloc(sizeof(slock_t)); ! SpinLockInit(AutovacProcLock); ! ! AutoVacuumShmem->av_launcherpid = 0; ! AutoVacuumShmem->av_freeProcs = INVALID_OFFSET; ! AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET; ! SHMQueueInit(&AutoVacuumShmem->av_runningWorkers); ! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; ! ! worker = (WorkerInfo) ((char *) AutoVacuumShmem + ! MAXALIGN(sizeof(AutoVacuumShmemStruct))); ! ! /* initialize the WorkerInfo free list */ ! for (i = 0; i < autovacuum_max_workers; i++) ! { ! worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers; ! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]); ! } ! ! proc = (PGPROC *) ! ((char *) worker + ! MAXALIGN(sizeof(WorkerInfoData) * autovacuum_max_workers)); ! ! /* initialize the PGPROC free list and semaphores */ ! for (i = 0; i < autovacuum_max_workers; i++) ! { ! proc[i].links.next = AutoVacuumShmem->av_freeProcs; ! PGSemaphoreCreate(&proc[i].sem); ! AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(&proc[i]); ! } ! } ! else ! Assert(found); ! } ! ! int ! AutoVacuumSemas(void) ! { ! 
return autovacuum_max_workers; } Index: src/backend/postmaster/postmaster.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/postmaster.c,v retrieving revision 1.527 diff -c -p -r1.527 postmaster.c *** src/backend/postmaster/postmaster.c 22 Mar 2007 19:53:30 -0000 1.527 --- src/backend/postmaster/postmaster.c 10 Apr 2007 20:31:40 -0000 *************** typedef struct *** 327,332 **** --- 327,333 ---- Backend *ShmemBackendArray; LWLock *LWLockArray; slock_t *ProcStructLock; + slock_t *AutovacProcLock; PROC_HDR *ProcGlobal; PGPROC *AuxiliaryProcs; InheritableSocket pgStatSock; *************** CreateOptsFile(int argc, char *argv[], c *** 3899,3904 **** --- 3900,3906 ---- extern slock_t *ShmemLock; extern LWLock *LWLockArray; extern slock_t *ProcStructLock; + extern slock_t *AutovacProcLock; extern PROC_HDR *ProcGlobal; extern PGPROC *AuxiliaryProcs; extern int pgStatSock; *************** save_backend_variables(BackendParameters *** 3942,3947 **** --- 3944,3950 ---- param->LWLockArray = LWLockArray; param->ProcStructLock = ProcStructLock; + param->AutovacProcLock = AutovacProcLock; param->ProcGlobal = ProcGlobal; param->AuxiliaryProcs = AuxiliaryProcs; write_inheritable_socket(&param->pgStatSock, pgStatSock, childPid); *************** restore_backend_variables(BackendParamet *** 4145,4150 **** --- 4148,4154 ---- LWLockArray = param->LWLockArray; ProcStructLock = param->ProcStructLock; + AutovacProcLock = param->AutovacProcLock; ProcGlobal = param->ProcGlobal; AuxiliaryProcs = param->AuxiliaryProcs; read_inheritable_socket(&pgStatSock, &param->pgStatSock); Index: src/backend/storage/ipc/ipci.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/ipci.c,v retrieving revision 1.91 diff -c -p -r1.91 ipci.c *** src/backend/storage/ipc/ipci.c 15 Feb 2007 23:23:23 -0000 1.91 --- src/backend/storage/ipc/ipci.c 9 Apr 2007 
16:07:08 -0000 *************** CreateSharedMemoryAndSemaphores(bool mak *** 138,143 **** --- 138,144 ---- */ numSemas = ProcGlobalSemas(); numSemas += SpinlockSemas(); + numSemas += AutoVacuumSemas(); PGReserveSemaphores(numSemas, port); } else Index: src/backend/storage/ipc/procarray.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v retrieving revision 1.24 diff -c -p -r1.24 procarray.c *** src/backend/storage/ipc/procarray.c 3 Apr 2007 16:34:36 -0000 1.24 --- src/backend/storage/ipc/procarray.c 10 Apr 2007 20:31:38 -0000 *************** *** 36,41 **** --- 36,42 ---- #include "access/xact.h" #include "access/twophase.h" #include "miscadmin.h" + #include "postmaster/autovacuum.h" #include "storage/procarray.h" #include "utils/tqual.h" *************** ProcArrayShmemSize(void) *** 89,95 **** size = offsetof(ProcArrayStruct, procs); size = add_size(size, mul_size(sizeof(PGPROC *), ! add_size(MaxBackends, max_prepared_xacts))); return size; } --- 90,96 ---- size = offsetof(ProcArrayStruct, procs); size = add_size(size, mul_size(sizeof(PGPROC *), ! 
add_size(MaxBackends, max_prepared_xacts))); return size; } Index: src/backend/storage/ipc/sinvaladt.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/sinvaladt.c,v retrieving revision 1.63 diff -c -p -r1.63 sinvaladt.c *** src/backend/storage/ipc/sinvaladt.c 5 Jan 2007 22:19:38 -0000 1.63 --- src/backend/storage/ipc/sinvaladt.c 10 Apr 2007 18:34:16 -0000 *************** *** 15,20 **** --- 15,21 ---- #include "postgres.h" #include "miscadmin.h" + #include "postmaster/autovacuum.h" #include "storage/backendid.h" #include "storage/ipc.h" #include "storage/lwlock.h" *************** SInvalShmemSize(void) *** 39,44 **** --- 40,46 ---- size = offsetof(SISeg, procState); size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); + size = add_size(size, mul_size(sizeof(ProcState), autovacuum_max_workers)); return size; } *************** SIBufferInit(void) *** 64,71 **** segP->minMsgNum = 0; segP->maxMsgNum = 0; segP->lastBackend = 0; ! segP->maxBackends = MaxBackends; ! segP->freeBackends = MaxBackends; /* The buffer[] array is initially all unused, so we need not fill it */ --- 66,73 ---- segP->minMsgNum = 0; segP->maxMsgNum = 0; segP->lastBackend = 0; ! segP->maxBackends = MaxBackends + autovacuum_max_workers; ! 
segP->freeBackends = MaxBackends + autovacuum_max_workers; /* The buffer[] array is initially all unused, so we need not fill it */ Index: src/backend/storage/lmgr/lock.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/lock.c,v retrieving revision 1.176 diff -c -p -r1.176 lock.c *** src/backend/storage/lmgr/lock.c 1 Feb 2007 19:10:28 -0000 1.176 --- src/backend/storage/lmgr/lock.c 10 Apr 2007 20:32:23 -0000 *************** *** 37,42 **** --- 37,43 ---- #include "access/twophase_rmgr.h" #include "miscadmin.h" #include "pgstat.h" + #include "postmaster/autovacuum.h" #include "storage/lmgr.h" #include "utils/memutils.h" #include "utils/ps_status.h" *************** *** 47,53 **** int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS() \ ! mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) /* --- 48,55 ---- int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS() \ ! mul_size(max_locks_per_xact, \ ! add_size(MaxBackends, max_prepared_xacts)) /* Index: src/backend/storage/lmgr/proc.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/proc.c,v retrieving revision 1.187 diff -c -p -r1.187 proc.c *** src/backend/storage/lmgr/proc.c 3 Apr 2007 16:34:36 -0000 1.187 --- src/backend/storage/lmgr/proc.c 10 Apr 2007 20:50:14 -0000 *************** ProcGlobalShmemSize(void) *** 96,103 **** size = add_size(size, sizeof(PROC_HDR)); /* AuxiliaryProcs */ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC))); ! /* MyProcs */ ! size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC))); /* ProcStructLock */ size = add_size(size, sizeof(slock_t)); --- 96,106 ---- size = add_size(size, sizeof(PROC_HDR)); /* AuxiliaryProcs */ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC))); ! /* ! * MyProcs. Note we use MaxConnections here instead of MaxBackends, ! 
* because the autovacuum PGPROC structs are reserved by autovacuum itself. ! */ ! size = add_size(size, mul_size(MaxConnections, sizeof(PGPROC))); /* ProcStructLock */ size = add_size(size, sizeof(slock_t)); *************** ProcGlobalShmemSize(void) *** 110,117 **** int ProcGlobalSemas(void) { ! /* We need a sema per backend, plus one for each auxiliary process. */ ! return MaxBackends + NUM_AUXILIARY_PROCS; } /* --- 113,123 ---- int ProcGlobalSemas(void) { ! /* ! * We need a sema per backend (but don't count autovacuum), plus one for ! * each auxiliary process. ! */ ! return MaxConnections + NUM_AUXILIARY_PROCS; } /* *************** ProcGlobalSemas(void) *** 127,133 **** * running out when trying to start another backend is a common failure. * So, now we grab enough semaphores to support the desired max number * of backends immediately at initialization --- if the sysadmin has set ! * MaxBackends higher than his kernel will support, he'll find out sooner * rather than later. * * Another reason for creating semaphores here is that the semaphore --- 133,139 ---- * running out when trying to start another backend is a common failure. * So, now we grab enough semaphores to support the desired max number * of backends immediately at initialization --- if the sysadmin has set ! * MaxConnections higher than his kernel will support, he'll find out sooner * rather than later. * * Another reason for creating semaphores here is that the semaphore *************** InitProcGlobal(void) *** 169,181 **** /* * Pre-create the PGPROC structures and create a semaphore for each. */ ! procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC)); if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); ! MemSet(procs, 0, MaxBackends * sizeof(PGPROC)); ! 
for (i = 0; i < MaxBackends; i++) { PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = ProcGlobal->freeProcs; --- 175,187 ---- /* * Pre-create the PGPROC structures and create a semaphore for each. */ ! procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC)); if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); ! MemSet(procs, 0, MaxConnections * sizeof(PGPROC)); ! for (i = 0; i < MaxConnections; i++) { PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = ProcGlobal->freeProcs; *************** InitProcGlobal(void) *** 200,250 **** void InitProcess(void) { - /* use volatile pointer to prevent code rearrangement */ - volatile PROC_HDR *procglobal = ProcGlobal; - SHMEM_OFFSET myOffset; int i; ! /* ! * ProcGlobal should be set up already (if we are a backend, we inherit ! * this by fork() or EXEC_BACKEND mechanism from the postmaster). ! */ ! if (procglobal == NULL) ! elog(PANIC, "proc header uninitialized"); ! ! if (MyProc != NULL) ! elog(ERROR, "you already exist"); ! ! /* ! * Try to get a proc struct from the free list. If this fails, we must be ! * out of PGPROC structures (not to mention semaphores). ! * ! * While we are holding the ProcStructLock, also copy the current shared ! * estimate of spins_per_delay to local storage. ! */ ! SpinLockAcquire(ProcStructLock); ! set_spins_per_delay(procglobal->spins_per_delay); ! myOffset = procglobal->freeProcs; - if (myOffset != INVALID_OFFSET) - { - MyProc = (PGPROC *) MAKE_PTR(myOffset); - procglobal->freeProcs = MyProc->links.next; - SpinLockRelease(ProcStructLock); - } - else - { /* ! * If we reach here, all the PGPROCs are in use. This is one of the ! * possible places to detect "too many backends", so give the standard ! * error message. */ ! SpinLockRelease(ProcStructLock); ! ereport(FATAL, ! (errcode(ERRCODE_TOO_MANY_CONNECTIONS), ! errmsg("sorry, too many clients already"))); } /* --- 206,262 ---- void InitProcess(void) { int i; ! 
if (IsAutoVacuumWorkerProcess()) ! MyProc = AutoVacuumGetFreeProc(); ! else ! { ! /* use volatile pointer to prevent code rearrangement */ ! volatile PROC_HDR *procglobal = ProcGlobal; ! SHMEM_OFFSET myOffset; ! /* ! * ProcGlobal should be set up already (if we are a backend, we inherit ! * this by fork() or EXEC_BACKEND mechanism from the postmaster). ! */ ! if (procglobal == NULL) ! elog(PANIC, "proc header uninitialized"); ! if (MyProc != NULL) ! elog(ERROR, "you already exist"); /* ! * Try to get a proc struct from the free list. If this fails, we must be ! * out of PGPROC structures (not to mention semaphores). ! * ! * While we are holding the ProcStructLock, also copy the current shared ! * estimate of spins_per_delay to local storage. */ ! SpinLockAcquire(ProcStructLock); ! ! set_spins_per_delay(procglobal->spins_per_delay); ! ! myOffset = procglobal->freeProcs; ! ! if (myOffset != INVALID_OFFSET) ! { ! MyProc = (PGPROC *) MAKE_PTR(myOffset); ! procglobal->freeProcs = MyProc->links.next; ! SpinLockRelease(ProcStructLock); ! } ! else ! { ! /* ! * If we reach here, all the PGPROCs are in use. This is one of the ! * possible places to detect "too many backends", so give the standard ! * error message. ! */ ! SpinLockRelease(ProcStructLock); ! ereport(FATAL, ! (errcode(ERRCODE_TOO_MANY_CONNECTIONS), ! errmsg("sorry, too many clients already"))); ! } } /* *************** InitProcess(void) *** 280,286 **** /* * Arrange to clean up at backend exit. */ ! on_shmem_exit(ProcKill, 0); /* * Now that we have a PGPROC, we could try to acquire locks, so initialize --- 292,301 ---- /* * Arrange to clean up at backend exit. */ ! if (IsAutoVacuumWorkerProcess()) ! on_shmem_exit(AutovacWorkerProcKill, 0); ! else ! 
on_shmem_exit(ProcKill, 0); /* * Now that we have a PGPROC, we could try to acquire locks, so initialize Index: src/backend/utils/init/globals.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/init/globals.c,v retrieving revision 1.100 diff -c -p -r1.100 globals.c *** src/backend/utils/init/globals.c 5 Jan 2007 22:19:44 -0000 1.100 --- src/backend/utils/init/globals.c 10 Apr 2007 21:00:28 -0000 *************** bool allowSystemTableMods = false; *** 95,103 **** int work_mem = 1024; int maintenance_work_mem = 16384; ! /* Primary determinants of sizes of shared-memory structures: */ int NBuffers = 1000; int MaxBackends = 100; int VacuumCostPageHit = 1; /* GUC parameters for vacuum */ int VacuumCostPageMiss = 10; --- 95,108 ---- int work_mem = 1024; int maintenance_work_mem = 16384; ! /* ! * Primary determinants of sizes of shared-memory structures. MaxBackends is ! * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign ! * hook): ! 
*/ int NBuffers = 1000; int MaxBackends = 100; + int MaxConnections = 90; int VacuumCostPageHit = 1; /* GUC parameters for vacuum */ int VacuumCostPageMiss = 10; Index: src/backend/utils/misc/guc.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v retrieving revision 1.383 diff -c -p -r1.383 guc.c *** src/backend/utils/misc/guc.c 19 Mar 2007 23:38:30 -0000 1.383 --- src/backend/utils/misc/guc.c 10 Apr 2007 20:51:14 -0000 *************** static bool assign_tcp_keepalives_count( *** 161,166 **** --- 161,168 ---- static const char *show_tcp_keepalives_idle(void); static const char *show_tcp_keepalives_interval(void); static const char *show_tcp_keepalives_count(void); + static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source); + static bool assign_maxconnections(int newval, bool doit, GucSource source); /* * GUC option variables that are exported from this module *************** static struct config_int ConfigureNamesI *** 1147,1162 **** * number. * * MaxBackends is limited to INT_MAX/4 because some places compute ! * 4*MaxBackends without any overflow check. Likewise we have to limit ! * NBuffers to INT_MAX/2. */ { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL }, ! &MaxBackends, ! 100, 1, INT_MAX / 4, NULL, NULL }, { --- 1149,1167 ---- * number. * * MaxBackends is limited to INT_MAX/4 because some places compute ! * 4*MaxBackends without any overflow check. This check is made in the ! * assign_maxconnections and assign_autovacuum_max_workers hooks, since ! * MaxBackends is computed as MaxConnections + autovacuum_max_workers. ! * ! * Likewise we have to limit NBuffers to INT_MAX/2. */ { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL }, ! &MaxConnections, ! 
100, 1, INT_MAX / 4, assign_maxconnections, NULL }, { *************** static struct config_int ConfigureNamesI *** 1620,1625 **** --- 1625,1639 ---- &autovacuum_freeze_max_age, 200000000, 100000000, 2000000000, NULL, NULL }, + { + /* see max_connections */ + {"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM, + gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."), + NULL + }, + &autovacuum_max_workers, + 10, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL + }, { {"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER, *************** show_tcp_keepalives_count(void) *** 6659,6663 **** --- 6673,6704 ---- return nbuf; } + static bool + assign_maxconnections(int newval, bool doit, GucSource source) + { + if (doit) + { + if (newval + autovacuum_max_workers > INT_MAX / 4) + return false; + + MaxBackends = newval + autovacuum_max_workers; + } + + return true; + } + + static bool + assign_autovacuum_max_workers(int newval, bool doit, GucSource source) + { + if (doit) + { + if (newval + MaxConnections > INT_MAX / 4) + return false; + + MaxBackends = newval + MaxConnections; + } + + return true; + } #include "guc-file.c" Index: src/include/miscadmin.h =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/include/miscadmin.h,v retrieving revision 1.193 diff -c -p -r1.193 miscadmin.h *** src/include/miscadmin.h 1 Mar 2007 14:52:04 -0000 1.193 --- src/include/miscadmin.h 10 Apr 2007 20:51:06 -0000 *************** extern DLLIMPORT char *DataDir; *** 129,134 **** --- 129,135 ---- extern DLLIMPORT int NBuffers; extern int MaxBackends; + extern int MaxConnections; extern DLLIMPORT int MyProcPid; extern DLLIMPORT struct Port *MyProcPort; Index: src/include/postmaster/autovacuum.h =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v retrieving revision 1.8 diff -c -p -r1.8 autovacuum.h 
*** src/include/postmaster/autovacuum.h 15 Feb 2007 23:23:23 -0000 1.8 --- src/include/postmaster/autovacuum.h 10 Apr 2007 14:46:02 -0000 *************** *** 14,21 **** --- 14,24 ---- #ifndef AUTOVACUUM_H #define AUTOVACUUM_H + #include "storage/lock.h" + /* GUC variables */ extern bool autovacuum_start_daemon; + extern int autovacuum_max_workers; extern int autovacuum_naptime; extern int autovacuum_vac_thresh; extern double autovacuum_vac_scale; *************** extern bool IsAutoVacuumWorkerProcess(vo *** 34,39 **** --- 37,45 ---- extern void autovac_init(void); extern int StartAutoVacLauncher(void); extern int StartAutoVacWorker(void); + extern PGPROC *AutoVacuumGetFreeProc(void); + extern void AutovacWorkerProcKill(int code, Datum arg); + extern int AutoVacuumSemas(void); #ifdef EXEC_BACKEND extern void AutoVacLauncherMain(int argc, char *argv[]); Index: src/include/storage/lwlock.h =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v retrieving revision 1.35 diff -c -p -r1.35 lwlock.h *** src/include/storage/lwlock.h 3 Apr 2007 16:34:36 -0000 1.35 --- src/include/storage/lwlock.h 6 Apr 2007 02:20:17 -0000 *************** typedef enum LWLockId *** 61,66 **** --- 61,67 ---- BtreeVacuumLock, AddinShmemInitLock, AutovacuumLock, + AutovacuumScheduleLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,