diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0591f3f..b2c5cb0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -36,6 +36,7 @@ #include "libpq/be-fsstubs.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/bgworker.h" #include "replication/walsender.h" #include "replication/syncrep.h" #include "storage/fd.h" @@ -1978,6 +1979,7 @@ CommitTransaction(void) AtEOXact_HashTables(true); AtEOXact_PgStat(true); AtEOXact_Snapshot(true); + AtEOXact_BackgroundWorker(); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2233,6 +2235,7 @@ PrepareTransaction(void) AtEOXact_HashTables(true); /* don't call AtEOXact_PgStat here */ AtEOXact_Snapshot(true); + AtEOXact_BackgroundWorker(); CurrentResourceOwner = NULL; ResourceOwnerDelete(TopTransactionResourceOwner); @@ -2378,6 +2381,7 @@ AbortTransaction(void) AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_BackgroundWorker(); pgstat_report_xact_timestamp(0); } @@ -4150,6 +4154,7 @@ CommitSubTransaction(void) AtEOSubXact_HashTables(true, s->nestingLevel); AtEOSubXact_PgStat(true, s->nestingLevel); AtSubCommit_Snapshot(s->nestingLevel); + AtEOSubXact_BackgroundWorker(s->nestingLevel); /* * We need to restore the upper transaction's read-only state, in case the @@ -4267,6 +4272,7 @@ AbortSubTransaction(void) AtEOSubXact_HashTables(false, s->nestingLevel); AtEOSubXact_PgStat(false, s->nestingLevel); AtSubAbort_Snapshot(s->nestingLevel); + AtEOSubXact_BackgroundWorker(s->nestingLevel); } /* diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 01ab3a8..03b4925 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -15,6 +15,7 @@ #include #include +#include "access/xact.h" #include "miscadmin.h" #include "libpq/pqsignal.h" #include "postmaster/bgworker_internals.h" @@ -29,7 +30,9 @@ #include "storage/shmem.h" #include "tcop/tcopprot.h" #include "utils/ascii.h" +#include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/resowner_private.h" #include "utils/timeout.h" /* @@ -90,7 +93,21 @@ struct BackgroundWorkerHandle uint64 generation; }; -BackgroundWorkerArray *BackgroundWorkerData; +typedef struct BackgroundWorkerResource +{ + SubTransactionId subid; + BackgroundWorkerHandle handle; + int flags; +} BackgroundWorkerResource; + +/* Background worker slots in shared memory. */ +static BackgroundWorkerArray *BackgroundWorkerData; + +/* Background worker tracking in backend-private memory. */ +static unsigned BackgroundWorkerStackAllocated; +static unsigned BackgroundWorkerStackUsed; +static BackgroundWorkerResource *BackgroundWorkerStack; +unsigned NumberOfPreciousBackgroundWorkers; /* * Calculate shared memory needed. @@ -452,6 +469,16 @@ SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel) return false; } + if ((worker->bgw_flags & BGWORKER_PRECIOUS) != 0 && + (worker->bgw_notify_pid != MyProcPid || MyProcPid == 0)) + { + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": cannot be marked precious unless registrant PID is notified", + worker->bgw_name))); + return false; + } + return true; } @@ -701,6 +728,15 @@ RegisterBackgroundWorker(BackgroundWorker *worker) return; } + if ((worker->bgw_flags & BGWORKER_EPHEMERAL) != 0) + { + ereport(LOG, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("background worker \"%s\": only dynamic background workers can be marked ephemeral", + worker->bgw_name))); + return; + } + /* * Enforce maximum number of workers. Note this is overly restrictive: we * could allow more non-shmem-connected workers, because these don't count @@ -774,11 +810,39 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker, if (!SanityCheckBackgroundWorker(worker, ERROR)) return false; - LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE); + /* + * If this worker is marked ephemeral or precious, we'll need to push it + * onto our stack of background workers whose fate we must track. Make + * sure the stack is big enough before we register the worker, so that + * adding the actual stack entry is a no-fail operation. + */ + if ((worker->bgw_flags & (BGWORKER_EPHEMERAL | BGWORKER_PRECIOUS)) != 0 + && BackgroundWorkerStackUsed >= BackgroundWorkerStackAllocated) + { + unsigned newsize; + + if (BackgroundWorkerStackAllocated != 0) + { + newsize = BackgroundWorkerStackAllocated * 2; + BackgroundWorkerStack = + repalloc(BackgroundWorkerStack, + newsize * sizeof(BackgroundWorkerResource)); + } + else + { + newsize = 8; + BackgroundWorkerStack = + MemoryContextAlloc(TopTransactionContext, + newsize * sizeof(BackgroundWorkerResource)); + } + + BackgroundWorkerStackAllocated = newsize; + } /* * Look for an unused slot. If we find one, grab it. */ + LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE); for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno) { BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno]; @@ -802,21 +866,41 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker, break; } } - LWLockRelease(BackgroundWorkerLock); - /* If we found a slot, tell the postmaster to notice the change. */ if (success) - SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE); - - /* - * If we found a slot and the user has provided a handle, initialize it. - */ - if (success && handle) { - *handle = palloc(sizeof(BackgroundWorkerHandle)); - (*handle)->slot = slotno; - (*handle)->generation = generation; + /* + * If this background worker requires a stack entry, we must create + * that before doing anything else. Otherwise, a failure before we + * exit this function would fail to terminate an ephemeral background + * worker we just registered. + */ + if ((worker->bgw_flags & (BGWORKER_EPHEMERAL | BGWORKER_PRECIOUS)) != 0) + { + BackgroundWorkerResource *bgres; + + bgres = &BackgroundWorkerStack[BackgroundWorkerStackUsed]; + bgres->subid = GetCurrentSubTransactionId(); + bgres->handle.slot = slotno; + bgres->handle.generation = generation; + bgres->flags = worker->bgw_flags; + ++BackgroundWorkerStackUsed; + + if ((bgres->flags & BGWORKER_PRECIOUS) != 0) + ++NumberOfPreciousBackgroundWorkers; + } + + /* If the caller wants a handle, provide one. */ + if (handle) + { + *handle = palloc(sizeof(BackgroundWorkerHandle)); + (*handle)->slot = slotno; + (*handle)->generation = generation; + } + + /* Tell the postmaster to notice the change. */ + SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE); } return success; @@ -930,6 +1014,38 @@ WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp) } /* + * Remove a background worker from the stack. + */ +static void +RemoveBackgroundWorkerFromStack(BackgroundWorkerHandle *handle) +{ + unsigned n; + + /* Search for and release any matching stack entry. */ + for (n = BackgroundWorkerStackUsed; n > 0; --n) + { + BackgroundWorkerResource *bgres; + + bgres = &BackgroundWorkerStack[n - 1]; + + if (memcmp(&bgres->handle, handle, sizeof(BackgroundWorkerHandle)) == 0) + { + if ((bgres->flags & BGWORKER_PRECIOUS) != 0) + { + Assert(NumberOfPreciousBackgroundWorkers > 0); + --NumberOfPreciousBackgroundWorkers; + } + if (n < BackgroundWorkerStackUsed) + memmove(&BackgroundWorkerStack[n - 1], + &BackgroundWorkerStack[n], + (BackgroundWorkerStackUsed - n) * + sizeof(BackgroundWorkerHandle)); + --BackgroundWorkerStackUsed; + } + } +} + +/* * Instruct the postmaster to terminate a background worker. * * Note that it's safe to do this without regard to whether the worker is @@ -954,7 +1070,111 @@ TerminateBackgroundWorker(BackgroundWorkerHandle *handle) } LWLockRelease(BackgroundWorkerLock); + /* Cease backend-local bookkeeping for terminated worker. */ + RemoveBackgroundWorkerFromStack(handle); + /* Make sure the postmaster notices the change to shared memory. */ if (signal_postmaster) SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE); } + +/* + * Raise an error if a precious background worker pertaining to the current + * transaction has exited unexpectedly. + */ +extern void +CheckForPreciousBackgroundWorkerDeath(void) +{ + SubTransactionId subid = GetCurrentSubTransactionId(); + unsigned n; + + for (n = BackgroundWorkerStackUsed; n > 0; --n) + { + BackgroundWorkerResource *bgres; + pid_t pid; + + bgres = &BackgroundWorkerStack[n - 1]; + + /* + * Once we encounter a SubTransactionId that does not match our own, + * we know that there will be know more entries with our + * SubTransactionId. + */ + if (bgres->subid != subid) + break; + + if ((bgres->flags & BGWORKER_PRECIOUS) != 0 && + GetBackgroundWorkerPid(&bgres->handle, &pid) == BGWH_STOPPED) + { + /* XXX. What errcode should we use here? */ + ereport(ERROR, + (errmsg("background worker process exited unexpectedly"), + errhint("More details may be available in the server log."))); + } + } +} + +/* + * Cleanup for top-level transaction commit or abort. + */ +void +AtEOXact_BackgroundWorker(void) +{ + /* Kill any remaining ephemeral workers. */ + while (BackgroundWorkerStackUsed > 0) + { + BackgroundWorkerResource *bgres; + + bgres = &BackgroundWorkerStack[BackgroundWorkerStackUsed - 1]; + + /* + * If the background worker is ephemeral, we must terminate it; + * this will pop the stack as a side effect. Otherwise, we pop the + * stack ourselves. + */ + if ((bgres->flags & BGWORKER_EPHEMERAL) != 0) + TerminateBackgroundWorker(&bgres->handle); + else + RemoveBackgroundWorkerFromStack(&bgres->handle); + } + + /* + * BackgroundWorkerStack is allocated from TopTransactionContext, so that + * will go away automatically. We just need to reset the counters. + */ + BackgroundWorkerStackAllocated = 0; + BackgroundWorkerStackUsed = 0; + NumberOfPreciousBackgroundWorkers = 0; +} + +/* + * Cleanup for subtransaction commit or abort. + */ +void +AtEOSubXact_BackgroundWorker(SubTransactionId mySubid) +{ + while (BackgroundWorkerStackUsed > 0) + { + BackgroundWorkerResource *bgres; + + bgres = &BackgroundWorkerStack[BackgroundWorkerStackUsed - 1]; + + /* + * Once we encounter a SubTransactionId that does not match our own, + * we know that there will be know more entries with our + * SubTransactionId. + */ + if (bgres->subid != mySubid) + break; + + /* + * If the background worker is ephemeral, we must terminate it; + * this will pop the stack as a side effect. Otherwise, we pop the + * stack ourselves. + */ + if ((bgres->flags & BGWORKER_EPHEMERAL) != 0) + TerminateBackgroundWorker(&bgres->handle); + else + RemoveBackgroundWorkerFromStack(&bgres->handle); + } +} diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index c4b5d01..0f171be 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -19,6 +19,7 @@ #include "commands/async.h" #include "miscadmin.h" +#include "postmaster/bgworker.h" #include "storage/latch.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -285,7 +286,22 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); - if (set_latch_on_sigusr1) + if (NumberOfPreciousBackgroundWorkers > 0) + { + /* + * There shouldn't be any precious background workers running in + * contexts where ImmediateInterruptOK is set. + */ + Assert(!ImmediateInterruptOK); + + /* + * Request an interrupt so that we check for worker death at the next + * CHECK_FOR_INTERRUPTS(). + */ + InterruptPending = true; + SetLatch(&MyProc->procLatch); + } + else if (set_latch_on_sigusr1) SetLatch(&MyProc->procLatch); latch_sigusr1_handler(); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 1eaf287..3c97cca 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -56,6 +56,7 @@ #include "parser/analyze.h" #include "parser/parser.h" #include "postmaster/autovacuum.h" +#include "postmaster/bgworker.h" #include "postmaster/postmaster.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -512,6 +513,9 @@ prepare_for_client_read(void) { if (DoingCommandRead) { + /* There shouldn't be any precious bgworkers remaining. */ + Assert(NumberOfPreciousBackgroundWorkers == 0); + /* Enable immediate processing of asynchronous signals */ EnableNotifyInterrupt(); EnableCatchupInterrupt(); @@ -2959,6 +2963,8 @@ ProcessInterrupts(void) errmsg("canceling statement due to user request"))); } } + if (NumberOfPreciousBackgroundWorkers > 0) + CheckForPreciousBackgroundWorkerDeath(); /* If we get here, do nothing (probably, QueryCancelPending was reset) */ } diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index c27b08b..9dd7d26 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -58,6 +58,31 @@ */ #define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002 +/* + * This flag means the bgworker should automatically be terminated at the + * end of the current (sub)transaction. + */ +#define BGWORKER_EPHEMERAL 0x0004 + +/* + * This flag means that the current (sub)transaction should abort with an + * ERROR if the background worker exits unexpectedly. Precious background + * workers should not be left running across command boundaries, and all + * precious background workers should be terminated when they are no longer + * needed. + * + * The ERROR requested by this flag can occur at any CHECK_FOR_INTERRUPTS(). + * However, no error will be thrown if the worker terminates while the parent + * is in a subtransaction. This is because the error would terminate the + * subtransaction, rather than the transaction that should actually be made to + * fail; PostgreSQL currently has no mechanism for an error to terminate a + * subtransaction other than the innermost. If it is necessary to start a + * subtransaction while precious workers exist in a parent (sub)transaction, + * CheckForPreciousBackgroundWorkerDeath() should be called explicitly after + * exiting the subtransaction. + */ +#define BGWORKER_PRECIOUS 0x0008 + typedef void (*bgworker_main_type) (Datum main_arg); @@ -115,6 +140,12 @@ extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle * /* Terminate a bgworker */ extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle); +/* Support for precious and ephemeral background workers. */ +extern unsigned NumberOfPreciousBackgroundWorkers; +extern void CheckForPreciousBackgroundWorkerDeath(void); +extern void AtEOXact_BackgroundWorker(void); +extern void AtEOSubXact_BackgroundWorker(SubTransactionId mySubid); + /* This is valid in a running worker */ extern BackgroundWorker *MyBgworkerEntry;